diff --git a/src/ahc-hrsn-elt/screening/orch-http-service.ts b/src/ahc-hrsn-elt/screening/orch-http-service.ts index d5a70cad..7d0f4b3c 100644 --- a/src/ahc-hrsn-elt/screening/orch-http-service.ts +++ b/src/ahc-hrsn-elt/screening/orch-http-service.ts @@ -6,13 +6,189 @@ import { Router, } from "https://deno.land/x/oak@v12.6.2/mod.ts"; import JSZip from "npm:jszip"; -import { path } from "./deps.ts"; +import { + colors as c, + path, + SQLa_orch as o, + SQLa_orch_duckdb as ddbo, +} from "./deps.ts"; +import * as mod from "./mod.ts"; + +async function addFolderToZip( + zip: JSZip, + folderPath: string, + zipFolderPath = "", +) { + for await (const entry of Deno.readDir(folderPath)) { + const fullPath = path.join(folderPath, entry.name); + const zipPath = path.join(zipFolderPath, entry.name); + + if (entry.isDirectory) { + await addFolderToZip(zip, fullPath, zipPath); // Recurse into subdirectories + } else if (entry.isFile) { + const fileContent = await Deno.readFile(fullPath); + zip.file(zipPath, fileContent); // Add files to zip + } + } +} + +async function orchestrateFiles( + sessionID: string, + govn: ddbo.DuckDbOrchGovernance, + ip: mod.OrchEngineIngressPaths, + src: + | mod.ScreeningIngressGroup + | o.IngressEntry + | o.IngressEntry[], + workflowPaths: mod.OrchEngineWorkflowPaths, + referenceDataHome: string, + fhirEndpointUrl?: string, +) { + const sessionStart = { + ingressPaths: ip, + initAt: new Date(), + sessionID, + src, + version: mod.ORCHESTRATE_VERSION, + }; + + const sessionLogFsPath = workflowPaths.egress.resolvedPath("session.json"); + Deno.writeTextFile( + sessionLogFsPath, + JSON.stringify(sessionStart, null, " "), + ); + + const args: mod.OrchEngineArgs = { + session: new o.OrchSession(sessionID, govn, mod.ORCHESTRATE_VERSION), + workflowPaths, + walkRootPaths: [ip.ingress.home], + referenceDataHome, + emitDagPuml: async (puml, _previewUrl) => { + await Deno.writeTextFile( + workflowPaths.inProcess.resolvedPath("dag.puml"), + puml, + ); + }, + }; + + await o.orchestrate< + ddbo.DuckDbOrchGovernance, + mod.OrchEngine, + mod.OrchEngineArgs, + ddbo.DuckDbOrchEmitContext + >(mod.OrchEngine.prototype, mod.oeDescr, { + govn, + newInstance: () => + new mod.OrchEngine( + mod.watchFsPatternIngestSourcesSupplier(govn, src), + govn, + args, + ), + }, args); + + const { diagsMdSupplier, resourceDbSupplier } = workflowPaths.egress; + const sessionEnd = { + ...sessionStart, + consumed: [] as { + readonly activity: "delete" | "move"; + readonly fsPath: string; + }[], + stdErrsEncountered: + "✅ No DuckDB orchestration SQL syntax or SQL parsing errors encountered.", + diagsMarkdown: diagsMdSupplier + ? `📄 Diagnostics are in ${diagsMdSupplier()}` + : undefined, + duckDb: + `🦆 ${workflowPaths.inProcess.duckDbFsPathSupplier()} has the raw ingested content and \`orch_session_*\` validation tables.`, + sqliteDB: resourceDbSupplier + ? `📦 ${resourceDbSupplier()} has the aggregated content and \`orch_session_*\` validation tables.` + : undefined, + referenceDataHome: referenceDataHome, + publishFhirURL: fhirEndpointUrl, + publishFhirResult: [] as { + response: string; + fhirJsonStructValid: boolean; + fhirFileName: string; + }[], + }; + + const archiveHome = workflowPaths.ingressArchive?.home; + const consumeIngressed = async (fsPath: string) => { + if (archiveHome) { + await Deno.rename(fsPath, path.join(archiveHome, path.basename(fsPath))); + sessionEnd.consumed.push({ activity: "move", fsPath }); + } else { + await Deno.remove(fsPath); + sessionEnd.consumed.push({ activity: "delete", fsPath }); + } + }; + + if (mod.isScreeningIngressGroup(src)) { + src.entries.forEach(async (entry) => await consumeIngressed(entry.fsPath)); + } else { + if (Array.isArray(src)) { + src.forEach(async (entry) => await consumeIngressed(entry.fsPath)); + } else { + await consumeIngressed(src.fsPath); + } + } + if (fhirEndpointUrl) { + // const fhirFilePath = workflowPaths.egress.resolvedPath("fhir.json"); + // const fhirContent = await Deno.readTextFile(fhirFilePath); + const directoryPath = workflowPaths.egress.resolvedPath("."); + for await (const entry of Deno.readDir(directoryPath)) { + if (entry.isFile && entry.name.startsWith("fhir-")) { + const fhirFilePath = `${directoryPath}/${entry.name}`; + const fhirContent = await Deno.readTextFile(fhirFilePath); + try { + // parse the json just to make sure that a valid json is passed + const _content = JSON.parse(fhirContent); + const response = await fetch(fhirEndpointUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: fhirContent, + }); + const result = await response.json(); + const fhirJson = "fhir-result-" + + entry.name.substring(0, entry.name.lastIndexOf(".")) + ".json"; + const fhirResultFilePath = `${directoryPath}/${fhirJson}`; + await Deno.writeTextFile( + fhirResultFilePath, + JSON.stringify(result, null, " "), + ); + sessionEnd.publishFhirResult.push({ + "response": JSON.stringify(result), + "fhirJsonStructValid": true, + "fhirFileName": entry.name, + }); + } catch (error) { + Deno.writeTextFile(fhirFilePath, error); + sessionEnd.publishFhirResult.push({ + "response": JSON.stringify(error), + "fhirJsonStructValid": false, + "fhirFileName": entry.name, + }); + } + } + } + } -async function orchestrateFiles(inputPath: string, outputPath: string) { - console.log(`Orchestrating files from ${inputPath} to ${outputPath}`); + Deno.writeTextFile( + sessionLogFsPath, + JSON.stringify({ ...sessionEnd, finalizeAt: new Date() }, null, " "), + ); + console.info(c.dim(sessionLogFsPath)); } -const runServer = async (host: string, port: number, shinnyFhirUrl: string) => { +const runServer = async ( + host: string, + port: number, + rootPath: string, + referenceDataHome: string, + shinnyFhirUrl: string, +) => { const app = new Application(); const router = new Router(); @@ -25,61 +201,123 @@ const runServer = async (host: string, port: number, shinnyFhirUrl: string) => { const bodyResult = context.request.body({ type: "form-data" }); const formData = await bodyResult.value.read(); + if (!formData.fields.qe) { + context.response.status = 400; + context.response.body = "No QE found in the request."; + return; + } + //TODO: Loop through individual files uploaded apart from + // zip file and orchestrate them as in SFTP const zipFile = formData.files ? formData.files[0] : ""; + const qe = formData.fields.qe; if (zipFile && zipFile.filename) { - const qeName = formData.fields.qe ? formData.fields.qe : "healthelink"; const zip = new JSZip(); // Read the file from the filename path const zipData = await Deno.readFile(zipFile.filename); const unzippedData = await zip.loadAsync(zipData); - // Container path would /SFTP/healthelink/egress/ - const tempIngressTxPath = path.join( - Deno.cwd(), - "SFTP", - qeName, - "egress", - "ingress-tx", + const govn = new ddbo.DuckDbOrchGovernance( + true, + new ddbo.DuckDbOrchEmitContext(), + ); + const sessionID = await govn.emitCtx.newUUID(false); + const basePath = `${rootPath}/${qe}`; + const ingressPath = `${basePath}/ingress`; + await Deno.mkdir(ingressPath, { recursive: true }); + const egressPath = `${rootPath}/${qe}/egress`; + const ingressTxPath = `${egressPath}/${sessionID}/.ingress-tx`; + const egressSessionPath = `${egressPath}/${sessionID}`; + const workflowPaths = mod.orchEngineWorkflowPaths( + basePath, + sessionID, ); - //Make ingress-tx directory - await Deno.mkdir(tempIngressTxPath, { recursive: true }); + const ingressTxPaths = mod.orchEngineIngressPaths( + workflowPaths.ingressTx.home, + ); + await workflowPaths.initializePaths?.(); + console.log(ingressTxPath); await Promise.all( Object.keys(unzippedData.files).map(async (fileName) => { const file = unzippedData.files[fileName]; if (!file.dir) { const content = await file.async("uint8array"); - const filePath = path.join(tempIngressTxPath, fileName); + const filePath = path.join(ingressTxPath, fileName); await Deno.writeFile(filePath, content); } }), ); - const tempEgressPath = path.join( - Deno.cwd(), - "SFTP", - qeName, - "egress", + const screeningGroups = new mod.ScreeningIngressGroups( + async (group) => { + await orchestrateFiles( + sessionID, + govn, + ingressTxPaths, + group, + workflowPaths, + referenceDataHome, + ); + }, ); - //Call the orchestration function - await orchestrateFiles(tempIngressTxPath, tempEgressPath); - - const resultZip = new JSZip(); - for await (const entry of Deno.readDir(tempIngressTxPath)) { - const entryPath = path.join(tempIngressTxPath, entry.name); - const data = await Deno.readFile(entryPath); - resultZip.file(entry.name, data); - } - const zipBuffer = await resultZip.generateAsync({ type: "uint8array" }); - // Specify the final ZIP file path - const finalZipPath = path.join(tempEgressPath, zipFile.originalName); - - // Save the ZIP file to the path - await Deno.writeFile(finalZipPath, zipBuffer); - context.response.body = "ZIP file processed and saved successfully."; - context.response.type = "application/zip"; - // Use below code to emit the zip file - // context.response.body = zipBuffer; - // context.response.type = "application/zip"; + const watchPaths: o.WatchFsPath[] = [{ + pathID: "ingress", + rootPath: ingressTxPaths.ingress.home, + // note: onIngress we just return promises (not awaited) so that we can + // allow each async workflow to work independently (better performance) + onIngress: (entry) => { + const group = screeningGroups.potential(entry); + try { + orchestrateFiles( + sessionID, + govn, + ingressTxPaths, + group ?? entry, + workflowPaths, + referenceDataHome, + ); + } catch (err) { + // TODO: store the error in a proper log + console.dir(entry); + console.error(err); + } + }, + }]; + + console.log(`Processing files in ${ingressTxPaths.ingress.home}`); + + await o.ingestWatchedFs({ + drain: async (entries) => { + if (entries.length) { + await orchestrateFiles( + sessionID, + govn, + ingressTxPaths, + entries, + workflowPaths, + referenceDataHome, + ); + const zip = new JSZip(); + await addFolderToZip(zip, egressSessionPath); + const zipContent = await zip.generateAsync({ type: "uint8array" }); + // Specify the final ZIP file path + const finalZipPath = path.join( + egressPath, + sessionID, + sessionID, + ); + await Deno.writeFile(finalZipPath, zipContent); + console.log( + `Completed processing files in ${ingressTxPaths.ingress.home}`, + ); + // context.response.body = "ZIP file processed and saved successfully."; + // Use below code to emit the zip file + context.response.body = zipContent; + context.response.type = "application/zip"; + } + }, + watch: false, + watchPaths, + }); } else { context.response.status = 400; context.response.body = "No ZIP file found in the request"; @@ -109,10 +347,30 @@ await new Command() .option("-H, --host ", "Host for the HTTP server.", { default: "127.0.0.1", }) + .option("--session-artifacts-home ", "Root path.", { default: "/HTTP" }) + .option("--reference-data-home ", "Reference data path.", { + default: path.join(Deno.cwd(), "src/ahc-hrsn-elt/reference-data"), + }) .option("--shinny-fhir-url ", "SHIN-NY FHIR endpoint URL.", { default: "https://default-fhir-url.com", }) - .action(({ host, port, shinnyFhirUrl }) => { - runServer(host, port, shinnyFhirUrl); - }) + .action( + ( + { + host, + port, + sessionArtifactsHome, + referenceDataHome, + shinnyFhirUrl, + }, + ) => { + runServer( + host, + port, + sessionArtifactsHome, + referenceDataHome, + shinnyFhirUrl, + ); + }, + ) .parse(Deno.args);