import { Inject, Injectable, computed, inject, signal } from '@angular/core';
import {
  StorageReference,
  UploadTask,
  deleteObject,
  getBlob,
  getDownloadURL,
  getMetadata,
  getStorage,
  percentage,
  ref,
  uploadBytesResumable,
} from '@angular/fire/storage';
import { UtilityService } from '@fidoc/util';
import { BehaviorSubject, Subject, debounceTime, firstValueFrom, map, shareReplay, switchMap, tap, withLatestFrom } from 'rxjs';
import { DefaultsService, FirebaseService, UserService } from '@fidoc/util';
import {
  UserRecord,
  FlowFile,
  FlowPrompt,
  FlowStep,
  FlowTool,
  flowFileConverter,
  flowStepConverter,
  flowStepPromptConverter,
  FileflowServiceInterface,
  PIPELINE_ENV_KEY,
  isExecEnvCloud,
  PipelineEnvironment,
  getBlobFromJson,
  getUserFilePath,
  getUserFileStepPath,
  getUserFileStepPromptPath,
  getUserFilesPath,
  isObjectEmpty,
  getStorageFilePath,
  getStorageFilePathForSuffix,
  PipelineLoggerInterface,
  PipelineContext,
  NO_PIPELINE,
  blobToBase64,
  FlowDomain,
  FlowSchemaType,
  base64ToBlob,
  sendMeterEvent,
  ExcelUtils,
  getBlobFromBuffer,
  FirestoreCollectionTypes,
  PipelineStepLog,
  MailProviders,
  BatchMessagePayload,
  Group,
  CostRecordConverter,
  UserOrGroup,
  SubscriptionInfo,
  examineFile,
  removeFrontMatter
} from '@fidoc/shared';
import { connect } from 'ngxtension/connect';
import { JsonFormData } from '@cheaseed/node-utils';
import { marked } from 'marked'
import { DomSanitizer } from '@angular/platform-browser';
import { Workbook } from 'exceljs';
import { differenceInDays } from 'date-fns';
import { GroupService } from '@fidoc/groups';
import { limit, orderBy } from '@angular/fire/firestore';
// import { LlamaParseReader } from 'llamaindex';

// import { IPDFViewerApplication, PDFNotificationService } from 'ngx-extended-pdf-viewer';

/**
 * Shape of the signal-backed store held by FileflowService.
 */
export interface FlowState {
  /** Flow files loaded for the current user. */
  flowFiles: FlowFile[];
  /** Domains loaded from the domains collection. */
  domains: FlowDomain[];
  /** Folder currently being browsed; undefined when no folder is selected. */
  currentFolder?: FlowFile;
}

@Injectable({
  providedIn: 'root',
})
export class FileflowService implements FileflowServiceInterface, PipelineLoggerInterface {
  // Injected collaborators.
  firebase = inject(FirebaseService)
  utilityService = inject(UtilityService)
  userService = inject(UserService)
  groupService = inject(GroupService)
  defaults = inject(DefaultsService)
  private sanitizer = inject(DomSanitizer)
  // Emits { tool, file, params } when a tool needs parameter input (see checkExecuteStep);
  // submitParameters resets it to undefined.
  showToolParameters$ = new BehaviorSubject<{ tool: FlowTool, file: FlowFile | null, params: JsonFormData } | undefined>(undefined)
  // Files whose upload has finished; appended by uploadFile and cleared by publishMessages.
  uploadedFileReadyQueue = signal<FlowFile[]>([])
  // Page counts pushed by consumePages; the constructor subscribes and debits the user's balance.
  private consumedPages$ = new Subject<number>()
  // True only when the 'excelGeneration.ignoreIndenterErrors' default is the string 'true'.
  ignoreIndenterErrors: boolean = this.defaults.getDefault('excelGeneration.ignoreIndenterErrors') === 'true'
  // Transient: set by executeStep for the duration of a local step execution, then reset to null.
  context: PipelineContext | null = null
  // private pdfViewerApplication!: IPDFViewerApplication | undefined;
  // pdfNotificationService = inject(PDFNotificationService)

  // Signal state: the single writable store the selectors below derive from.
  private state = signal<FlowState>({
    flowFiles: [],
    domains: [],
    currentFolder: undefined,
  })

  // Signal selectors
  flowFiles = computed(() => this.state().flowFiles);
  domains = computed(() => this.state().domains);
  // First domain flagged isDefault, or undefined when none is.
  defaultDomain = computed(() => this.domains().find(d => d.isDefault));
  currentFolder = computed(() => this.state().currentFolder);
  // Sorted names of all entries flagged as folders.
  availableFolders = computed(() => this.flowFiles().filter(f => f.isFolder).map(f => f.fileName).sort())
  // Files inside the current folder; with no current folder, the files that are in no folder at all.
  currentFolderFiles = computed(() => {
    const fldr = this.currentFolder()?.fileName
    return this.flowFiles().filter(f => fldr ? f.containedInFolder === fldr : !f.containedInFolder)
  })
  // Folder most recently passed to summarizeFolder.
  selectedFolder = signal<FlowFile | null>(null)
  // Tools keyed by name, replaced wholesale by setCurrentTools.
  private currentTools = signal<Map<string, FlowTool>>(new Map())

  // domainName -> (className -> schema class) lookup built from the loaded domains.
  domainClassMaps = computed(() => {
    const map = new Map<string, Map<string, FlowSchemaType>>(this.domains().map(d => [d.domainName, new Map<string, FlowSchemaType>(d.classes.map(c => [c.className, c]))]))
    // console.log('domainClassMaps', map)
    return map
  })

  // Files of the current user, re-queried whenever userDocId$ emits, debounced by 300ms
  // and sorted newest first by createdAt. Files without a domainName are assigned the
  // default domain name in place (the emitted objects are mutated).
  flowFiles$ = this.userService.userDocId$
    .pipe(
      switchMap(userId =>
        this.firebase.collectionWithConverter$(
          getUserFilesPath(userId as string),
          flowFileConverter)),
      debounceTime(300),
      map(files => files.toSorted((a, b) => a.createdAt > b.createdAt ? -1 : 1) as FlowFile[]),
      map(files => files.map(f => {
        if (!f.domainName)
          f.domainName = this.defaultDomainName()
        return f
      })),
      shareReplay(1)
    )

  // Contents of the domains collection, re-queried whenever userDocId$ emits (debounced by 200ms).
  domains$ = this.userService.userDocId$
    .pipe(
      switchMap(() => this.firebase.collection$(FirestoreCollectionTypes.DOMAINS_COLLECTION)),
      debounceTime(200),
      shareReplay(1))

  // Busy flag toggled by addFile and deleteFile.
  loading = signal(false)
  // docIds of files marked as opened (see markFileOpened / toggleFileOpened).
  openedFiles = signal<Set<string>>(new Set<string>())

  /**
   * Wires the Firestore-backed streams into the signal state and starts the
   * listener that debits consumed pages from the user's balance.
   *
   * @param environment Value injected under the 'environment' token; this class
   *   reads its `production` and `adminEmail` properties.
   */
  constructor(@Inject('environment') private environment: any) {
    // Feed every emission of the source observables into the matching slice of state.
    // See https://www.youtube.com/watch?v=R7-KdADEq0A
    connect(this.state)
      .with(this.flowFiles$, (current, files) => ({ ...current, flowFiles: files }))
      .with(this.domains$, (current, loadedDomains) => ({ ...current, domains: loadedDomains }))

    // Each consumed page count is applied as a negative delta to the latest known user.
    const consumedWithUser$ = this.consumedPages$.pipe(withLatestFrom(this.userService.user$))
    consumedWithUser$.subscribe(([pages, latestUser]) => {
      this.userService.updatePageBalance(latestUser as UserRecord, 0 - pages)
    })

    // See https://github.com/stephanrauh/ngx-extended-pdf-viewer/issues/2621#issuecomment-2442540589
    // effect(() => {
    //   this.pdfViewerApplication = this.pdfNotificationService.onPDFJSInitSignal();
    // });
  }

  /**
   * Returns the prefix to put in front of log output for the given context.
   * Prefixing is currently disabled, so this always returns null.
   *
   * @param context Pipeline context of the running step, if any (currently unused).
   */
  constructPrefix(context: PipelineContext | null) {
    // Previous implementation, kept for reference:
    //   if(context)
    //     return `User: ${context.userId}:Pipeline:${context.pipeline || NO_PIPELINE}:Tool:${context.tool || NO_TOOL}`
    return null
  }

  /** Returns "prod" when the injected environment has `production === true`, otherwise "dev". */
  getEnv() {
    if (this.environment.production === true)
      return "prod"
    return "dev"
  }
  /**
   * Returns the currently signed-in user record.
   *
   * @param userId Ignored here; the current user from UserService is always returned.
   */
  async getUser(userId: string) {
    const current = this.userService.user()
    return current as UserRecord
  }

  /**
   * Reports page usage through the shared `sendMeterEvent` helper.
   * The usage is reported against the user's group when the user has a
   * `groupDocId`, otherwise against the user.
   *
   * @param numPages Number of pages to report.
   */
  async sendMeterEvent(numPages: number) {
    this.log('Sending meter event to Stripe with payload', numPages)
    const user = this.userService.user() as UserRecord
    const billed: UserOrGroup = user.groupDocId
      ? await firstValueFrom(this.userService.getGroup(user.groupDocId))
      : user
    await sendMeterEvent(this.getEnv(), billed.name, billed.subscriptionInfo as SubscriptionInfo, numPages, this)
  }
  /** Writes to console.log, preceded by the context prefix when one is available. */
  log(...args: any[]): void {
    const prefix = this.constructPrefix(this.context)
    const parts = prefix ? [prefix, ...args] : args
    console.log(...parts)
  }
  /** Writes to console.debug, preceded by the context prefix when one is available. */
  debug(...args: any[]): void {
    const prefix = this.constructPrefix(this.context)
    const parts = prefix ? [prefix, ...args] : args
    console.debug(...parts)
  }
  /** Writes to console.warn, preceded by the context prefix when one is available. */
  warn(...args: any[]): void {
    const prefix = this.constructPrefix(this.context)
    const parts = prefix ? [prefix, ...args] : args
    console.warn(...parts)
  }
  /** Writes to console.error, preceded by the context prefix when one is available. */
  error(...args: any[]): void {
    const prefix = this.constructPrefix(this.context)
    const parts = prefix ? [prefix, ...args] : args
    console.error(...parts)
  }

  /**
   * Resolves with the first emission of the given user's files collection.
   *
   * @param userDocId Document id of the user whose files are read.
   */
  async getFlowFilesForUser(userDocId: string) {
    const files$ = this.firebase.collectionWithConverter$(getUserFilesPath(userDocId), flowFileConverter)
    return await firstValueFrom(files$)
  }

  /**
   * Starts a tool on a file. Tools that declare parameters first publish their
   * parameter form on `showToolParameters$`; tools without parameters are
   * executed immediately.
   *
   * @param tool Tool to run.
   * @param file File the tool operates on.
   */
  async checkExecuteStep(tool: FlowTool, file: FlowFile) {
    this.log('checking execute step', tool, file);
    if (!tool.parameters) {
      await this.executeStep(null, tool, file)
      return
    }
    const params = this.getToolParameters(tool)
    this.showToolParameters$.next({ tool, file, params })
  }

  /**
   * Returns the tool's parameter form data with the tool's current
   * `instructions` written into the control named 'instructions', if present.
   *
   * Only the top-level object is copied: the control objects are shared with
   * `tool.parameters`, so the assigned value is also visible on the tool itself.
   *
   * @param tool Tool whose `parameters` describe the form.
   */
  getToolParameters(tool: FlowTool) {
    const formData = { ...tool.parameters } as JsonFormData
    this.log('Tool params', formData)
    const target = formData.controls.find(({ name }) => name === 'instructions')
    if (target)
      target.value = tool.instructions
    // this.log('getToolParameters', formData)
    return formData
  }

  /**
   * Replaces the current tool lookup with one keyed by tool name.
   * When two tools share a name, the later one wins.
   */
  setCurrentTools(tools: FlowTool[]) {
    const byName = new Map<string, FlowTool>()
    for (const tool of tools)
      byName.set(tool.name, tool)
    this.currentTools.set(byName)
  }

  /**
   * Returns the instructions of the current tool with the same name, passed
   * through `removeFrontMatter`. An unknown tool or empty instructions are
   * treated as an empty string.
   */
  async getToolInstructions(tool: FlowTool) {
    const current = this.currentTools().get(tool.name)
    const raw = current?.instructions || ''
    return removeFrontMatter(raw)
  }

  /**
   * Queues a page-consumption event; the subscription set up in the constructor
   * debits this amount from the user's page balance.
   *
   * @param numPages Number of pages consumed.
   */
  consumePages(numPages: number) {
    const pages = numPages
    this.consumedPages$.next(pages)
  }

  /**
   * Marks a file as opened.
   *
   * A new Set is produced on every call so the signal sees a changed reference;
   * the previous Set is copied first and is never mutated.
   *
   * @param file File whose docId is added to `openedFiles`.
   */
  markFileOpened(file: FlowFile) {
    this.openedFiles.update(opened => new Set(opened).add(file.docId))
  }

  /**
   * Handles a click on a file entry.
   *
   * - A folder becomes the current folder (the view re-renders with its contents;
   *   `resetCurrentFolder` returns to the top level).
   * - Any other file has its opened flag flipped in `openedFiles`.
   *
   * @param file Entry that was toggled.
   */
  toggleFileOpened(file: FlowFile) {
    if (file.isFolder) {
      this.state.update(s => ({ ...s, currentFolder: file }))
      return
    }
    this.openedFiles.update(opened => {
      // Work on a copy: the signal needs a new reference to notify readers, and
      // the previous value must not be mutated.
      const next = new Set(opened)
      if (!next.delete(file.docId))
        next.add(file.docId)
      return next
    })
  }

  /** Clears the current folder so that top-level files are shown again. */
  resetCurrentFolder() {
    this.state.update(previous => {
      return { ...previous, currentFolder: undefined }
    })
  }

  /**
   * Runs a tool against a file, either through the 'executePipelineStep' cloud
   * function or locally, as decided by `isExecEnvCloud` for the current user and
   * the configured pipeline environment.
   *
   * @param context Pipeline context stored on the service while a local run is in progress.
   * @param tool Tool to execute.
   * @param file File the tool operates on.
   * @param params Optional tool parameters; sent as JSON for cloud runs.
   */
  async executeStep(context: PipelineContext | null, tool: FlowTool, file: FlowFile, params?: any) {
    const pipelineEnv = this.defaults.getDefault(PIPELINE_ENV_KEY) as PipelineEnvironment
    const runInCloud = isExecEnvCloud(this.userService.user() as UserRecord, pipelineEnv)

    if (!runInCloud) {
      // The context is only meaningful during the local run; always clear it afterwards.
      this.context = context
      try {
        await this.executeStepLocal(tool, file, params)
      }
      finally {
        this.context = null
      }
      return
    }

    const payload: any = {
      userId: this.getUserId(),
      toolName: tool.name,
      file: JSON.stringify(file)
    }
    if (params)
      payload.params = JSON.stringify(params)
    await this.firebase.awaitCloudFunction('executePipelineStep', payload)
  }

  /**
   * Executes a tool in the browser and records its progress on the file and
   * step documents.
   *
   * Sequence: mark the step 'pending' and the file 'running', clear prompts left
   * over from a previous run of the same tool, run the tool's optional
   * `checkExecute` and then `execute`, and finally mark the file 'idle'.
   * On failure the step and the file are marked 'error' and the error is rethrown.
   *
   * @param tool Tool to execute.
   * @param file File the tool operates on.
   * @param params Optional tool parameters. The object is not modified; the tool
   *   receives a copy with `production` added.
   * @throws Whatever `checkExecute` or `execute` throws.
   */
  private async executeStepLocal(tool: FlowTool, file: FlowFile, params?: any) {
    this.log(`executing tool`, tool.name, file, params)
    this.setContextTool(tool.name)
    this.markFileOpened(file)
    const lastStep = await firstValueFrom(this.getLastCompletedStep(file, tool.name)) as FlowStep
    // Only parameters with a truthy value are stored on the step.
    const filteredParams = Object.fromEntries(Object.entries(params || {}).filter(([, v]) => !!v)) as any
    const instructions = params?.instructions
    this.log('lastStep detected', lastStep)
    await this.updateStep(file, {
      name: tool.name,
      description: tool.description,
      state: 'pending',
      type: tool.type,
      instructions,
      parameters: filteredParams
    });
    await this.updateFile(file, {
      state: 'running',
      stateDescription: tool.type === 'ocr' ? 'scanning' : 'transforming'
    })
    await this.deleteStepPrompts(file, tool.name) // clear out any previous prompts
    try {
      if (tool.checkExecute)
        await tool.checkExecute(file, this.userService.user() as UserRecord)
      // Hand the tool its own copy so the caller's params object is left untouched.
      const env = { ...(params || {}), production: this.environment.production }
      await tool.execute(file, lastStep, env)
    }
    catch (e: any) {
      this.error('error executing tool', e)
      // Non-Error throwables have no `message`; fall back to their string form.
      const message = e?.message ?? String(e)
      await this.updateStep(file, { name: tool.name, state: 'error', error: message });
      await this.updateFile(file, { state: 'error', stateDescription: 'error' })
      throw e
    }
    await this.updateFile(file, { state: 'idle' })
  }

  /** Records the running tool's name on the active pipeline context, if there is one. */
  setContextTool(toolName: string) {
    if (!this.context)
      return
    this.context.tool = toolName
  }

  /** Intentionally a no-op in this implementation. */
  clearContextTool(): void {
    // do nothing - applicable in the cloud version
    return
  }
  /**
   * Streams the steps of a file, ordered by `lastUpdatedAt` ascending.
   * Emissions are debounced by 300ms and the latest one is replayed to late
   * subscribers. Steps whose `parameters` object is empty are emitted with
   * `parameters: undefined`.
   *
   * @param file File whose `steps` sub-collection is read.
   */
  getFileSteps$(file: FlowFile) {
    const stepsPath = `${getUserFilePath(file.userDocId as string, file.docId)}/steps`
    const oldestFirst = (a: FlowStep, b: FlowStep) => a.lastUpdatedAt < b.lastUpdatedAt ? -1 : 1
    const dropEmptyParameters = (step: FlowStep) =>
      ({ ...step, parameters: isObjectEmpty(step.parameters) ? undefined : step.parameters })

    return this.firebase
      .collectionWithConverter$(stepsPath, flowStepConverter)
      .pipe(
        debounceTime(300),
        map((steps: FlowStep[]) =>
          (steps.toSorted(oldestFirst) as FlowStep[]).map(dropEmptyParameters) as FlowStep[]),
        shareReplay(1)
      )
  }

  /** Resolves with the first emission of `getFileSteps$` for the file. */
  async getFileSteps(file: FlowFile) {
    const steps$ = this.getFileSteps$(file)
    return await firstValueFrom(steps$)
  }

  /**
   * Streams the prompts recorded for one step of a file, ordered by `createdAt`
   * ascending. Emissions are debounced by 300ms and the latest one is replayed
   * to late subscribers.
   *
   * @param file File that owns the step.
   * @param stepName Name of the step whose `prompts` sub-collection is read.
   */
  getFileStepPrompts(file: FlowFile, stepName: string) {
    const promptsPath = `${getUserFileStepPath(file.userDocId as string, file.docId, stepName)}/prompts`
    const prompts$ = this.firebase.collectionWithConverter$(promptsPath, flowStepPromptConverter)
    return prompts$.pipe(
      debounceTime(300),
      map(prompts => prompts.toSorted((first, second) => first.createdAt < second.createdAt ? -1 : 1) as FlowPrompt[]),
      shareReplay(1)
    )
  }

  /**
   * Streams the most recently updated step that is 'complete' and is not the
   * named tool's own step; emits undefined when there is none.
   *
   * @param file File whose steps are inspected.
   * @param priorToToolName Step name to skip (defaults to '', which skips nothing named).
   */
  getLastCompletedStep(file: FlowFile, priorToToolName = '') {
    const isCandidate = (step: FlowStep) => step.name !== priorToToolName && step.state === 'complete'
    return this.getFileSteps$(file).pipe(
      // Steps arrive oldest first, so search from the end.
      map(steps => steps.toReversed().find(isCandidate)),
      tap(found => this.log('last completed step', found?.name)),
    )
  }

  /**
   * Streams the most recently updated 'complete' step with the given tool name;
   * emits undefined when there is none.
   *
   * @param file File whose steps are inspected.
   * @param toolName Step name to look for.
   */
  getCompletedStepForTool(file: FlowFile, toolName = '') {
    const matches = (step: FlowStep) => step.name === toolName && step.state === 'complete'
    return this.getFileSteps$(file).pipe(
      map(steps => steps.toReversed().find(matches)),
    )
  }

  /** Resolves with the first emission of the file document for the given user and doc id. */
  private getFile(userId: string, docId: string) {
    const file$ = this.firebase.doc$(getUserFilePath(userId, docId), flowFileConverter)
    return firstValueFrom(file$);
  }

  /**
   * Adds a file document under the owning user's files collection.
   *
   * The `loading` flag is raised for the duration of the write and is always
   * lowered again, including when the write fails.
   *
   * @param f File data; `userDocId` selects the owning user's collection.
   * @returns The result of the underlying `updateAt` call.
   * @throws Whatever the underlying write throws.
   */
  async addFile(f: Partial<FlowFile>) {
    this.loading.set(true);
    try {
      const path = getUserFilesPath(f.userDocId as string);
      return await this.firebase.updateAt(path, { ...f });
    }
    finally {
      this.loading.set(false);
    }
  }

  /**
   * Writes step data to the step document named `s.name` under the file and
   * stamps it with the current time as `lastUpdatedAt`.
   *
   * @param file File that owns the step.
   * @param s Step fields to write; `name` selects the step document.
   * @returns The result of the underlying `updateAt` call.
   */
  async updateStep(file: FlowFile, s: Partial<FlowStep>) {
    const stepPath = `${getUserFilePath(file.userDocId as string, file.docId)}/steps/${s.name}`;
    const payload = { ...s, lastUpdatedAt: new Date() };
    return await this.firebase.updateAt(stepPath, payload);
  }

  /**
   * Records a prompt/response pair in the `prompts` collection of a step,
   * stamped with the current time as `createdAt`.
   *
   * @param file File that owns the step.
   * @param step Name of the step.
   * @param input Prompt that was sent.
   * @param output Response that was received.
   * @returns The result of the underlying `updateAt` call.
   */
  async updateStepPrompt(file: FlowFile, step: string, input: any, output: string) {
    const promptsPath = `${getUserFileStepPath(file.userDocId as string, file.docId, step)}/prompts`;
    const record = {
      prompt: input,
      response: output,
      createdAt: new Date()
    };
    return await this.firebase.updateAt(promptsPath, record)
  }

  /**
   * Writes the given fields to the file's document and stamps it with the
   * current time as `updatedAt`.
   *
   * @param file File to update; only `userDocId` and `docId` are read.
   * @param data Fields to write.
   */
  async updateFile(file: FlowFile, data: Partial<FlowFile>) {
    const filePath = getUserFilePath(file.userDocId as string, file.docId);
    const changes = { ...data, updatedAt: new Date() };
    await this.firebase.updateAt(filePath, changes);
  }

  /**
   * Asks for confirmation and, once confirmed, moves the file by setting its
   * `containedInFolder`. The update is started without being awaited.
   *
   * @param doc File to move.
   * @param fldr Destination folder name; an empty value is shown as 'My Files'.
   */
  async confirmMoveFile(doc: FlowFile, fldr: string) {
    const destination = fldr || 'My Files'
    const moveFile = () => {
      this.updateFile(doc, { containedInFolder: fldr })
    }
    await this.utilityService.confirm({
      header: `Move File`,
      message: `Are you sure you want to move ${doc.fileName} to ${destination}?`,
      confirm: moveFile,
    });
  }

  /**
   * Prompts for a new name and renames the file or folder.
   *
   * The rename is refused when the name is empty or a folder with that name
   * already exists. When a folder is renamed, every file it contains is
   * re-pointed at the new name before the current folder is reset. All updates
   * are awaited so that a failure in any of them reaches the notification below.
   *
   * @param file File or folder to rename.
   */
  async renameFolderOrFile(file: FlowFile) {
    await this.utilityService.prompt({
      message: `Enter the new name of the ${file.isFolder ? 'folder' : 'file'} ${file.fileName}`,
      inputType: 'text',
      confirm: async (name: any) => {
        try {
          const newName = name.value
          // Folder membership is keyed by name, so an empty name cannot be supported.
          if (!newName) {
            throw new Error('Name is required')
          }
          if (this.flowFiles().find(f => f.isFolder && f.fileName === newName)) {
            throw new Error(`Folder ${newName} already exists`)
          }
          // Capture the members under the old name before anything is written.
          const oldName = file.fileName
          const contained = file.isFolder
            ? this.flowFiles().filter(f => f.containedInFolder === oldName)
            : []
          await this.updateFile(file, { fileName: newName })
          if (file.isFolder) {
            // Move files to the renamed folder by setting containedInFolder: newName
            await Promise.all(contained.map(f => this.updateFile(f, { containedInFolder: newName })))
            this.resetCurrentFolder()
          }
        }
        catch (e: any) {
          this.utilityService.notify({ message: e.message })
        }
      }
    })
  }

  /**
   * Downloads a zip with the source and output storage objects of every file in
   * a folder, or of all files when no folder is given.
   *
   * @param folder Folder to zip; null zips everything into 'MyFiles.zip'.
   */
  async zipFolder(folder: FlowFile | null) {
    // For each selected file, the original comes first, followed by its output (when present).
    const fileNames: string[] = this.flowFiles()
      .filter(f => !folder || f.containedInFolder === folder.fileName)
      .flatMap(f => [f.storageName, f.outputStorageName as string].filter(storageName => !!storageName))

    const zipfileName = folder ? `${folder.fileName}.zip` : 'MyFiles.zip'
    await this.zipFiles(zipfileName, fileNames, this.userService.userDocId() as string)
  }

  /**
   * Asks the 'createZipFromFiles' cloud function to zip the given storage
   * objects and triggers a browser download of the result.
   *
   * The loading overlay is dismissed even when the cloud function fails, and the
   * temporary object URL is revoked shortly after the download is triggered.
   *
   * @param zipfileName File name offered for the download.
   * @param fileNames Storage paths to include in the zip.
   * @param userId Id of the user the files belong to.
   * @throws Whatever the cloud function call throws.
   */
  async zipFiles(zipfileName: string, fileNames: string[], userId: string) {
    this.log(`Will zip files`, fileNames)
    const loading = await this.utilityService.loading(`Creating ${zipfileName}`)
    let url: string
    try {
      const response = await this.firebase.awaitCloudFunction('createZipFromFiles', {
        userId,
        fileNames
      })
      this.log(response)
      // The function returns the zip as a base64 string.
      const buf = base64ToBlob(response.data as string, 'application/zip')
      const blob = new Blob([buf], { type: 'application/zip' });
      url = URL.createObjectURL(blob);
    }
    finally {
      loading.dismiss()
    }

    // Trigger the download through a temporary anchor element.
    const a = document.createElement('a');
    a.href = url;
    a.download = zipfileName;
    document.body.appendChild(a);
    try {
      a.click();
    }
    finally {
      document.body.removeChild(a);
      // Delay the revoke so the browser has started the download (needed for Firefox).
      setTimeout(() => URL.revokeObjectURL(url), 60)
    }
  }

  /**
   * Builds an Excel workbook that summarizes a folder and opens it in a new tab.
   *
   * The workbook has a 'Summary' sheet listing, per schema, the files that carry
   * it, followed by one worksheet per schema filled through `ExcelUtils`.
   * A toast is shown and nothing is generated when no file in the folder has schemas.
   *
   * @param folder Folder to summarize; also stored in `selectedFolder`.
   */
  async summarizeFolder(folder: FlowFile) {
    this.selectedFolder.set(folder)
    const schemaMap = this.buildSchemaMap(folder)
    if (schemaMap.size === 0) {
      await this.utilityService.presentToast(`No schemas in ${folder.fileName}`)
      return
    }
    this.log(schemaMap)
    const excelUtils = new ExcelUtils(this)

    // Create a workbook with one schema per tab
    const wb = new Workbook()
    const schemaNames = Array.from(schemaMap.keys())

    // Create a summary page
    const summarySheet = wb.addWorksheet('Summary')
    const domain = folder.domainName || this.defaultDomainName()
    const classNameMap = this.domainClassMaps().get(domain)
    const rows: any[] = []
    rows.push({ text: `Summarization of Folder: ${folder.fileName}`, style: { font: { size: 14 } } })
    rows.push({ text: `Domain: ${domain}`, style: { font: { size: 14 } } })
    rows.push('')
    for (const schemaName of schemaNames) {
      // The schema may not be defined in this domain; leave the description out
      // rather than printing "undefined".
      const description = classNameMap?.get(schemaName)?.description
      const files = schemaMap.get(schemaName) || []
      rows.push({
        text: description
          ? `${schemaName}: ${description} : ${files.length} files`
          : `${schemaName}: ${files.length} files`,
        style: { font: { bold: true } }
      })
      for (const file of files) {
        rows.push({ text: `     ${file.fileName}`, style: { font: { color: { argb: 'FF0000FF' } } } })
      }
      rows.push('')
    }
    excelUtils.addRows(summarySheet, rows)

    // Create a worksheet for each schema
    for (const schemaName of schemaNames) {
      const schemaSheet = wb.addWorksheet(schemaName)
      const files = schemaMap.get(schemaName) || []
      excelUtils.streamObjectsToWorksheet(schemaName, files, schemaSheet)
    }
    const blob = getBlobFromBuffer(await wb.xlsx.writeBuffer());
    // The URL is handed to a new tab, so it is not revoked here.
    const url = URL.createObjectURL(blob);
    window.open(url, '_blank');
  }

  /**
   * Groups the files of a folder by the schema names found as keys of each
   * file's `schemas` object. A file appears under every schema it carries.
   *
   * @param folder Folder whose files are grouped.
   * @returns Map from schema name to the files carrying that schema.
   */
  buildSchemaMap(folder: FlowFile) {
    const bySchema = new Map<string, FlowFile[]>()
    for (const candidate of this.flowFiles()) {
      if (candidate.containedInFolder !== folder.fileName || !candidate.schemas)
        continue
      for (const schemaName of Object.keys(candidate.schemas)) {
        const bucket = bySchema.get(schemaName)
        if (bucket)
          bucket.push(candidate)
        else
          bySchema.set(schemaName, [candidate])
      }
    }
    return bySchema
  }

  /**
   * Estimates the page count of a PDF by counting `/Type /Page` markers in the
   * decoded bytes. The trailing `[^s]` keeps `/Pages` entries out of the count.
   *
   * @param buf Raw PDF bytes.
   * @returns Number of markers found (0 when there are none).
   */
  getPDFNumPages(buf: ArrayBuffer) {
    const decoded = new TextDecoder().decode(new Uint8Array(buf));
    const pageMarkers = decoded.match(/\/Type[\s]*\/Page[^s]/g);
    return pageMarkers ? pageMarkers.length : 0
  }

  /**
   * Delegates to the shared `examineFile` helper, using `console` as its logger.
   *
   * @param buf File contents.
   * @param type File type passed through to the helper.
   * @throws Error('Failed to examine file') when the helper fails; the original
   *   error is logged first.
   */
  async examineFile(buf: ArrayBuffer, type: string) {
    try {
      const examined = await examineFile(buf, type, console)
      return examined
    }
    catch (cause) {
      this.error('Error examining file:', cause);
      throw new Error('Failed to examine file');
    }
  }

  /**
   * Asks for confirmation and then deletes a file or folder. The deletion is
   * started without being awaited.
   *
   * @param doc File or folder to delete.
   * @param contentsOnly When true, only a folder's contents (or a file's
   *   pipeline steps) are deleted and the entry itself is kept.
   */
  async confirmDeleteFile(doc: FlowFile, contentsOnly = false) {
    const kind = doc.isFolder ? 'Folder' : 'File'
    let headerSuffix = ''
    let scope = ''
    if (contentsOnly) {
      headerSuffix = doc.isFolder ? ' Contents' : ' Steps'
      scope = doc.isFolder ? ' contents of' : 'pipeline steps of '
    }
    const runDelete = () => {
      if (doc.isFolder)
        this.deleteFolder(doc, contentsOnly)
      else
        this.deleteFile(doc, contentsOnly);
    }
    await this.utilityService.confirm({
      header: `Delete ${kind} ${headerSuffix}`,
      message: `Are you sure you want to delete ${scope} ${doc.fileName}?`,
      confirm: runDelete,
    });
  }

  /**
   * Returns the page balance reported by UserService.
   *
   * @param user Not used here; UserService determines whose balance is returned.
   */
  async getPageBalance(user: UserRecord) {
    const balance = await this.userService.getPageBalance()
    return balance
  }

  /**
   * Deletes every prompt document recorded for one step of a file, one after
   * the other.
   *
   * @param file File that owns the step.
   * @param stepName Name of the step whose prompts are removed.
   */
  async deleteStepPrompts(file: FlowFile, stepName: string) {
    const { docId: fileId, userDocId: ownerId } = file
    const recorded = await firstValueFrom(this.getFileStepPrompts(file, stepName))
    for (const { docId: promptId } of recorded) {
      await this.firebase.delete(getUserFileStepPromptPath(ownerId, fileId, stepName, promptId))
    }
  }

  /**
   * Deletes a step: its stored output (if any), its prompts, and finally the
   * step document itself.
   *
   * A failure to delete the stored output is logged as a warning and does not
   * stop the rest of the cleanup.
   *
   * @param file File that owns the step.
   * @param step Step to delete.
   */
  async deleteStep(file: FlowFile, step: FlowStep) {
    const { docId, userDocId } = file
    if (step.storageName) {
      const storageRef = ref(getStorage(), step.storageName)
      try {
        await deleteObject(storageRef)
        // Only report success when the delete actually went through.
        this.log('file deleted', step.storageName)
      }
      catch (e) {
        // Swallow missing file error
        this.warn(`Error deleting file ${step.storageName}`, e)
      }
    }
    await this.deleteStepPrompts(file, step.name)
    await this.firebase.delete(getUserFileStepPath(userDocId, docId, step.name))
  }

  /**
   * Deletes every file contained in a folder and, unless `contentsOnly` is set,
   * the folder entry itself.
   *
   * The contained files are deleted concurrently and all deletions are awaited
   * before the folder entry is removed, so the method resolves only when the
   * whole operation has finished.
   *
   * @param folder Folder to delete.
   * @param contentsOnly When true, the folder entry is kept.
   */
  async deleteFolder(folder: FlowFile, contentsOnly = false) {
    // Snapshot the members first: the file list changes as deletions come through.
    const contained = this.flowFiles().filter(f => f.containedInFolder === folder.fileName)
    await Promise.all(contained.map(f => this.deleteFile(f)))
    if (!contentsOnly) {
      await this.deleteFile(folder)
      this.resetCurrentFolder()
    }
  }

  /**
   * Deletes a file's steps and, unless `stepsOnly` is set, its stored object and
   * its document.
   *
   * The file is marked 'deleting' while the work runs. Errors are logged and not
   * rethrown; `loading` is raised for the duration of the call.
   *
   * @param file File to delete.
   * @param stepsOnly When true, only the steps are removed and the file goes
   *   back to 'idle' with the description 'deleted steps'.
   */
  async deleteFile(file: FlowFile, stepsOnly = false) {
    this.loading.set(true)
    const { docId, userDocId } = file
    const filepath = getUserFilePath(userDocId, docId)
    try {
      await this.updateFile(file, { state: 'deleting', stateDescription: 'deleting' })

      // Delete all steps
      for (const step of await this.getFileSteps(file)) {
        await this.deleteStep(file, step)
      }

      if (stepsOnly) {
        await this.updateFile(file, { state: 'idle', stateDescription: 'deleted steps' })
      }
      else {
        // Delete the stored object first; a failure there must not keep the document alive.
        if (file.storageName) {
          try {
            await deleteObject(ref(getStorage(), file.storageName))
            this.log('file deletion process complete', file.storageName)
          }
          catch (storageError) {
            this.warn(`Error deleting file ${file.storageName}`, storageError)
          }
        }
        await this.firebase.delete(filepath)
      }
    }
    catch (failure) {
      this.error(`error deleting file ${filepath}`, failure)
    }
    finally {
      this.loading.set(false)
    }
  }

  /**
   * Downloads a storage object as a Blob.
   *
   * @param storagePath Path of the object in the default storage bucket.
   */
  async getFileContentsAsBlob(storagePath: string) {
    console.log('getting file contents as blob', storagePath)
    return await getBlob(ref(getStorage(), storagePath))
  }

  /**
   * Downloads a storage object and converts it with `blobToBase64`.
   *
   * @param storagePath Path of the object in the default storage bucket.
   */
  async getFileContentsBase64(storagePath: string) {
    const contents = await this.getFileContentsAsBlob(storagePath)
    const encoded = await blobToBase64(contents)
    return encoded as string
  }

  /**
   * Downloads a storage object and returns its bytes.
   *
   * @param storagePath Path of the object in the default storage bucket.
   */
  async getFileContentsAsUInt8Array(storagePath: string) {
    const contents = await this.getFileContentsAsBlob(storagePath)
    return new Uint8Array(await contents.arrayBuffer())
  }

  /**
   * Downloads a storage object as text and optionally parses it as JSON.
   *
   * @param storagePath Path of the object in the default storage bucket.
   * @param asJson When true (the default) the text is parsed with JSON.parse;
   *   otherwise the raw text is returned.
   * @throws SyntaxError when `asJson` is true and the contents are not valid JSON.
   */
  async getFileContents(storagePath: string, asJson = true) {
    const contents = await this.getFileContentsAsBlob(storagePath)
    const text = await contents.text()
    if (!asJson)
      return text
    return JSON.parse(text)
  }

  /**
   * Downloads a storage object and exposes it through a local object URL whose
   * Blob carries the content type from the object's storage metadata.
   * The caller is responsible for revoking the returned URL.
   *
   * @param path Path of the object in the default storage bucket.
   */
  async getFileContentsAsLocalURL(path: string) {
    const fileRef = ref(getStorage(), path)
    const downloaded = await getBlob(fileRef)
    const bytes = await downloaded.arrayBuffer()
    const { contentType } = await getMetadata(fileRef)
    return URL.createObjectURL(new Blob([bytes], { type: contentType }));
  }

  /**
   * Registers a file document and starts uploading the file to storage.
   *
   * The method resolves once the document exists and the upload has been
   * started; it does not wait for the upload to finish. On completion the
   * document is marked 'uploaded', the file is appended to
   * `uploadedFileReadyQueue` and the upload notification check runs. On failure
   * the document is marked 'error' instead of being left in 'preparing'.
   *
   * @param userDocId Owner of the file.
   * @param file Browser File to upload.
   * @param pipelineName Pipeline recorded on the file document.
   */
  async uploadFile(userDocId: string, file: File, pipelineName: string) {
    // this.log('uploading file', file);
    const path = getStorageFilePath(userDocId, file.name);
    const docData = {
        userDocId,
        createdAt: new Date(),
        state: 'preparing',
        stateDescription: 'preparing',
        containedInFolder: this.currentFolder()?.fileName,
        fileName: file.name,
        fileType: file.type,
        storageName: path,
        size: file.size,
        pipelineName
    } as FlowFile
    const result = await this.addFile(docData)
    const docId = result.id

    //const name = getSafeName(file.name);
    const storageRef = ref(getStorage(), path);
    // `filename*` takes an RFC 5987 ext-value, which must be percent-encoded;
    // encodeURIComponent leaves ' ( ) * untouched, so those are encoded by hand.
    const encodedName = encodeURIComponent(file.name)
      .replace(/['()*]/g, ch => `%${ch.charCodeAt(0).toString(16).toUpperCase()}`)
    const uploadTask = uploadBytesResumable(storageRef, file, { contentDisposition: `attachment; filename*=utf-8''${encodedName}` });
    percentage(uploadTask).subscribe({
      next: change => this.log('task progress', change),
      // Upload failures are reported through the task handler below.
      error: () => undefined,
    });

    uploadTask
      .then(async () => {
        const data = {
          state: 'uploaded',
          stateDescription: 'waiting',
          downloadURL: await getDownloadURL(storageRef),
        } as FlowFile
        await this.updateFile({ userDocId, docId } as FlowFile, data)
        this.uploadedFileReadyQueue.update(queue => [...queue, { ...docData, ...data, docId }]);
        this.checkUploadNotification()
      })
      .catch(async (e: unknown) => {
        this.error(`error uploading file ${file.name}`, e)
        try {
          await this.updateFile({ userDocId, docId } as FlowFile, { state: 'error', stateDescription: 'error' })
        }
        catch (updateError) {
          this.error(`error marking file ${docId} as failed`, updateError)
        }
      })
  }

  /**
   * Emails the admin that the current user has started uploading, unless a
   * notification was sent recently, and records the time on the user.
   * Both calls are started without being awaited.
   *
   * NOTE(review): `differenceInDays(...) > 1` only passes once two full days
   * have elapsed, while the message says "today" — confirm the intended threshold.
   */
  private checkUploadNotification() {
    const user = this.userService.user() as UserRecord
    const lastNotified = user.lastUploadedNotificationDate
    const now = new Date()
    // Notify when there is no previous notification or it is old enough.
    const shouldNotify = !lastNotified || differenceInDays(now, lastNotified) > 1
    if (!shouldNotify)
      return

    // Send email to admin that uploads by user have begun
    const envLabel = this.environment.production ? 'prod' : 'dev'
    const msg = `${envLabel}: User uploads by ${user.docId} have begun today`
    const mail = {
      to: this.environment.adminEmail,
      provider: MailProviders.MAILGUN,
      subject: msg,
      text: msg
    }
    this.firebase.awaitCloudFunction("sendEmailAttachment", mail)
    this.userService.updateUser(user, { lastUploadedNotificationDate: now })
  }

  /**
   * Waits for an output upload to finish, then records the result on the step
   * (marked 'complete') and on the file (latest output URL, storage name and type).
   *
   * @param uploadTask Running upload of the tool's output.
   * @param file File the output belongs to.
   * @param tool Tool that produced the output; `startTime` feeds `elapsedMsec`.
   * @param storageRef Storage location of the uploaded output.
   * @throws The storage error when the upload fails.
   */
  private async finalizeUpload(
    uploadTask: UploadTask,
    file: FlowFile,
    tool: FlowTool,
    storageRef: StorageReference) {
    // Wait for the upload to complete
    await new Promise<void>((resolve, reject) => {
      const onComplete = () => {
        console.log("upload completed")
        resolve()
      }
      uploadTask.on('state_changed', null, reject, onComplete)
    })

    // Update the step
    const storageName = storageRef.fullPath
    const outputURL = await getDownloadURL(storageRef);
    const stepResult = {
      name: tool.name,
      description: tool.description,
      elapsedMsec: Date.now() - (tool.startTime || 0),
      state: 'complete',
      error: undefined,
      storageName,
      outputURL
    } as Partial<FlowStep>
    await this.updateStep(file, stepResult);

    // Store the last output URL on the file
    await this.updateFile(file, { outputURL, outputStorageName: storageName, outputType: tool.type });
  }

  /**
   * Uploads a tool's output Blob next to the source file and records it as the
   * step's result.
   *
   * @param tool Tool that produced the output.
   * @param file File the output belongs to.
   * @param blob Output contents.
   * @param suffix Suffix used to derive the output's storage path.
   */
  async uploadBlob(tool: FlowTool, file: FlowFile, blob: Blob, suffix: string) {
    const outputPath = getStorageFilePathForSuffix(file.storageName, file.fileName, tool.name, suffix);
    const outputRef = ref(getStorage(), outputPath);
    await this.finalizeUpload(uploadBytesResumable(outputRef, blob), file, tool, outputRef);
  }

  /**
   * Serializes an analysis object with `getBlobFromJson` and uploads it as the
   * tool's '.json' output.
   */
  async uploadAnalysis(tool: FlowTool, file: FlowFile, analysis: any) {
    const jsonBlob = getBlobFromJson(analysis);
    await this.uploadBlob(tool, file, jsonBlob, '.json');
  }

  /**
   * Handles submission of a tool's parameter form: hides the form and, when a
   * file is attached, starts the step with the submitted values.
   *
   * The step runs in the background; a failure is logged here so that it does
   * not surface as an unhandled promise rejection.
   *
   * @param event Submitted form values (may be empty). The object is not
   *   modified; the step receives a copy with `production` added.
   * @param tool Tool the parameters belong to.
   * @param file File to run the tool on; null means there is nothing to execute.
   */
  submitParameters(event: any, tool: FlowTool, file: FlowFile | null) {
    this.log('submitting parameters', event, tool, file)
    this.showToolParameters$.next(undefined)
    if (!file) {
      // Handle tool instructions update
      //   const instructions = event.instructions
      //   this.updateTool(tool, { instructions })
      return
    }

    // Handle step execution
    const env = { ...(event || {}), production: this.environment.production }
    const context: PipelineContext = {
      pipeline: NO_PIPELINE,
      userId: this.getUserId(),
      tool: tool.name
    }
    void this.executeStep(context, tool, file, env)
      .catch(e => this.error(`error executing step ${tool.name}`, e))
  }

//   async updateTool(tool: FlowTool, data: Partial<FlowTool>) {
//     await this.firebase.updateAt(
//       getToolPath(tool.name),
//       {
//         ...data,
//         name: tool.name,
//         userId: this.userService.userDocId(),
//         updatedAt: new Date()
//       }
//     )
//   }

  /**
   * Renders markdown to HTML and marks the result as trusted for binding.
   *
   * NOTE(review): `bypassSecurityTrustHtml` disables Angular's HTML sanitizing
   * for the rendered output — confirm that `text` never carries untrusted content.
   *
   * @param text Markdown source.
   * @returns Trusted HTML, or null when the rendered output is empty.
   */
  renderMarkdown(text: string) {
    // this.log('rendering markdown', text)
    const html = (marked(text) as string).trim()
    if (!html)
      return null
    return this.sanitizer.bypassSecurityTrustHtml(html)
  }

  /** Returns the document id of the current user. */
  getUserId() {
    const current = this.userService.user() as UserRecord
    return current.docId
  }

  /** Returns the name of the default domain (undefined at runtime when no domain is flagged as default). */
  defaultDomainName() {
    const fallbackDomain = this.defaultDomain()
    return fallbackDomain?.domainName as string
  }

  /**
   * Rejects a run that needs more pages than the available balance.
   *
   * - Users in a group are checked against the current group's balance.
   * - Other users are checked against their own balance, and only while their
   *   subscription status is 'trialing' or 'canceled'.
   *
   * A balance of zero or less never triggers the error.
   * NOTE(review): confirm that a non-positive balance is meant to pass.
   *
   * @param file File about to be processed (not inspected here).
   * @param user User requesting the run.
   * @param pages Number of pages the run needs.
   * @throws Error when the applicable balance is positive but smaller than `pages`.
   */
  async defaultCheckExecute(file: FlowFile, user: UserRecord, pages: number) {
    if (user.groupDocId) {
      const group = this.groupService.currentUserGroup() as Group
      const exceedsGroupBalance = group.pageBalance > 0 && pages > group.pageBalance
      if (exceedsGroupBalance)
        throw new Error(`Insufficient group credits ${group.pageBalance || 0}`)
      return
    }

    const status = user.subscriptionInfo?.status
    const balanceEnforced = status === 'trialing' || status === 'canceled'
    if (user.pageBalance > 0 && pages > user.pageBalance && balanceEnforced)
      throw new Error(`Insufficient user credits ${user.pageBalance || 0}`)
  }

  /**
   * Writes a pipeline step log entry to the costs collection.
   *
   * @param data Log entry to store.
   */
  async logPipelineStep(data: PipelineStepLog) {
    const costsCollection = FirestoreCollectionTypes.COSTS_COLLECTION
    await this.firebase.updateAt(costsCollection, data)
  }

  /**
   * Hands uploaded files over for processing and empties the ready queue.
   *
   * With the pipeline name 'none' the files are simply marked 'idle'; otherwise
   * the 'publishPipelineMessages' cloud function is asked to process them.
   * The work is started in the background (the method does not wait for it) and
   * failures are logged rather than left as unhandled rejections.
   *
   * @param flowFiles Files to hand over; nothing happens for an empty list.
   * @param pipelineName Pipeline to run, or 'none'.
   */
  async publishMessages(flowFiles: FlowFile[], pipelineName: string) {
    // console.log('publishMessages', flowFiles, pipelineName)
    if (flowFiles.length === 0)
      return
    try {
      if (pipelineName === 'none') {
        for (const f of flowFiles) {
          void this.updateFile(f, { state: 'idle', stateDescription: 'uploaded' })
            .catch(e => this.error(`error updating file ${f.docId}`, e))
        }
      }
      else {
        void this.firebase.awaitCloudFunction('publishPipelineMessages', {
            pipelineName,
            userId: this.getUserId(),
            flowFileDocIds: flowFiles.map(flowFile => flowFile.docId)
          } as BatchMessagePayload)
          .catch((e: unknown) => this.error('error publishing pipeline messages', e))
      }
    }
    finally {
      // The queue is emptied as soon as the work has been started.
      this.uploadedFileReadyQueue.set([])
    }
  }

  /**
   * Downloads a file's latest output. When the output's storage name is known
   * it is fetched and saved under its base name; otherwise the output URL is
   * opened in a new tab.
   *
   * @param file File whose output is downloaded.
   */
  async downloadOutputFile(file: FlowFile) {
    if (!file.outputStorageName) {
      const remoteUrl = file.outputURL as string
      window.open(decodeURI(remoteUrl), '_blank')
      return
    }

    const url = await this.getFileContentsAsLocalURL(file.outputStorageName)
    const anchor = Object.assign(document.createElement('a'), {
      href: url,
      download: file.outputStorageName!.split('/').pop()!,
    })
    anchor.click();
    setTimeout(() => URL.revokeObjectURL(url), 60) // For Firefox it is necessary to delay revoking the ObjectURL
  }

  /**
   * Downloads the originally uploaded file under its own file name.
   *
   * @param file File whose source object is downloaded.
   */
  async downloadSourceFile(file: FlowFile) {
    const url = await this.getFileContentsAsLocalURL(file.storageName)
    const anchor = Object.assign(document.createElement('a'), {
      href: url,
      target: '_blank',
      download: file.fileName,
    })
    anchor.click();
    setTimeout(() => URL.revokeObjectURL(url), 60) // For Firefox it is necessary to delay revoking the ObjectURL
  }

  /**
   * Returns the className -> schema class map of a domain, or undefined when the
   * domain is not loaded.
   */
  getClassMapForDomain(domain: string) {
    const classMaps = this.domainClassMaps()
    return classMaps.get(domain)
  }

  /**
   * Calls the 'fetchFromProvider' cloud function with action 'post' on behalf of
   * the current user.
   *
   * @param payload Request fields; `action` and `userId` are overwritten.
   */
  async postDocument(payload: any) {
    const request = { ...payload, action: 'post', userId: this.getUserId() }
    return await this.firebase.awaitCloudFunction('fetchFromProvider', request, 54000)
  }

  /**
   * Calls the 'fetchFromProvider' cloud function with action 'poll' on behalf of
   * the current user.
   *
   * @param payload Request fields; `action` and `userId` are overwritten.
   */
  async pollJob(payload: any) {
    const request = { ...payload, action: 'poll', userId: this.getUserId() }
    return await this.firebase.awaitCloudFunction('fetchFromProvider', request, 54000)
  }

  /**
   * Streams a user's files, newest first by `createdAt`. Emissions are debounced
   * by 300ms and the latest one is replayed to late subscribers.
   *
   * @param userId Document id of the user whose files are read.
   */
  getUserFiles(userId: string) {
    const files$ = this.firebase.collectionWithConverter$(getUserFilesPath(userId), flowFileConverter)
    return files$.pipe(
      debounceTime(300),
      map(files => files.toSorted((first, second) => first.createdAt > second.createdAt ? -1 : 1) as FlowFile[]),
      shareReplay(1))
  }

  /** Intentionally a no-op in this implementation. */
  async updateJob(file: FlowFile, status: 'error' | 'processing' | 'complete', statusDescription: string): Promise<void> {
    // do nothing - applicable only to API pipeline flow
    return
  }

  /** Intentionally a no-op in this implementation; resolves with undefined. */
  async sendProcessingWarningEmail(payload: any): Promise<any> {
    // do nothing. applicable only to cloud processing
    return undefined
  }

}