Skip to content

Commit

Permalink
Merge pull request #2440 from IDEMSInternational/fix/generator-chained-flows
Browse files Browse the repository at this point in the history

Fix: generator chained flows
  • Loading branch information
chrismclarke authored Oct 1, 2024
2 parents 6dfa713 + 139270f commit c0cdce2
Show file tree
Hide file tree
Showing 16 changed files with 277 additions and 189 deletions.
3 changes: 2 additions & 1 deletion .idems_app/deployments/local/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ cache
tasks
config.json
sheets/
!sheets/demo.xlsx
!sheets/demo.xlsx
reports
2 changes: 0 additions & 2 deletions packages/data-models/flowTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ export namespace FlowTypes {
rows: any[];
/** Datalists populate rows as a hashmap instead to allow easier access to nested structures */
rowsHashmap?: { [id: string]: any };
/** Additional flows generated during parsing, such as data pipe or generator flow outputs */
_generated?: { [flow_type in FlowType]?: { [flow_name: string]: FlowTypeWithData } };
}

/*********************************************************************************************
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { TimeLike } from "fs";
import { JsonFileCache } from "./jsonFile";
import type { IContentsEntry } from "shared";

/**
 * A contents entry extended with an optional in-memory `value` payload,
 * allowing cached data to be held directly on the entry rather than
 * persisted to disk.
 */
interface IContentsEntryWithValue extends IContentsEntry {
  /** Cached data for this entry (in-memory only) */
  value?: any;
}

/**
 * Mock implementation of JsonFileCache that only uses in-memory caching
 * instead of persisting files to disk
 */
export class MockJsonFileCache extends JsonFileCache {
  /** In-memory store of cache entries, keyed by entry name */
  private mockContents: Record<string, IContentsEntryWithValue> = {};

  constructor() {
    // Empty folderPath means the parent performs no disk setup
    super("", 0);
  }

  /**
   * Store `data` in memory under `entryName`, generating a name from the
   * data when none is provided. Falsy data is ignored (returns undefined).
   */
  public add(data: any, entryName?: string, stats?: { mtime: TimeLike }) {
    if (!data) return;
    const name = entryName ?? this.generateCacheEntryName(data);
    const entry = (this.mockContents[name] ??= {} as any);
    entry.value = data;
    return { filePath: name, entryName: name, data };
  }

  /** Drop all in-memory entries */
  public clear() {
    this.mockContents = {};
  }

  /** Remove a single entry; a no-op when the entry does not exist */
  public remove(entryName: string) {
    delete this.mockContents[entryName];
  }

  /** Retrieve a stored entry, cast to the requested type */
  public get<T = any>(entryName: string) {
    const stored = this.mockContents[entryName];
    return stored as T;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ export class JsonFileCache {

constructor(folderPath: string, version: number) {
this.version = version;
this.setup(folderPath);
// HACK - support tests by avoiding setup if folderPath not provided
if (folderPath) {
this.setup(folderPath);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ jest.spyOn(ActiveDeployment, "get").mockReturnValue(mockDeployment as IDeploymen
/** yarn workspace scripts test -t convert.spec.ts */
describe("App Data Converter", () => {
let converter: AppDataConverter;
beforeAll(() => {

beforeEach(() => {
ensureDirSync(paths.outputFolder);
emptyDirSync(paths.outputFolder);
ensureDirSync(paths.cacheFolder);
emptyDirSync(paths.cacheFolder);
});
beforeEach(() => {
converter = new AppDataConverter(paths);
// HACK - Tests failing on CI due to logs persisting between runs
clearLogs(true);
});

it("Uses child caches", async () => {
Expand All @@ -64,7 +65,7 @@ describe("App Data Converter", () => {
it("Populates output to folder by data type", async () => {
await converter.run();
const outputFolders = readdirSync(paths.outputFolder);
expect(outputFolders).toEqual(["data_list", "data_pipe", "template"]);
expect(outputFolders).toEqual(["data_list", "template"]);
});
it("Supports input from multiple source folders", async () => {
const multipleSourceConverter = new AppDataConverter({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class BaseProcessor<T = any, V = any> {
constructor(public context: { namespace: string; paths: IConverterPaths; cacheVersion: number }) {
const { namespace } = context;
this.logger = createChildFileLogger({ source: namespace });
this.setupCache();
// HACK - support tests by avoiding setup if folderPath not provided
if (context.paths) {
this.setupCache();
}
}
/**
* Create a namespaced cache folder and populate a list of all files currently cached,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ describe("FlowParser Processor", () => {
});
it("Outputs flows by type", async () => {
const output = await processor.process(testInputs);
expect(Object.keys(output)).toEqual(["data_list", "template", "data_pipe"]);
// NOTE - data_pipe and generator flows will not populate self but instead generated outputs
expect(Object.keys(output)).toEqual(["data_list", "template"]);
const errorLogs = getLogs("error");
if (errorLogs.length > 0) {
console.log("Unexpected Errors:\n", errorLogs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { IConverterPaths, IFlowHashmapByType, IParsedWorkbookData } from "../../
import { arrayToHashmap, groupJsonByKey, IContentsEntry } from "../../utils";
import BaseProcessor from "../base";

const cacheVersion = 20240502.0;
const cacheVersion = 20240924.4;

export class FlowParserProcessor extends BaseProcessor<FlowTypes.FlowTypeWithData> {
public parsers: { [flowType in FlowTypes.FlowType]: Parsers.DefaultParser } = {
Expand Down Expand Up @@ -64,10 +64,8 @@ export class FlowParserProcessor extends BaseProcessor<FlowTypes.FlowTypeWithDat

public updateProcessedFlowHashmap(flow: FlowTypes.FlowTypeWithData) {
const { flow_name, flow_type, _xlsxPath } = flow;
if (!this.processedFlowHashmap[flow_type]) {
this.processedFlowHashmap[flow_type] = {};
this.processedFlowHashmapWithMeta[flow_type] = {};
}
this.processedFlowHashmap[flow_type] ??= {};
this.processedFlowHashmapWithMeta[flow_type] ??= {};
// NOTE - duplicate flows are identified up during main converter
this.processedFlowHashmap[flow_type][flow_name] = flow.rows;
this.processedFlowHashmapWithMeta[flow_type][flow_name] = flow;
Expand All @@ -93,49 +91,15 @@ export class FlowParserProcessor extends BaseProcessor<FlowTypes.FlowTypeWithDat
return k;
});
}
// populate any generated flows to main list
const flowTypesWithGenerated = this.populateGeneratedFlows(flowHashmapByType);

// convert back from hashmap to hashArrays for final output
const outputData: IParsedWorkbookData = {};
for (const [type, typeHashmap] of Object.entries(flowTypesWithGenerated)) {
for (const [type, typeHashmap] of Object.entries(flowHashmapByType)) {
outputData[type] = Object.values(typeHashmap);
}
return outputData;
}

  /**
   * Iterate over all flows to check for any that populate additional _generated flows
   * that should be extracted to top-level.
   *
   * Mutates and returns the provided hashmap: each flow's `_generated` field is
   * stripped from the flow itself, and every generated flow is re-processed and
   * merged back into the top-level hashmap under its own flow_type/flow_name.
   */
  private populateGeneratedFlows(flowsByType: IFlowHashmapByType) {
    // handle any additional methods that operate on full list of processed flows,
    // e.g. populating additional generated flows
    for (const typeFlows of Object.values(flowsByType)) {
      for (const { _generated, ...flow } of Object.values(typeFlows)) {
        if (_generated) {
          // remove _generated field from flow by re-assigning the rest-spread copy
          flowsByType[flow.flow_type][flow.flow_name] = flow;
          // populate generated to main list, ensure generated flows are also fully processed
          for (const generatedFlows of Object.values(_generated)) {
            for (const generatedFlow of Object.values(generatedFlows)) {
              flowsByType[generatedFlow.flow_type] ??= {};
              // NOTE - a name collision is only logged; the existing flow is
              // still overwritten by the generated one below
              if (flowsByType[generatedFlow.flow_type][generatedFlow.flow_name]) {
                this.logger.error({
                  message: "Generated flow will override existing flow",
                  details: [generatedFlow.flow_type, generatedFlow.flow_name],
                });
              }
              // deep-clone via JSON round-trip before re-processing, so the
              // source flow's _generated data is not mutated
              const processed = this.processInput(JSON.parse(JSON.stringify(generatedFlow)));
              flowsByType[generatedFlow.flow_type][generatedFlow.flow_name] = processed;
            }
          }
        }
      }
    }
    return flowsByType;
  }

public shouldUseCachedEntry(
input: FlowTypes.FlowTypeWithData,
cachedEntry: IContentsEntry
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
import { FlowTypes } from "data-models";
import type { IDataPipeOperation } from "shared";
import { DataPipeParser } from "./data_pipe.parser";
import { FlowParserProcessor } from "../flowParser";
import { MockJsonFileCache } from "../../../cacheStrategy/jsonFile.mock";

const testInputSources = {
const getTestData = () => ({
data_list: { test_data_list: [{ id: 1 }, { id: 2 }, { id: 3 }] },
};
});

/** yarn workspace scripts test -t data_pipe.parser.spec.ts */
describe("data_pipe Parser", () => {
let parser: DataPipeParser;
beforeEach(() => {
// HACK - setup parser with in-memory cache to avoid writing to disk
parser = new DataPipeParser(new FlowParserProcessor(null as any));
parser.flowProcessor.cache = new MockJsonFileCache();
parser.flowProcessor.processedFlowHashmap = getTestData();
});
it("Populates generated data lists", async () => {
const parser = new DataPipeParser({ processedFlowHashmap: testInputSources } as any);
const output = parser.run({
parser.run({
flow_name: "test_pipe_parse",
flow_type: "data_pipe",
rows: [
Expand All @@ -27,24 +36,18 @@ describe("data_pipe Parser", () => {
},
],
}) as FlowTypes.DataPipeFlow;
expect(output._generated.data_list).toEqual({
test_output_1: {
flow_name: "test_output_1",
flow_subtype: "generated",
flow_type: "data_list",
rows: [{ id: 2 }, { id: 3 }],
},
test_output_2: {
flow_name: "test_output_2",
flow_subtype: "generated",
flow_type: "data_list",
rows: [{ id: 3 }],
},

// Ensure all flow processing completed, included deferred processing of generated
await parser.flowProcessor.queue.onIdle();

expect(parser.flowProcessor.processedFlowHashmap.data_list).toEqual({
...getTestData().data_list,
test_output_1: [{ id: 2 }, { id: 3 }],
test_output_2: [{ id: 3 }],
});
});

it("Allows outputs from one pipe to be used in another", () => {
const parser = new DataPipeParser({ processedFlowHashmap: testInputSources } as any);
it("Allows outputs from one pipe to be used in another", async () => {
const ops1: IDataPipeOperation[] = [
{
input_source: "test_data_list",
Expand All @@ -54,10 +57,10 @@ describe("data_pipe Parser", () => {
},
];
parser.run({ flow_name: "flow1", flow_type: "data_pipe", rows: ops1 });
expect(parser.flowProcessor.processedFlowHashmap.data_list.test_output_1).toEqual([
{ id: 2 },
{ id: 3 },
]);
expect(parser["outputHashmap"].flow1).toEqual({ test_output_1: [{ id: 2 }, { id: 3 }] });

// Ensure all flow processing completed, included deferred processing of generated
await parser.flowProcessor.queue.onIdle();
const ops2: IDataPipeOperation[] = [
{
input_source: "test_output_1",
Expand All @@ -66,47 +69,37 @@ describe("data_pipe Parser", () => {
output_target: "test_output_2",
},
];
const output = parser.run({
parser.run({
flow_name: "flow2",
flow_type: "data_pipe",
rows: ops2,
}) as FlowTypes.DataPipeFlow;
expect(output._generated.data_list).toEqual({
test_output_2: {
flow_name: "test_output_2",
flow_subtype: "generated",
flow_type: "data_list",
rows: [{ id: 3 }],
},
expect(parser["outputHashmap"].flow2).toEqual({
test_output_2: [{ id: 3 }],
});
});

it("Defers processing when inputs not available", async () => {
let deferred = [];
const parser = new DataPipeParser({
processedFlowHashmap: {},
deferInputProcess: (input: any, id: string) => {
deferred.push(id);
},
} as any);
it("Defers processing when inputs not available max 5 times", async () => {
parser.flowProcessor.processedFlowHashmap = {};

parser.run({
flow_name: "test_pipe_defer",
flow_type: "data_pipe",
rows: [{ input_source: "missing_list" }],
});
expect(deferred).toEqual(["data_pipe.test_pipe_defer"]);
await parser.flowProcessor.queue.onIdle();
expect(parser.flowProcessor.deferredCounter).toEqual({ "data_pipe.test_pipe_defer": 5 });
});

// QA - https://github.com/IDEMSInternational/parenting-app-ui/issues/2184
// NOTE - test case more explicitly handled by jsEvaluator.spec
it("Supports text with line break characters", async () => {
const parser = new DataPipeParser({
processedFlowHashmap: {
data_list: {
test_data_list: [{ id: 1, text: "normal" }, { id: 2, text: "line\nbreak" }, { id: 3 }],
},
parser.flowProcessor.processedFlowHashmap = {
data_list: {
test_data_list: [{ id: 1, text: "normal" }, { id: 2, text: "line\nbreak" }, { id: 3 }],
},
} as any);
};

const ops: IDataPipeOperation[] = [
{
input_source: "test_data_list",
Expand All @@ -115,14 +108,16 @@ describe("data_pipe Parser", () => {
output_target: "test_output",
},
];
const output = parser.run({
parser.run({
flow_name: "test_line_breaks",
flow_type: "data_pipe",
rows: ops,
});
expect(output._generated.data_list.test_output.rows).toEqual([
{ id: 1, text: "normal" },
{ id: 2, text: "line\nbreak" },
]);
expect(parser["outputHashmap"].test_line_breaks).toEqual({
test_output: [
{ id: 1, text: "normal" },
{ id: 2, text: "line\nbreak" },
],
});
});
});
Loading

0 comments on commit c0cdce2

Please sign in to comment.