Skip to content

Commit

Permalink
Merge pull request #989 from polywrap/nk/improve-workflow
Browse files Browse the repository at this point in the history
feat(cli): add job status in job result object
  • Loading branch information
dOrgJelli authored Jun 30, 2022
2 parents f10af9f + fb1a828 commit 0b5a2db
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 38 deletions.
1 change: 1 addition & 0 deletions packages/cli/lang/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
"commands_run_error_noApi": "API needs to be initialized",
"commands_run_error_readFail": "Failed to read query {query}",
"commands_run_error_unsupportedOutputFileExt": "Unsupported outputFile extention: ${outputFileExt}",
"commands_run_error_cueDoesNotExist": "Require cue to run validator, checkout https://cuelang.org/ for more information",
"commands_run_error_noWorkflowScriptFound": "Workflow script not found at path: {path}",
"commands_run_error_noTestEnvFound": "polywrap test-env not found, please run 'polywrap infra up --modules=eth-ens-ipfs'",
"commands_testEnv_description": "Manage a test environment for Polywrap",
Expand Down
1 change: 1 addition & 0 deletions packages/cli/lang/es.json
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
"commands_run_error_noApi": "API needs to be initialized",
"commands_run_error_readFail": "Failed to read query {query}",
"commands_run_error_unsupportedOutputFileExt": "Unsupported outputFile extention: ${outputFileExt}",
"commands_run_error_cueDoesNotExist": "Require cue to run validator, checkout https://cuelang.org/ for more information",
"commands_run_error_noWorkflowScriptFound": "Workflow script not found at path: {path}",
"commands_run_error_noTestEnvFound": "polywrap test-env not found, please run 'polywrap infra up --modules=eth-ens-ipfs'",
"commands_testEnv_description": "Manage a test environment for Polywrap",
Expand Down
6 changes: 4 additions & 2 deletions packages/cli/src/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
validateOutput,
} from "../lib";

import { InvokeResult, Workflow } from "@polywrap/core-js";
import { InvokeResult, Workflow, JobResult } from "@polywrap/core-js";
import { PolywrapClient, PolywrapClientConfig } from "@polywrap/client-js";
import path from "path";
import yaml from "js-yaml";
Expand Down Expand Up @@ -85,7 +85,9 @@ const _run = async (workflowPath: string, options: WorkflowCommandOptions) => {
workflow,
config: clientConfig,
ids: jobs,
onExecution: async (id: string, data: unknown, error: Error) => {
onExecution: async (id: string, jobResult: JobResult) => {
const { data, error } = jobResult;

if (!quiet) {
console.log("-----------------------------------");
console.log(`ID: ${id}`);
Expand Down
16 changes: 13 additions & 3 deletions packages/cli/src/lib/helpers/workflow-validator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { runCommand } from "../system";
import { intlMsg } from "../intl";

import fs from "fs";
import { InvokeResult } from "@polywrap/core-js";
Expand All @@ -19,6 +20,10 @@ export async function validateOutput(
result: InvokeResult,
validateScriptPath: string
): Promise<void> {
if (!(await cueExists())) {
console.warn(intlMsg.commands_run_error_cueDoesNotExist());
}

const index = id.lastIndexOf(".");
const jobId = id.substring(0, index);
const stepId = id.substring(index + 1);
Expand All @@ -29,12 +34,17 @@ export async function validateOutput(
await fs.promises.writeFile(jsonOutput, JSON.stringify(result, null, 2));

try {
await runCommand(
`cue vet -d ${selector} ${validateScriptPath} ${jsonOutput}`
const { stderr } = await runCommand(
`cue vet -d ${selector} ${validateScriptPath} ${jsonOutput}`,
true
);

if (stderr) {
console.error(stderr);
console.log("-----------------------------------");
}
} catch (e) {
console.error(e);
console.log(e);
console.log("-----------------------------------");
process.exitCode = 1;
}
Expand Down
10 changes: 5 additions & 5 deletions packages/js/client/src/__tests__/e2e/workflow.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
ensAddresses,
providers
} from "@polywrap/test-env-js";
import { createPolywrapClient, PolywrapClient, PolywrapClientConfig } from "../..";
import { createPolywrapClient, JobResult, PolywrapClient, PolywrapClientConfig } from "../..";
import { outPropWorkflow, sanityWorkflow } from "./workflow-test-cases";

jest.setTimeout(200000);
Expand Down Expand Up @@ -89,17 +89,17 @@ describe("workflow", () => {
test("sanity workflow", async () => {
await client.run({
workflow: sanityWorkflow,
onExecution: async (id: string, data: unknown, error: unknown) => {
await tests[id](data, error);
onExecution: async (id: string, jobResult: JobResult) => {
await tests[id](jobResult.data, jobResult.error);
},
});
});

test("workflow with output propagation", async () => {
await client.run({
workflow: outPropWorkflow,
onExecution: async (id: string, data: unknown, error: unknown) => {
await tests[id](data, error);
onExecution: async (id: string, jobResult: JobResult) => {
await tests[id](jobResult.data, jobResult.error);
},
});
});
Expand Down
17 changes: 12 additions & 5 deletions packages/js/core/src/types/Workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ export type Workflow<TUri extends Uri | string = string> = {
jobs: Job<TUri>;
};

export enum JobStatus {
SUCCEED,
FAILED,
SKIPPED,
}

export interface JobResult<TData extends unknown = unknown>
extends InvokeResult<TData> {
status: JobStatus;
}

export interface RunOptions<
TData extends Record<string, unknown> = Record<string, unknown>,
TUri extends Uri | string = string
Expand All @@ -32,11 +43,7 @@ export interface RunOptions<
contextId?: string;
ids?: string[];

onExecution?(
id: string,
data?: InvokeResult<TData>["data"],
error?: InvokeResult<TData>["error"]
): MaybeAsync<void>;
onExecution?(id: string, jobResult: JobResult<TData>): MaybeAsync<void>;
}

export interface WorkflowHandler {
Expand Down
79 changes: 56 additions & 23 deletions packages/js/core/src/workflow/JobRunner.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import {
Client,
executeMaybeAsyncFunction,
InvokeResult,
Job,
JobResult,
JobStatus,
MaybeAsync,
Uri,
} from "../types";
Expand All @@ -19,14 +20,13 @@ export class JobRunner<
TData extends unknown = unknown,
TUri extends Uri | string = string
> {
private jobOutput: Map<string, InvokeResult<TData>>;
private jobOutput: Map<string, JobResult<TData>>;

constructor(
private client: Client,
private onExecution?: (
id: string,
data?: InvokeResult<TData>["data"],
error?: InvokeResult<TData>["error"]
JobResult: JobResult<TData>
) => MaybeAsync<void>
) {
this.jobOutput = new Map();
Expand All @@ -45,27 +45,47 @@ export class JobRunner<
const steps = jobs[jobId].steps;
if (steps) {
for (let i = 0; i < steps.length; i++) {
let result: JobResult<TData> | undefined;
let args: Record<string, unknown> | undefined;

const step = steps[i];
const absoluteId = parentId
? `${parentId}.${jobId}.${i}`
: `${jobId}.${i}`;
const args = this.resolveArgs(absoluteId, step.args);
const result = await this.client.invoke<TData, TUri>({
uri: step.uri,
method: step.method,
config: step.config,
args: args,
});

this.jobOutput.set(absoluteId, result);

if (this.onExecution && typeof this.onExecution === "function") {
await executeMaybeAsyncFunction(
this.onExecution,
absoluteId,
result.data,
result.error
);
try {
args = this.resolveArgs(absoluteId, step.args);
} catch (e) {
result = {
error: e,
status: JobStatus.SKIPPED,
};
}

if (args) {
const invokeResult = await this.client.invoke<TData, TUri>({
uri: step.uri,
method: step.method,
config: step.config,
args: args,
});

if (invokeResult.error) {
result = { ...invokeResult, status: JobStatus.FAILED };
} else {
result = { ...invokeResult, status: JobStatus.SUCCEED };
}
}

if (result) {
this.jobOutput.set(absoluteId, result);

if (this.onExecution && typeof this.onExecution === "function") {
await executeMaybeAsyncFunction(
this.onExecution,
absoluteId,
result
);
}
}
}
}
Expand Down Expand Up @@ -124,8 +144,21 @@ export class JobRunner<
}
}
const output = outputs.get(absStepId);
if (output && output[dataOrErr]) {
return output[dataOrErr];
if (
output &&
dataOrErr === "data" &&
output.status === JobStatus.SUCCEED &&
output.data
) {
return output.data;
}
if (
output &&
dataOrErr === "error" &&
output.status === JobStatus.FAILED &&
output.error
) {
return output.error;
}
}

Expand Down

0 comments on commit 0b5a2db

Please sign in to comment.