Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(parser): add output references to for gen flows #2452

Merged
merged 5 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/data-models/flowTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ export namespace FlowTypes {
export interface DataPipeFlow extends FlowTypeWithData {
flow_type: "data_pipe";
rows: IDataPipeOperation[];
/** Generated list of output flows created by generator */
_output_flows?: FlowTypeBase[];
}
export interface GeneratorFlow extends FlowTypeWithData {
flow_type: "generator";
Expand All @@ -91,6 +93,8 @@ export namespace FlowTypes {
output_flow_subtype?: string;
output_flow_type?: FlowType;
};
/** Generated list of output flows created by generator */
_output_flows?: FlowTypeBase[];
}
export interface Translation_strings {
[sourceText: string]: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ describe("App Data Converter", () => {
it("Populates output to folder by data type", async () => {
await converter.run();
const outputFolders = readdirSync(paths.outputFolder);
expect(outputFolders).toEqual(["data_list", "template"]);
expect(outputFolders).toEqual(["data_list", "data_pipe", "template"]);
});
it("Supports input from multiple source folders", async () => {
const multipleSourceConverter = new AppDataConverter({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ describe("FlowParser Processor", () => {
it("Outputs flows by type", async () => {
const output = await processor.process(testInputs);
// NOTE - data_pipe and generator flows will not populate self but instead generated outputs
expect(Object.keys(output)).toEqual(["data_list", "template"]);
expect(Object.keys(output)).toEqual(["data_list", "template", "data_pipe"]);
const errorLogs = getLogs("error");
if (errorLogs.length > 0) {
console.log("Unexpected Errors:\n", errorLogs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { IConverterPaths, IFlowHashmapByType, IParsedWorkbookData } from "../../
import { arrayToHashmap, groupJsonByKey, IContentsEntry } from "../../utils";
import BaseProcessor from "../base";

const cacheVersion = 20240924.4;
const cacheVersion = 20241001.2;

export class FlowParserProcessor extends BaseProcessor<FlowTypes.FlowTypeWithData> {
public parsers: { [flowType in FlowTypes.FlowType]: Parsers.DefaultParser } = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe("data_pipe Parser", () => {
parser.flowProcessor.processedFlowHashmap = getTestData();
});
it("Populates generated data lists", async () => {
parser.run({
const res = parser.run({
flow_name: "test_pipe_parse",
flow_type: "data_pipe",
rows: [
Expand All @@ -45,6 +45,20 @@ describe("data_pipe Parser", () => {
test_output_1: [{ id: 2 }, { id: 3 }],
test_output_2: [{ id: 3 }],
});

// Also check output references stored
expect(res._output_flows).toEqual([
{
flow_type: "data_list",
flow_subtype: "generated",
flow_name: "test_output_1",
},
{
flow_type: "data_list",
flow_subtype: "generated",
flow_name: "test_output_2",
},
]);
});

it("Allows outputs from one pipe to be used in another", async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ export class DataPipeParser extends DefaultParser<FlowTypes.DataPipeFlow> {
private outputHashmap: { [flow_name: string]: { [output_name: string]: any } } = {};

/** If extending the class add additional postprocess pipeline here */

public postProcessFlow(flow: FlowTypes.DataPipeFlow): FlowTypes.DataPipeFlow {
const inputSources = this.loadInputSources();
const pipe = new DataPipe(flow.rows, inputSources);
Expand All @@ -24,30 +23,31 @@ export class DataPipeParser extends DefaultParser<FlowTypes.DataPipeFlow> {
// HACK - populate to output hashmap for use in tests. Clone output due to deep nest issues
this.outputHashmap[flow.flow_name] = JSON.parse(JSON.stringify(outputs));

this.populateGeneratedFlows(outputs);
// As the populated flows will be passed directly to the processor queue
// can just return undefined so that the data pipe will not be stored in outputs
return undefined;
const generated = this.generateFlows(outputs);

// Pass all generated flows to the back of the current processing queue so that they can be
// populated to processed hashmap and referenced from other processes as required
for (const generatedFlow of generated) {
const deferId = `${generatedFlow.flow_type}.${generatedFlow.flow_subtype}.${generatedFlow.flow_name}`;
this.flowProcessor.deferInputProcess(generatedFlow, deferId);
}

// Return the parsed flow along with a summary of output flows to store within outputs
flow._output_flows = generated.map(({ rows, ...keptFields }) => keptFields);
return flow;
} catch (error) {
console.trace(error);
throw error;
}
}

private populateGeneratedFlows(outputs: { [output_name: string]: any[] }) {
for (const [flow_name, rows] of Object.entries(outputs)) {
const flow: FlowTypes.FlowTypeWithData = {
flow_name,
flow_subtype: "generated",
flow_type: "data_list",
rows,
};
const deferId = `${flow.flow_type}.${flow.flow_subtype}.${flow.flow_name}`;

// Pass all generated flows to the back of the current processing queue so that they can be
// populated to processed hashmap and referenced from other processes as required
this.flowProcessor.deferInputProcess(flow, deferId);
}
private generateFlows(outputs: { [output_name: string]: any[] }) {
const generatedFlows: FlowTypes.Data_list[] = Object.entries(outputs).map(
([flow_name, rows]) => {
return { flow_name, flow_subtype: "generated", flow_type: "data_list", rows };
}
);
return generatedFlows;
}

private loadInputSources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,21 @@ describe("generator Parser", () => {
},
]);
});
it("populates list of outputs", async () => {
const res = parser.run(generatorInput()) as FlowTypes.GeneratorFlow;
expect(res._output_flows).toEqual([
{
flow_type: "template",
flow_subtype: "generated_type_1",
flow_name: "generated_template_1",
},
{
flow_type: "template",
flow_subtype: "generated_type_2",
flow_name: "generated_template_2",
},
]);
});
it("parses generated flows using type parser", async () => {
parser.run(generatorInput());
await parser.flowProcessor.queue.onIdle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ export class GeneratorParser extends DefaultParser<FlowTypes.GeneratorFlow> {
const deferId = `${generatedFlow.flow_type}.${generatedFlow.flow_subtype}.${generatedFlow.flow_name}`;
this.flowProcessor.deferInputProcess(generatedFlow, deferId);
}
// As the populated flows will be passed directly to the processor queue
// can just return undefined so that the data pipe will not be stored in outputs
return undefined;
// Return the parsed generator along with a summary of output flows to store within outputs
flow._output_flows = generated.map(({ rows, ...keptFields }) => keptFields);
return flow;
} catch (error) {
console.trace(error);
throw error;
Expand Down
Loading