Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: generator chained flows #2440

Merged
merged 12 commits into from
Oct 1, 2024
3 changes: 2 additions & 1 deletion .idems_app/deployments/local/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ cache
tasks
config.json
sheets/
!sheets/demo.xlsx
!sheets/demo.xlsx
reports
2 changes: 0 additions & 2 deletions packages/data-models/flowTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ export namespace FlowTypes {
rows: any[];
/** Datalists populate rows as a hashmap instead to allow easier access to nested structures */
rowsHashmap?: { [id: string]: any };
/** Additional flows generated during parsing, such as data pipe or generator flow outputs */
_generated?: { [flow_type in FlowType]?: { [flow_name: string]: FlowTypeWithData } };
}

/*********************************************************************************************
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { TimeLike } from "fs";
import { JsonFileCache } from "./jsonFile";
import type { IContentsEntry } from "shared";

/**
 * A cache contents entry extended with an optional in-memory `value` payload.
 * Used by the mock cache below to hold cached data directly instead of on disk.
 */
interface IContentsEntryWithValue extends IContentsEntry {
  // cached data stored in memory; absent until `add` is called for the entry
  value?: any;
}

/**
 * In-memory substitute for JsonFileCache, for use in tests where persisting
 * cache entries to disk is not wanted.
 *
 * Entries are tracked in a plain object keyed by entry name; filesystem setup
 * is bypassed by passing an empty folderPath to the parent constructor.
 */
export class MockJsonFileCache extends JsonFileCache {
  private mockContents: Record<string, IContentsEntryWithValue> = {};

  constructor() {
    // empty folderPath makes the parent skip disk setup
    super("", 0);
  }

  /**
   * Store data in the in-memory cache.
   * @param data value to cache; falsy values are ignored (returns undefined)
   * @param entryName optional explicit key; when omitted (or empty) a name is
   *   generated from the data via the parent's generateCacheEntryName
   * @param stats accepted for signature compatibility; unused by the mock
   * @returns summary of the stored entry, or undefined when data was falsy
   */
  public add(data: any, entryName?: string, stats?: { mtime: TimeLike }) {
    if (!data) return;
    const name = entryName || this.generateCacheEntryName(data);
    const entry = this.mockContents[name] ?? ({} as any);
    entry.value = data;
    this.mockContents[name] = entry;
    return { filePath: name, entryName: name, data };
  }

  /** Reset the cache, discarding all stored entries */
  public clear() {
    this.mockContents = {};
  }

  /** Remove a single entry by name (no-op when the entry does not exist) */
  public remove(entryName: string) {
    if (Object.prototype.hasOwnProperty.call(this.mockContents, entryName)) {
      delete this.mockContents[entryName];
    }
  }

  /**
   * Retrieve a cached entry by name.
   * NOTE(review): this returns the stored entry wrapper (data lives under
   * `.value`) cast to T, not the raw data passed to `add` — confirm this
   * matches the parent cache's `get` contract.
   */
  public get<T = any>(entryName: string) {
    return this.mockContents[entryName] as T;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ export class JsonFileCache {

constructor(folderPath: string, version: number) {
this.version = version;
this.setup(folderPath);
// HACK - support tests by avoiding setup if folderPath not provided
if (folderPath) {
this.setup(folderPath);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ jest.spyOn(ActiveDeployment, "get").mockReturnValue(mockDeployment as IDeploymen
/** yarn workspace scripts test -t convert.spec.ts */
describe("App Data Converter", () => {
let converter: AppDataConverter;
beforeAll(() => {

beforeEach(() => {
ensureDirSync(paths.outputFolder);
emptyDirSync(paths.outputFolder);
ensureDirSync(paths.cacheFolder);
emptyDirSync(paths.cacheFolder);
});
beforeEach(() => {
converter = new AppDataConverter(paths);
// HACK - Tests failing on CI due to logs persisting between runs
clearLogs(true);
});

it("Uses child caches", async () => {
Expand All @@ -64,7 +65,7 @@ describe("App Data Converter", () => {
it("Populates output to folder by data type", async () => {
await converter.run();
const outputFolders = readdirSync(paths.outputFolder);
expect(outputFolders).toEqual(["data_list", "data_pipe", "template"]);
expect(outputFolders).toEqual(["data_list", "template"]);
});
it("Supports input from multiple source folders", async () => {
const multipleSourceConverter = new AppDataConverter({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class BaseProcessor<T = any, V = any> {
constructor(public context: { namespace: string; paths: IConverterPaths; cacheVersion: number }) {
const { namespace } = context;
this.logger = createChildFileLogger({ source: namespace });
this.setupCache();
// HACK - support tests by avoiding setup if folderPath not provided
if (context.paths) {
this.setupCache();
}
}
/**
* Create a namespaced cache folder and populate a list of all files currently cached,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ describe("FlowParser Processor", () => {
});
it("Outputs flows by type", async () => {
const output = await processor.process(testInputs);
expect(Object.keys(output)).toEqual(["data_list", "template", "data_pipe"]);
// NOTE - data_pipe and generator flows will not populate self but instead generated outputs
expect(Object.keys(output)).toEqual(["data_list", "template"]);
const errorLogs = getLogs("error");
if (errorLogs.length > 0) {
console.log("Unexpected Errors:\n", errorLogs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { IConverterPaths, IFlowHashmapByType, IParsedWorkbookData } from "../../
import { arrayToHashmap, groupJsonByKey, IContentsEntry } from "../../utils";
import BaseProcessor from "../base";

const cacheVersion = 20240502.0;
const cacheVersion = 20240924.4;

export class FlowParserProcessor extends BaseProcessor<FlowTypes.FlowTypeWithData> {
public parsers: { [flowType in FlowTypes.FlowType]: Parsers.DefaultParser } = {
Expand Down Expand Up @@ -64,10 +64,8 @@ export class FlowParserProcessor extends BaseProcessor<FlowTypes.FlowTypeWithDat

public updateProcessedFlowHashmap(flow: FlowTypes.FlowTypeWithData) {
const { flow_name, flow_type, _xlsxPath } = flow;
if (!this.processedFlowHashmap[flow_type]) {
this.processedFlowHashmap[flow_type] = {};
this.processedFlowHashmapWithMeta[flow_type] = {};
}
this.processedFlowHashmap[flow_type] ??= {};
this.processedFlowHashmapWithMeta[flow_type] ??= {};
// NOTE - duplicate flows are identified up during main converter
this.processedFlowHashmap[flow_type][flow_name] = flow.rows;
this.processedFlowHashmapWithMeta[flow_type][flow_name] = flow;
Expand All @@ -93,49 +91,15 @@ export class FlowParserProcessor extends BaseProcessor<FlowTypes.FlowTypeWithDat
return k;
});
}
// populate any generated flows to main list
const flowTypesWithGenerated = this.populateGeneratedFlows(flowHashmapByType);

// convert back from hashmap to hashArrays for final output
const outputData: IParsedWorkbookData = {};
for (const [type, typeHashmap] of Object.entries(flowTypesWithGenerated)) {
for (const [type, typeHashmap] of Object.entries(flowHashmapByType)) {
outputData[type] = Object.values(typeHashmap);
}
return outputData;
}

/**
* Iterate over all flows to check for any that populate additional _generated flows
* that should be extracted to top-level
*/
private populateGeneratedFlows(flowsByType: IFlowHashmapByType) {
  // handle any additional methods that operate on full list of processed flows,
  // e.g. populating additional generated flows
  for (const typeFlows of Object.values(flowsByType)) {
    // destructuring strips the _generated field from the flow copy
    for (const { _generated, ...flow } of Object.values(typeFlows)) {
      if (_generated) {
        // remove _generated field from flow
        flowsByType[flow.flow_type][flow.flow_name] = flow;
        // populate generated to main list, ensure generated flows are also fully processed
        for (const generatedFlows of Object.values(_generated)) {
          for (const generatedFlow of Object.values(generatedFlows)) {
            flowsByType[generatedFlow.flow_type] ??= {};
            // log (but still overwrite) when a generated flow clashes with an existing one
            if (flowsByType[generatedFlow.flow_type][generatedFlow.flow_name]) {
              this.logger.error({
                message: "Generated flow will override existing flow",
                details: [generatedFlow.flow_type, generatedFlow.flow_name],
              });
            }
            // deep-clone via JSON round-trip before reprocessing so the stored flow is not mutated
            const processed = this.processInput(JSON.parse(JSON.stringify(generatedFlow)));
            flowsByType[generatedFlow.flow_type][generatedFlow.flow_name] = processed;
          }
        }
      }
    }
  }
  return flowsByType;
}

public shouldUseCachedEntry(
input: FlowTypes.FlowTypeWithData,
cachedEntry: IContentsEntry
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
import { FlowTypes } from "data-models";
import type { IDataPipeOperation } from "shared";
import { DataPipeParser } from "./data_pipe.parser";
import { FlowParserProcessor } from "../flowParser";
import { MockJsonFileCache } from "../../../cacheStrategy/jsonFile.mock";

const testInputSources = {
const getTestData = () => ({
data_list: { test_data_list: [{ id: 1 }, { id: 2 }, { id: 3 }] },
};
});

/** yarn workspace scripts test -t data_pipe.parser.spec.ts */
describe("data_pipe Parser", () => {
let parser: DataPipeParser;
beforeEach(() => {
// HACK - setup parser with in-memory cache to avoid writing to disk
parser = new DataPipeParser(new FlowParserProcessor(null as any));
parser.flowProcessor.cache = new MockJsonFileCache();
parser.flowProcessor.processedFlowHashmap = getTestData();
});
it("Populates generated data lists", async () => {
const parser = new DataPipeParser({ processedFlowHashmap: testInputSources } as any);
const output = parser.run({
parser.run({
flow_name: "test_pipe_parse",
flow_type: "data_pipe",
rows: [
Expand All @@ -27,24 +36,18 @@ describe("data_pipe Parser", () => {
},
],
}) as FlowTypes.DataPipeFlow;
expect(output._generated.data_list).toEqual({
test_output_1: {
flow_name: "test_output_1",
flow_subtype: "generated",
flow_type: "data_list",
rows: [{ id: 2 }, { id: 3 }],
},
test_output_2: {
flow_name: "test_output_2",
flow_subtype: "generated",
flow_type: "data_list",
rows: [{ id: 3 }],
},

// Ensure all flow processing completed, included deferred processing of generated
await parser.flowProcessor.queue.onIdle();

expect(parser.flowProcessor.processedFlowHashmap.data_list).toEqual({
...getTestData().data_list,
test_output_1: [{ id: 2 }, { id: 3 }],
test_output_2: [{ id: 3 }],
});
});

it("Allows outputs from one pipe to be used in another", () => {
const parser = new DataPipeParser({ processedFlowHashmap: testInputSources } as any);
it("Allows outputs from one pipe to be used in another", async () => {
const ops1: IDataPipeOperation[] = [
{
input_source: "test_data_list",
Expand All @@ -54,10 +57,10 @@ describe("data_pipe Parser", () => {
},
];
parser.run({ flow_name: "flow1", flow_type: "data_pipe", rows: ops1 });
expect(parser.flowProcessor.processedFlowHashmap.data_list.test_output_1).toEqual([
{ id: 2 },
{ id: 3 },
]);
expect(parser["outputHashmap"].flow1).toEqual({ test_output_1: [{ id: 2 }, { id: 3 }] });

// Ensure all flow processing completed, included deferred processing of generated
await parser.flowProcessor.queue.onIdle();
const ops2: IDataPipeOperation[] = [
{
input_source: "test_output_1",
Expand All @@ -66,47 +69,37 @@ describe("data_pipe Parser", () => {
output_target: "test_output_2",
},
];
const output = parser.run({
parser.run({
flow_name: "flow2",
flow_type: "data_pipe",
rows: ops2,
}) as FlowTypes.DataPipeFlow;
expect(output._generated.data_list).toEqual({
test_output_2: {
flow_name: "test_output_2",
flow_subtype: "generated",
flow_type: "data_list",
rows: [{ id: 3 }],
},
expect(parser["outputHashmap"].flow2).toEqual({
test_output_2: [{ id: 3 }],
});
});

it("Defers processing when inputs not available", async () => {
let deferred = [];
const parser = new DataPipeParser({
processedFlowHashmap: {},
deferInputProcess: (input: any, id: string) => {
deferred.push(id);
},
} as any);
it("Defers processing when inputs not available max 5 times", async () => {
parser.flowProcessor.processedFlowHashmap = {};

parser.run({
flow_name: "test_pipe_defer",
flow_type: "data_pipe",
rows: [{ input_source: "missing_list" }],
});
expect(deferred).toEqual(["data_pipe.test_pipe_defer"]);
await parser.flowProcessor.queue.onIdle();
expect(parser.flowProcessor.deferredCounter).toEqual({ "data_pipe.test_pipe_defer": 5 });
});

// QA - https://github.com/IDEMSInternational/parenting-app-ui/issues/2184
// NOTE - test case more explicitly handled by jsEvaluator.spec
it("Supports text with line break characters", async () => {
const parser = new DataPipeParser({
processedFlowHashmap: {
data_list: {
test_data_list: [{ id: 1, text: "normal" }, { id: 2, text: "line\nbreak" }, { id: 3 }],
},
parser.flowProcessor.processedFlowHashmap = {
data_list: {
test_data_list: [{ id: 1, text: "normal" }, { id: 2, text: "line\nbreak" }, { id: 3 }],
},
} as any);
};

const ops: IDataPipeOperation[] = [
{
input_source: "test_data_list",
Expand All @@ -115,14 +108,16 @@ describe("data_pipe Parser", () => {
output_target: "test_output",
},
];
const output = parser.run({
parser.run({
flow_name: "test_line_breaks",
flow_type: "data_pipe",
rows: ops,
});
expect(output._generated.data_list.test_output.rows).toEqual([
{ id: 1, text: "normal" },
{ id: 2, text: "line\nbreak" },
]);
expect(parser["outputHashmap"].test_line_breaks).toEqual({
test_output: [
{ id: 1, text: "normal" },
{ id: 2, text: "line\nbreak" },
],
});
});
});
Loading
Loading