-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #126 from alan-francis/main
Introduce orchestration HTTP service - work in progress
- Loading branch information
Showing
1 changed file
with
301 additions
and
43 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,13 +6,189 @@ import { | |
Router, | ||
} from "https://deno.land/x/[email protected]/mod.ts"; | ||
import JSZip from "npm:jszip"; | ||
import { path } from "./deps.ts"; | ||
import { | ||
colors as c, | ||
path, | ||
SQLa_orch as o, | ||
SQLa_orch_duckdb as ddbo, | ||
} from "./deps.ts"; | ||
import * as mod from "./mod.ts"; | ||
|
||
async function addFolderToZip( | ||
zip: JSZip, | ||
folderPath: string, | ||
zipFolderPath = "", | ||
) { | ||
for await (const entry of Deno.readDir(folderPath)) { | ||
const fullPath = path.join(folderPath, entry.name); | ||
const zipPath = path.join(zipFolderPath, entry.name); | ||
|
||
if (entry.isDirectory) { | ||
await addFolderToZip(zip, fullPath, zipPath); // Recurse into subdirectories | ||
} else if (entry.isFile) { | ||
const fileContent = await Deno.readFile(fullPath); | ||
zip.file(zipPath, fileContent); // Add files to zip | ||
} | ||
} | ||
} | ||
|
||
async function orchestrateFiles( | ||
sessionID: string, | ||
govn: ddbo.DuckDbOrchGovernance, | ||
ip: mod.OrchEngineIngressPaths, | ||
src: | ||
| mod.ScreeningIngressGroup | ||
| o.IngressEntry<string, string> | ||
| o.IngressEntry<string, string>[], | ||
workflowPaths: mod.OrchEngineWorkflowPaths, | ||
referenceDataHome: string, | ||
fhirEndpointUrl?: string, | ||
) { | ||
const sessionStart = { | ||
ingressPaths: ip, | ||
initAt: new Date(), | ||
sessionID, | ||
src, | ||
version: mod.ORCHESTRATE_VERSION, | ||
}; | ||
|
||
const sessionLogFsPath = workflowPaths.egress.resolvedPath("session.json"); | ||
Deno.writeTextFile( | ||
sessionLogFsPath, | ||
JSON.stringify(sessionStart, null, " "), | ||
); | ||
|
||
const args: mod.OrchEngineArgs = { | ||
session: new o.OrchSession(sessionID, govn, mod.ORCHESTRATE_VERSION), | ||
workflowPaths, | ||
walkRootPaths: [ip.ingress.home], | ||
referenceDataHome, | ||
emitDagPuml: async (puml, _previewUrl) => { | ||
await Deno.writeTextFile( | ||
workflowPaths.inProcess.resolvedPath("dag.puml"), | ||
puml, | ||
); | ||
}, | ||
}; | ||
|
||
await o.orchestrate< | ||
ddbo.DuckDbOrchGovernance, | ||
mod.OrchEngine, | ||
mod.OrchEngineArgs, | ||
ddbo.DuckDbOrchEmitContext | ||
>(mod.OrchEngine.prototype, mod.oeDescr, { | ||
govn, | ||
newInstance: () => | ||
new mod.OrchEngine( | ||
mod.watchFsPatternIngestSourcesSupplier(govn, src), | ||
govn, | ||
args, | ||
), | ||
}, args); | ||
|
||
const { diagsMdSupplier, resourceDbSupplier } = workflowPaths.egress; | ||
const sessionEnd = { | ||
...sessionStart, | ||
consumed: [] as { | ||
readonly activity: "delete" | "move"; | ||
readonly fsPath: string; | ||
}[], | ||
stdErrsEncountered: | ||
"✅ No DuckDB orchestration SQL syntax or SQL parsing errors encountered.", | ||
diagsMarkdown: diagsMdSupplier | ||
? `📄 Diagnostics are in ${diagsMdSupplier()}` | ||
: undefined, | ||
duckDb: | ||
`🦆 ${workflowPaths.inProcess.duckDbFsPathSupplier()} has the raw ingested content and \`orch_session_*\` validation tables.`, | ||
sqliteDB: resourceDbSupplier | ||
? `📦 ${resourceDbSupplier()} has the aggregated content and \`orch_session_*\` validation tables.` | ||
: undefined, | ||
referenceDataHome: referenceDataHome, | ||
publishFhirURL: fhirEndpointUrl, | ||
publishFhirResult: [] as { | ||
response: string; | ||
fhirJsonStructValid: boolean; | ||
fhirFileName: string; | ||
}[], | ||
}; | ||
|
||
const archiveHome = workflowPaths.ingressArchive?.home; | ||
const consumeIngressed = async (fsPath: string) => { | ||
if (archiveHome) { | ||
await Deno.rename(fsPath, path.join(archiveHome, path.basename(fsPath))); | ||
sessionEnd.consumed.push({ activity: "move", fsPath }); | ||
} else { | ||
await Deno.remove(fsPath); | ||
sessionEnd.consumed.push({ activity: "delete", fsPath }); | ||
} | ||
}; | ||
|
||
if (mod.isScreeningIngressGroup(src)) { | ||
src.entries.forEach(async (entry) => await consumeIngressed(entry.fsPath)); | ||
} else { | ||
if (Array.isArray(src)) { | ||
src.forEach(async (entry) => await consumeIngressed(entry.fsPath)); | ||
} else { | ||
await consumeIngressed(src.fsPath); | ||
} | ||
} | ||
if (fhirEndpointUrl) { | ||
// const fhirFilePath = workflowPaths.egress.resolvedPath("fhir.json"); | ||
// const fhirContent = await Deno.readTextFile(fhirFilePath); | ||
const directoryPath = workflowPaths.egress.resolvedPath("."); | ||
for await (const entry of Deno.readDir(directoryPath)) { | ||
if (entry.isFile && entry.name.startsWith("fhir-")) { | ||
const fhirFilePath = `${directoryPath}/${entry.name}`; | ||
const fhirContent = await Deno.readTextFile(fhirFilePath); | ||
try { | ||
// parse the json just to make sure that a valid json is passed | ||
const _content = JSON.parse(fhirContent); | ||
const response = await fetch(fhirEndpointUrl, { | ||
method: "POST", | ||
headers: { | ||
"Content-Type": "application/json", | ||
}, | ||
body: fhirContent, | ||
}); | ||
const result = await response.json(); | ||
const fhirJson = "fhir-result-" + | ||
entry.name.substring(0, entry.name.lastIndexOf(".")) + ".json"; | ||
const fhirResultFilePath = `${directoryPath}/${fhirJson}`; | ||
await Deno.writeTextFile( | ||
fhirResultFilePath, | ||
JSON.stringify(result, null, " "), | ||
); | ||
sessionEnd.publishFhirResult.push({ | ||
"response": JSON.stringify(result), | ||
"fhirJsonStructValid": true, | ||
"fhirFileName": entry.name, | ||
}); | ||
} catch (error) { | ||
Deno.writeTextFile(fhirFilePath, error); | ||
sessionEnd.publishFhirResult.push({ | ||
"response": JSON.stringify(error), | ||
"fhirJsonStructValid": false, | ||
"fhirFileName": entry.name, | ||
}); | ||
} | ||
} | ||
} | ||
} | ||
|
||
async function orchestrateFiles(inputPath: string, outputPath: string) { | ||
console.log(`Orchestrating files from ${inputPath} to ${outputPath}`); | ||
Deno.writeTextFile( | ||
sessionLogFsPath, | ||
JSON.stringify({ ...sessionEnd, finalizeAt: new Date() }, null, " "), | ||
); | ||
console.info(c.dim(sessionLogFsPath)); | ||
} | ||
|
||
const runServer = async (host: string, port: number, shinnyFhirUrl: string) => { | ||
const runServer = async ( | ||
host: string, | ||
port: number, | ||
rootPath: string, | ||
referenceDataHome: string, | ||
shinnyFhirUrl: string, | ||
) => { | ||
const app = new Application(); | ||
const router = new Router(); | ||
|
||
|
@@ -25,61 +201,123 @@ const runServer = async (host: string, port: number, shinnyFhirUrl: string) => { | |
|
||
const bodyResult = context.request.body({ type: "form-data" }); | ||
const formData = await bodyResult.value.read(); | ||
if (!formData.fields.qe) { | ||
context.response.status = 400; | ||
context.response.body = "No QE found in the request."; | ||
return; | ||
} | ||
//TODO: Loop through individual files uploaded apart from | ||
// zip file and orchestrate them as in SFTP | ||
const zipFile = formData.files ? formData.files[0] : ""; | ||
const qe = formData.fields.qe; | ||
if (zipFile && zipFile.filename) { | ||
const qeName = formData.fields.qe ? formData.fields.qe : "healthelink"; | ||
const zip = new JSZip(); | ||
// Read the file from the filename path | ||
const zipData = await Deno.readFile(zipFile.filename); | ||
const unzippedData = await zip.loadAsync(zipData); | ||
// Container path would /SFTP/healthelink/egress/ | ||
const tempIngressTxPath = path.join( | ||
Deno.cwd(), | ||
"SFTP", | ||
qeName, | ||
"egress", | ||
"ingress-tx", | ||
const govn = new ddbo.DuckDbOrchGovernance( | ||
true, | ||
new ddbo.DuckDbOrchEmitContext(), | ||
); | ||
const sessionID = await govn.emitCtx.newUUID(false); | ||
const basePath = `${rootPath}/${qe}`; | ||
const ingressPath = `${basePath}/ingress`; | ||
await Deno.mkdir(ingressPath, { recursive: true }); | ||
const egressPath = `${rootPath}/${qe}/egress`; | ||
const ingressTxPath = `${egressPath}/${sessionID}/.ingress-tx`; | ||
const egressSessionPath = `${egressPath}/${sessionID}`; | ||
const workflowPaths = mod.orchEngineWorkflowPaths( | ||
basePath, | ||
sessionID, | ||
); | ||
//Make ingress-tx directory | ||
await Deno.mkdir(tempIngressTxPath, { recursive: true }); | ||
|
||
const ingressTxPaths = mod.orchEngineIngressPaths( | ||
workflowPaths.ingressTx.home, | ||
); | ||
await workflowPaths.initializePaths?.(); | ||
console.log(ingressTxPath); | ||
await Promise.all( | ||
Object.keys(unzippedData.files).map(async (fileName) => { | ||
const file = unzippedData.files[fileName]; | ||
if (!file.dir) { | ||
const content = await file.async("uint8array"); | ||
const filePath = path.join(tempIngressTxPath, fileName); | ||
const filePath = path.join(ingressTxPath, fileName); | ||
await Deno.writeFile(filePath, content); | ||
} | ||
}), | ||
); | ||
|
||
const tempEgressPath = path.join( | ||
Deno.cwd(), | ||
"SFTP", | ||
qeName, | ||
"egress", | ||
const screeningGroups = new mod.ScreeningIngressGroups( | ||
async (group) => { | ||
await orchestrateFiles( | ||
sessionID, | ||
govn, | ||
ingressTxPaths, | ||
group, | ||
workflowPaths, | ||
referenceDataHome, | ||
); | ||
}, | ||
); | ||
//Call the orchestration function | ||
await orchestrateFiles(tempIngressTxPath, tempEgressPath); | ||
|
||
const resultZip = new JSZip(); | ||
for await (const entry of Deno.readDir(tempIngressTxPath)) { | ||
const entryPath = path.join(tempIngressTxPath, entry.name); | ||
const data = await Deno.readFile(entryPath); | ||
resultZip.file(entry.name, data); | ||
} | ||
const zipBuffer = await resultZip.generateAsync({ type: "uint8array" }); | ||
// Specify the final ZIP file path | ||
const finalZipPath = path.join(tempEgressPath, zipFile.originalName); | ||
|
||
// Save the ZIP file to the path | ||
await Deno.writeFile(finalZipPath, zipBuffer); | ||
context.response.body = "ZIP file processed and saved successfully."; | ||
context.response.type = "application/zip"; | ||
// Use below code to emit the zip file | ||
// context.response.body = zipBuffer; | ||
// context.response.type = "application/zip"; | ||
const watchPaths: o.WatchFsPath<string, string>[] = [{ | ||
pathID: "ingress", | ||
rootPath: ingressTxPaths.ingress.home, | ||
// note: onIngress we just return promises (not awaited) so that we can | ||
// allow each async workflow to work independently (better performance) | ||
onIngress: (entry) => { | ||
const group = screeningGroups.potential(entry); | ||
try { | ||
orchestrateFiles( | ||
sessionID, | ||
govn, | ||
ingressTxPaths, | ||
group ?? entry, | ||
workflowPaths, | ||
referenceDataHome, | ||
); | ||
} catch (err) { | ||
// TODO: store the error in a proper log | ||
console.dir(entry); | ||
console.error(err); | ||
} | ||
}, | ||
}]; | ||
|
||
console.log(`Processing files in ${ingressTxPaths.ingress.home}`); | ||
|
||
await o.ingestWatchedFs({ | ||
drain: async (entries) => { | ||
if (entries.length) { | ||
await orchestrateFiles( | ||
sessionID, | ||
govn, | ||
ingressTxPaths, | ||
entries, | ||
workflowPaths, | ||
referenceDataHome, | ||
); | ||
const zip = new JSZip(); | ||
await addFolderToZip(zip, egressSessionPath); | ||
const zipContent = await zip.generateAsync({ type: "uint8array" }); | ||
// Specify the final ZIP file path | ||
const finalZipPath = path.join( | ||
egressPath, | ||
sessionID, | ||
sessionID, | ||
); | ||
await Deno.writeFile(finalZipPath, zipContent); | ||
console.log( | ||
`Completed processing files in ${ingressTxPaths.ingress.home}`, | ||
); | ||
// context.response.body = "ZIP file processed and saved successfully."; | ||
// Use below code to emit the zip file | ||
context.response.body = zipContent; | ||
context.response.type = "application/zip"; | ||
} | ||
}, | ||
watch: false, | ||
watchPaths, | ||
}); | ||
} else { | ||
context.response.status = 400; | ||
context.response.body = "No ZIP file found in the request"; | ||
|
@@ -109,10 +347,30 @@ await new Command() | |
.option("-H, --host <host:string>", "Host for the HTTP server.", { | ||
default: "127.0.0.1", | ||
}) | ||
.option("--session-artifacts-home <path>", "Root path.", { default: "/HTTP" }) | ||
.option("--reference-data-home <path>", "Reference data path.", { | ||
default: path.join(Deno.cwd(), "src/ahc-hrsn-elt/reference-data"), | ||
}) | ||
.option("--shinny-fhir-url <url:string>", "SHIN-NY FHIR endpoint URL.", { | ||
default: "https://default-fhir-url.com", | ||
}) | ||
.action(({ host, port, shinnyFhirUrl }) => { | ||
runServer(host, port, shinnyFhirUrl); | ||
}) | ||
.action( | ||
( | ||
{ | ||
host, | ||
port, | ||
sessionArtifactsHome, | ||
referenceDataHome, | ||
shinnyFhirUrl, | ||
}, | ||
) => { | ||
runServer( | ||
host, | ||
port, | ||
sessionArtifactsHome, | ||
referenceDataHome, | ||
shinnyFhirUrl, | ||
); | ||
}, | ||
) | ||
.parse(Deno.args); |