From 87e38c356287a42ad8b54f38707ecb0b421a9c65 Mon Sep 17 00:00:00 2001 From: j03-dev <24nomeniavo@gmail.com> Date: Fri, 18 Oct 2024 20:20:13 +0300 Subject: [PATCH 1/7] type Data && fix python syntax --- .../src/runtimes/substantial/agent.ts | 78 +++++++++++-------- .../src/runtimes/substantial/types.ts | 26 ++++++- .../src/runtimes/substantial/worker.ts | 46 ++++++----- .../substantial/workflow_worker_manager.ts | 23 +++--- .../python/typegraph/runtimes/substantial.py | 13 +++- 5 files changed, 118 insertions(+), 68 deletions(-) diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index 966ce460f3..709d29faa2 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -2,17 +2,17 @@ import { AddScheduleInput, Backend, NextRun, - Run, ReadOrCloseScheduleInput, + Run, } from "../../../engine/runtime.js"; import { getLogger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; import { + appendIfOngoing, Interrupt, Result, WorkerData, WorkflowResult, - appendIfOngoing, } from "./types.ts"; import { RunId, WorkerManager } from "./workflow_worker_manager.ts"; @@ -39,7 +39,7 @@ export class Agent { private backend: Backend, private queue: string, private config: AgentConfig, - private internalTCtx: TaskContext + private internalTCtx: TaskContext, ) {} async schedule(input: AddScheduleInput) { @@ -56,7 +56,7 @@ export class Agent { }); } catch (err) { logger.warn( - `Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}` + `Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}`, ); } } @@ -89,9 +89,11 @@ export class Agent { this.workflows = workflows; logger.warn( - `Initializing agent to handle ${workflows - .map(({ name }) => name) - .join(", ")}` + `Initializing agent to handle ${ + workflows + .map(({ name }) => name) + .join(", ") + }`, ); this.pollIntervalHandle = setInterval(async () => { @@ -132,7 +134,7 @@ export class Agent { for (const workflow of this.workflows) { const requests = replayRequests.filter( - ({ run_id }) => Agent.parseWorkflowName(run_id) == workflow.name + ({ run_id }) => Agent.parseWorkflowName(run_id) == workflow.name, ); while (requests.length > 0) { @@ -143,7 +145,7 @@ export class Agent { await this.#replay(next, workflow); } catch (err) { logger.error( - `Replay failed for ${workflow.name} => ${JSON.stringify(next)}` + `Replay failed for ${workflow.name} => ${JSON.stringify(next)}`, ); logger.error(err); } finally { @@ -189,7 +191,7 @@ export class Agent { // necessarily represent the state of what is actually running on the current typegate node if (this.workerManager.isOngoing(next.run_id)) { logger.warn( - `skip triggering ${next.run_id} for the current tick as it is still ongoing` + `skip triggering ${next.run_id} for the current tick as it is still ongoing`, ); return; @@ -234,9 +236,11 @@ export class Agent { // A consequence of the above, a workflow is always triggered by gql { start(..) } // This can also occur if an event is sent from gql under a runId that is not valid (e.g. due to typo) logger.warn( - `First item in the operation list is not a Start, got "${JSON.stringify( - first - )}" instead. Closing the underlying schedule.` + `First item in the operation list is not a Start, got "${ + JSON.stringify( + first, + ) + }" instead. Closing the underlying schedule.`, ); await Meta.substantial.storeCloseSchedule(schedDef); @@ -251,12 +255,12 @@ export class Agent { run, next.schedule_date, first.event.kwargs, - this.internalTCtx + this.internalTCtx, ); this.workerManager.listen( next.run_id, - this.#eventResultHandlerFor(workflow.name, next.run_id) + this.#eventResultHandlerFor(workflow.name, next.run_id), ); } catch (err) { throw err; @@ -279,7 +283,7 @@ export class Agent { // All Worker/Runner non-user issue should fall here // Note: Should never throw (typegate will panic), this will run in a worker logger.error( - `result error for "${runId}": ${JSON.stringify(result.payload)}` + `result error for "${runId}": ${JSON.stringify(result.payload)}`, ); return; } @@ -307,7 +311,7 @@ export class Agent { startedAt, workflowName, runId, - ret + ret, ); break; } @@ -318,9 +322,9 @@ export class Agent { } default: logger.error( - `Fatal: invalid type ${ - answer.type - } sent by "${runId}": ${JSON.stringify(answer.data)}` + `Fatal: invalid type ${answer.type} sent by "${runId}": ${ + JSON.stringify(answer.data) + }`, ); } }; @@ -329,7 +333,7 @@ export class Agent { async #workflowHandleInterrupts( workflowName: string, runId: string, - { result, schedule, run }: WorkflowResult + { result, schedule, run }: WorkflowResult, ) { this.workerManager.destroyWorker(workflowName, runId); // ! @@ -378,14 +382,16 @@ export class Agent { startedAt: Date, workflowName: string, runId: string, - { result, kind, schedule, run }: WorkflowResult + { result, kind, schedule, run }: WorkflowResult, ) { this.workerManager.destroyWorker(workflowName, runId); logger.info( - `gracefull completion of "${runId}" (${kind}): ${JSON.stringify( - result - )} started at "${startedAt}"` + `gracefull completion of "${runId}" (${kind}): ${ + JSON.stringify( + result, + ) + } started at "${startedAt}"`, ); logger.info(`Append Stop ${runId}`); @@ -404,7 +410,7 @@ export class Agent { }); logger.info( - `Persist finalized records for "${workflowName}": ${result}" and closing everything..` + `Persist finalized records for "${workflowName}": ${result}" and closing everything..`, ); const _run = await Meta.substantial.storePersistRun({ @@ -451,13 +457,15 @@ function checkIfRunHasStopped(run: Run) { if (op.event.type == "Start") { if (life >= 1) { logger.error( - `bad logs: ${JSON.stringify( - run.operations.map(({ event }) => event.type) - )}` + `bad logs: ${ + JSON.stringify( + run.operations.map(({ event }) => event.type), + ) + }`, ); throw new Error( - `"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped` + `"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped`, ); } @@ -466,13 +474,15 @@ function checkIfRunHasStopped(run: Run) { } else if (op.event.type == "Stop") { if (life <= 0) { logger.error( - `bad logs: ${JSON.stringify( - run.operations.map(({ event }) => event.type) - )}` + `bad logs: ${ + JSON.stringify( + run.operations.map(({ event }) => event.type), + ) + }`, ); throw new Error( - `"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start` + `"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start`, ); } diff --git a/src/typegate/src/runtimes/substantial/types.ts b/src/typegate/src/runtimes/substantial/types.ts index e7defcf5bd..a16440db86 100644 --- a/src/typegate/src/runtimes/substantial/types.ts +++ b/src/typegate/src/runtimes/substantial/types.ts @@ -2,20 +2,40 @@ // SPDX-License-Identifier: Elastic-2.0 import { Operation, Run } from "../../../engine/runtime.js"; +import { TaskContext } from "../deno/shared_types.ts"; export type { + Backend, Operation, OperationEvent, Run, - Backend, } from "../../../engine/runtime.js"; export type AnyString = string & Record; export type WorkerEvent = "START" | AnyString; +export type TaskData = { + modulePath: string; + functionName: string; + run: Run; + kwargs: Record; + schedule: string; + internal: TaskContext; +}; + +export type ResultData = { + kind: string; + result: unknown; + run: Run; + schedule: string; + exception?: Error; +}; + +export type Data = TaskData | ResultData | string; + export type WorkerData = { type: WorkerEvent; - data: any; + data: Data; }; export type WorkerEventHandler = (message: Result) => Promise; @@ -33,7 +53,7 @@ export function Err(payload: E): Result { return { error: true, payload }; } -export function Msg(type: WorkerEvent, data: unknown): WorkerData { +export function Msg(type: WorkerEvent, data: Data): WorkerData { return { type, data }; } diff --git a/src/typegate/src/runtimes/substantial/worker.ts b/src/typegate/src/runtimes/substantial/worker.ts index fcbc626421..864c68bd9a 100644 --- a/src/typegate/src/runtimes/substantial/worker.ts +++ b/src/typegate/src/runtimes/substantial/worker.ts @@ -1,9 +1,8 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 -import { errorToString } from "../../worker_utils.ts"; import { Context } from "./deno_context.ts"; -import { Err, Msg, Ok, WorkerData, WorkflowResult } from "./types.ts"; +import { Err, Msg, Ok, TaskData, WorkerData, WorkflowResult } from "./types.ts"; let runCtx: Context | undefined; @@ -12,7 +11,7 @@ self.onmessage = async function (event) { switch (type) { case "START": { const { modulePath, functionName, run, schedule, kwargs, internal } = - data; + data as TaskData; // FIXME: handle case when script is missing and notify WorkerManager so it cleans up // its registry. const module = await import(modulePath); @@ -32,27 +31,36 @@ self.onmessage = async function (event) { .then((wfResult: unknown) => { self.postMessage( Ok( - Msg(type, { - kind: "SUCCESS", - result: wfResult, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult) - ) + Msg( + type, + { + kind: "SUCCESS", + result: wfResult, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), ); }) .catch((wfException: unknown) => { self.postMessage( Ok( - Msg(type, { - kind: "FAIL", - result: errorToString(wfException), - exception: - wfException instanceof Error ? wfException : undefined, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult) - ) + Msg( + type, + { + kind: "FAIL", + result: wfException instanceof Error + ? wfException.message + : JSON.stringify(wfException), + exception: wfException instanceof Error + ? wfException + : undefined, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), ); }); break; diff --git a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts index 795d20975a..5d84546346 100644 --- a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts +++ b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts @@ -5,6 +5,7 @@ import { envSharedWithWorkers } from "../../config/shared.ts"; import { getLogger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; import { + Data, Err, Msg, Result, @@ -48,7 +49,7 @@ export class WorkflowRecorder { name: WorkflowName, runId: RunId, worker: WorkerRecord, - startedAt: Date + startedAt: Date, ) { if (!this.workflowRuns.has(name)) { this.workflowRuns.set(name, new Set()); @@ -83,7 +84,7 @@ export class WorkflowRecorder { if (this.workflowRuns.has(name)) { if (!record) { logger.warn( - `"${runId}" associated with "${name}" does not exist or has been already destroyed` + `"${runId}" associated with "${name}" does not exist or has been already destroyed`, ); return false; } @@ -140,7 +141,7 @@ export class WorkerManager { modulePath, worker, }, - new Date() + new Date(), ); } @@ -151,10 +152,12 @@ export class WorkerManager { destroyAllWorkers() { this.recorder.destroyAllWorkers(); logger.warn( - `Destroyed workers for ${this.recorder - .getRegisteredWorkflowNames() - .map((w) => `"${w}"`) - .join(", ")}` + `Destroyed workers for ${ + this.recorder + .getRegisteredWorkflowNames() + .map((w) => `"${w}"`) + .join(", ") + }`, ); } @@ -181,7 +184,7 @@ export class WorkerManager { const rec = this.recorder.startedAtRecords.get(runId); if (!rec) { throw new Error( - `Invalid state: cannot find initial time for run "${runId}"` + `Invalid state: cannot find initial time for run "${runId}"`, ); } return rec; @@ -209,7 +212,7 @@ export class WorkerManager { worker.onerror = /*async*/ (event) => handlerFn(Err(event)); } - trigger(type: WorkerEvent, runId: RunId, data: unknown) { + trigger(type: WorkerEvent, runId: RunId, data: Data) { const { worker } = this.recorder.getWorkerRecord(runId); worker.postMessage(Msg(type, data)); logger.info(`trigger ${type} for ${runId}`); @@ -222,7 +225,7 @@ export class WorkerManager { storedRun: Run, schedule: string, kwargs: Record, - internalTCtx: TaskContext + internalTCtx: TaskContext, ) { this.#createWorker(name, workflowModPath, runId); this.trigger("START", runId, { diff --git a/src/typegraph/python/typegraph/runtimes/substantial.py b/src/typegraph/python/typegraph/runtimes/substantial.py index 836d35ccdd..065707b68d 100644 --- a/src/typegraph/python/typegraph/runtimes/substantial.py +++ b/src/typegraph/python/typegraph/runtimes/substantial.py @@ -30,12 +30,15 @@ class Backend: + @staticmethod def dev_memory(): return SubstantialBackendMemory() + @staticmethod def dev_fs(): return SubstantialBackendFs() + @staticmethod def redis(connection_string_secret: str): return SubstantialBackendRedis(value=RedisBackend(connection_string_secret)) @@ -47,7 +50,13 @@ def __init__( file_descriptions: List[WorkflowFileDescription], ): data = SubstantialRuntimeData(backend, file_descriptions) - super().__init__(runtimes.register_substantial_runtime(store, data)) + + runtime_id = runtimes.register_substantial_runtime(store, data) + if isinstance(runtime_id, Err): + raise Exception(runtime_id.value) + + super().__init__(runtime_id.value) + self.backend = backend def _generic_substantial_func( @@ -61,7 +70,7 @@ def _generic_substantial_func( func_out=None if func_out is None else func_out._id, operation=operation, ) - func_data = runtimes.generate_substantial_operation(store, self.id.value, data) + func_data = runtimes.generate_substantial_operation(store, self.id, data) if isinstance(func_data, Err): raise Exception(func_data.value) From 042d2749ed1dd27c25d6b342543c2f0a31f780a7 Mon Sep 17 00:00:00 2001 From: j03-dev <24nomeniavo@gmail.com> Date: Sun, 20 Oct 2024 17:04:39 +0300 Subject: [PATCH 2/7] add kind on data --- src/typegate/src/runtimes/substantial/agent.ts | 4 +++- src/typegate/src/runtimes/substantial/types.ts | 3 +++ .../src/runtimes/substantial/workflow_worker_manager.ts | 3 +++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index 709d29faa2..a37f47e4bc 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -10,6 +10,7 @@ import { TaskContext } from "../deno/shared_types.ts"; import { appendIfOngoing, Interrupt, + Kind, Result, WorkerData, WorkflowResult, @@ -21,7 +22,7 @@ const logger = getLogger(); export interface WorkflowDescription { name: string; path: string; - kind: "DENO" | "PYTHON"; + kind: Kind; } export interface AgentConfig { @@ -256,6 +257,7 @@ export class Agent { next.schedule_date, first.event.kwargs, this.internalTCtx, + workflow.kind, ); this.workerManager.listen( diff --git a/src/typegate/src/runtimes/substantial/types.ts b/src/typegate/src/runtimes/substantial/types.ts index a16440db86..a331300017 100644 --- a/src/typegate/src/runtimes/substantial/types.ts +++ b/src/typegate/src/runtimes/substantial/types.ts @@ -14,6 +14,8 @@ export type AnyString = string & Record; export type WorkerEvent = "START" | AnyString; +export type Kind = "DENO" | "PYTHON"; + export type TaskData = { modulePath: string; functionName: string; @@ -21,6 +23,7 @@ export type TaskData = { kwargs: Record; schedule: string; internal: TaskContext; + kind: Kind; }; export type ResultData = { diff --git a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts index 5d84546346..236be7ecc3 100644 --- a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts +++ b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts @@ -7,6 +7,7 @@ import { TaskContext } from "../deno/shared_types.ts"; import { Data, Err, + Kind, Msg, Result, Run, @@ -226,6 +227,7 @@ export class WorkerManager { schedule: string, kwargs: Record, internalTCtx: TaskContext, + kind: Kind, ) { this.#createWorker(name, workflowModPath, runId); this.trigger("START", runId, { @@ -235,6 +237,7 @@ export class WorkerManager { kwargs, schedule, internal: internalTCtx, + kind, }); } } From 884c3d771948aa356bc89f602676a0ee0ef3b6d5 Mon Sep 17 00:00:00 2001 From: j03-dev <24nomeniavo@gmail.com> Date: Sun, 27 Oct 2024 12:11:10 +0300 Subject: [PATCH 3/7] return output of the function --- Cargo.lock | 65 +++++++++++++++++++++ Cargo.toml | 21 ++++--- src/typegate/engine/Cargo.toml | 4 ++ src/typegate/engine/src/lib.rs | 1 + src/typegate/engine/src/python.rs | 96 +++++++++++++++++++++++++++++++ 5 files changed, 179 insertions(+), 8 deletions(-) create mode 100644 src/typegate/engine/src/python.rs diff --git a/Cargo.lock b/Cargo.lock index 78a09ccaca..beae0bce51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9074,6 +9074,69 @@ dependencies = [ "unicase", ] +[[package]] +name = "pyo3" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d922163ba1f79c04bc49073ba7b32fd5a8d3b76a87c955921234b8e77333c51" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "memoffset 0.9.1", + "once_cell", + "portable-atomic", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc38c5feeb496c8321091edf3d63e9a6829eab4b863b4a6a65f26f3e9cc6b179" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94845622d88ae274d2729fcefc850e63d7a3ddff5e3ce11bd88486db9f1d357d" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e655aad15e09b94ffdb3ce3d217acf652e26bbc37697ef012f5e5e348c716e5e" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn 2.0.71", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1e3f09eecd94618f60a455a23def79f79eba4dc561a97324bf9ac8c6df30ce" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "pyo3-build-config", + "quote", + "syn 2.0.71", +] + [[package]] name = "quaint" version = "0.2.0-alpha.13" @@ -12787,6 +12850,7 @@ dependencies = [ "protobuf", "protobuf-json-mapping", "psl", + "pyo3", "query-connector", "query-core", "query-engine-metrics", @@ -12795,6 +12859,7 @@ dependencies = [ "schema-connector", "schema-core", "serde", + "serde_json", "shadow-rs", "substantial", "tap", diff --git a/Cargo.toml b/Cargo.toml index 92024645b7..cead4b390f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ "src/typegate/standalone", "src/typegraph/core", "src/xtask", - "src/substantial" + "src/substantial", ] exclude = [ @@ -69,19 +69,19 @@ sha2 = "0.10.8" seahash = "4.1.0" # patterns -anyhow = "1.0.89" # FIXME: replace anyhow with eyre +anyhow = "1.0.89" # FIXME: replace anyhow with eyre color-eyre = "0.6.3" -eyre = "0.6.12" # NOTE: keep in sync with verison used by color-eyre +eyre = "0.6.12" # NOTE: keep in sync with verison used by color-eyre thiserror = "1.0.64" indoc = "2.0.5" unindent = "0.2.3" itertools = "0.13.0" -lazy_static = "1.5.0" # FIXME: replace with Lazy Cell +lazy_static = "1.5.0" # FIXME: replace with Lazy Cell crossbeam-channel = "0.5.13" enum_dispatch = "0.3.13" tap = "1.0.1" derive_more = { version = "1", features = ["from"] } -cached = "0.53.1" # FIXME: replace usage with a Lazy Cell + dashmap +cached = "0.53.1" # FIXME: replace usage with a Lazy Cell + dashmap garde = "0.20" paste = "1.0.15" @@ -115,7 +115,7 @@ indexmap = { version = "2.6.0", features = ["serde"] } semver = "1.0.23" dashmap = "6.1.0" connection-string = "0.2.0" -chrono = { version = "0.4.38", features = ["serde"] } +chrono = { version = "0.4.38", features = ["serde"] } tera = { version = "1.20", default-features = false } ordered-float = "4.3.0" graphql-parser = "0.4.0" @@ -149,7 +149,7 @@ tracing-unwrap = { version = "1.0.1", features = ["log-location"] } tracing-appender = "0.2.3" # async -futures = "=0.3.30" # pinned due to bug with .31 with zeromq (deno) +futures = "=0.3.30" # pinned due to bug with .31 with zeromq (deno) futures-concurrency = "7.6" futures-lite = "2.3" tokio = { version = "1", features = ["parking_lot"] } @@ -163,7 +163,9 @@ temporal-sdk-core-protos = { git = "https://github.com/temporalio/sdk-core", rev # prisma query-core = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } query-connector = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } -request-handlers = { git = "https://github.com/metatypedev/prisma-engines", features = ["all"], branch = "fix/version-compat" } +request-handlers = { git = "https://github.com/metatypedev/prisma-engines", features = [ + "all", +], branch = "fix/version-compat" } datamodel-renderer = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } user-facing-errors = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } query-engine-metrics = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } @@ -189,6 +191,9 @@ protobuf = "3.6.0" protobuf-json-mapping = "3.6.0" proto-parser = { git = "https://github.com/metatypedev/proto-parser", branch = "main" } +# python +pyo3 = { version = "0.22.5" } + # test assert_cmd = "2.0.16" pretty_assertions = "1.4.1" diff --git a/src/typegate/engine/Cargo.toml b/src/typegate/engine/Cargo.toml index 6567b7cfb1..d6693f9aac 100644 --- a/src/typegate/engine/Cargo.toml +++ b/src/typegate/engine/Cargo.toml @@ -18,6 +18,7 @@ tracing.workspace = true # encoding serde.workspace = true +serde_json.workspace = true regex.workspace = true zstd.workspace = true base64.workspace = true @@ -68,6 +69,9 @@ bytes.workspace = true protobuf.workspace = true protobuf-json-mapping.workspace = true +# python +pyo3 = { workspace = true, features = ["extension-module"] } + [dev-dependencies] env_logger.workspace = true diff --git a/src/typegate/engine/src/lib.rs b/src/typegate/engine/src/lib.rs index 5d6ba03fcc..9dc8915895 100644 --- a/src/typegate/engine/src/lib.rs +++ b/src/typegate/engine/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Elastic-2.0 mod ext; +mod python; mod runtimes; // mod snapshot; mod typegraph; diff --git a/src/typegate/engine/src/python.rs b/src/typegate/engine/src/python.rs new file mode 100644 index 0000000000..fec10c4ca8 --- /dev/null +++ b/src/typegate/engine/src/python.rs @@ -0,0 +1,96 @@ +use anyhow::Result; +use pyo3::prelude::*; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +#[rustfmt::skip] +use deno_core as deno_core; + +#[derive(Deserialize)] +#[serde(crate = "serde")] +pub struct PythonExecutionInput { + python_module_path: String, + python_function_name: String, + execution_context: Value, +} + +#[derive(Serialize)] +#[serde(crate = "serde")] +pub struct PythonExecutionOutput { + execution_result: Value, + execution_status: String, +} + +fn convert_json_to_python_object(py: Python, json_value: &Value) -> PyResult { + match json_value { + Value::Null => Ok(py.None()), + Value::Bool(boolean_value) => Ok(boolean_value.into_py(py)), + Value::Number(numeric_value) => { + if let Some(integer_value) = numeric_value.as_i64() { + Ok(integer_value.into_py(py)) + } else if let Some(float_value) = numeric_value.as_f64() { + Ok(float_value.into_py(py)) + } else { + Err(PyErr::new::( + "Unsupported numeric type in JSON", + )) + } + } + Value::String(string_value) => Ok(string_value.into_py(py)), + Value::Array(array_value) => { + let python_list = pyo3::types::PyList::empty_bound(py); + for item in array_value { + python_list.append(convert_json_to_python_object(py, item)?)?; + } + Ok(python_list.into()) + } + Value::Object(map_value) => { + let python_dict = pyo3::types::PyDict::new_bound(py); + for (key, value) in map_value { + python_dict.set_item(key, convert_json_to_python_object(py, value)?)?; + } + Ok(python_dict.into()) + } + } +} + +pub fn execute_python_function( + module_path: String, + function_name: String, + context: Value, +) -> Result { + Python::with_gil(|py| { + let python_code = std::fs::read_to_string(module_path.clone()) + .map_err(|e| PyErr::new::(e.to_string()))?; + + let python_module = + PyModule::from_code_bound(py, &python_code, &module_path, "dynamic_module")?; + let python_function = python_module.getattr(function_name.as_str())?; + + let python_context = convert_json_to_python_object(py, &context)?; + let execution_result = python_function.call1((python_context,))?; + + Ok(execution_result.into()) + }) +} + +#[deno_core::op2] +#[serde] +pub fn op_execute_python_with_context( + #[serde] input: PythonExecutionInput, +) -> Result { + let execution_result = execute_python_function( + input.python_module_path, + input.python_function_name, + input.execution_context, + )?; + + let result_value = Python::with_gil(|py| { + let result_str = execution_result.clone_ref(py).to_string(); + serde_json::from_str(&result_str).unwrap_or(Value::Null) + }); + + Ok(PythonExecutionOutput { + execution_result: result_value, + execution_status: "success".to_string(), + }) +} From 3f8629b72d7178372d904c698f612daf09310b42 Mon Sep 17 00:00:00 2001 From: j03-dev <24nomeniavo@gmail.com> Date: Mon, 28 Oct 2024 08:13:55 +0300 Subject: [PATCH 4/7] allow execting python code on sub worker --- src/typegate/engine/00_runtime.js | 13 +- src/typegate/engine/runtime.d.ts | 109 ++++++++------- src/typegate/engine/src/python.rs | 2 +- .../src/runtimes/substantial/worker.ts | 127 ++++++++++++------ 4 files changed, 157 insertions(+), 94 deletions(-) diff --git a/src/typegate/engine/00_runtime.js b/src/typegate/engine/00_runtime.js index 412d71c056..9ca51f4499 100644 --- a/src/typegate/engine/00_runtime.js +++ b/src/typegate/engine/00_runtime.js @@ -73,16 +73,21 @@ const Meta = { metadataAppend: getOp("op_sub_metadata_append"), metadataWriteWorkflowLink: getOp("op_sub_metadata_write_workflow_link"), metadataReadWorkflowLinks: getOp("op_sub_metadata_read_workflow_links"), - metadataWriteParentChildLink: getOp("op_sub_metadata_write_parent_child_link"), - metadataEnumerateAllChildren: getOp("op_sub_metadata_enumerate_all_children"), + metadataWriteParentChildLink: getOp( + "op_sub_metadata_write_parent_child_link", + ), + metadataEnumerateAllChildren: getOp( + "op_sub_metadata_enumerate_all_children", + ), }, grpc: { register: getOp("op_grpc_register"), unregister: getOp("op_grpc_unregister"), callGrpcMethod: getOp("op_call_grpc_method"), }, + python: { + executePython: getOp("op_execute_python_with_context"), + }, }; - - globalThis.Meta = Meta; diff --git a/src/typegate/engine/runtime.d.ts b/src/typegate/engine/runtime.d.ts index 109491903d..8b024d5a3b 100644 --- a/src/typegate/engine/runtime.d.ts +++ b/src/typegate/engine/runtime.d.ts @@ -17,7 +17,7 @@ export type MetaNS = { unregisterEngine: (engine_name: string) => Promise; query: (inp: PrismaQueryInp) => Promise; diff: ( - inp: PrismaDiffInp + inp: PrismaDiffInp, ) => Promise<[string, ParsedDiff[]] | undefined | null>; apply: (inp: PrismaDevInp) => Promise; deploy: (inp: PrismaDeployInp) => Promise; @@ -34,7 +34,7 @@ export type MetaNS = { workflowSignal: (inp: TemporalWorkflowSignalInput) => Promise; workflowQuery: (inp: TemporalWorkflowQueryInput) => Promise>; workflowDescribe: ( - inp: TemporalWorkflowDescribeInput + inp: TemporalWorkflowDescribeInput, ) => Promise; }; @@ -43,12 +43,12 @@ export type MetaNS = { componentPath: string, instanceId: string, args: WitWireInitArgs, - cb: (op_name: string, json: string) => Promise + cb: (op_name: string, json: string) => Promise, ) => Promise; destroy: (instanceId: string) => Promise; handle: ( instanceId: string, - args: WitWireReq + args: WitWireReq, ) => Promise; }; @@ -63,7 +63,7 @@ export type MetaNS = { storePersistRun: (inp: PersistRunInput) => Promise; storeAddSchedule: (inp: AddScheduleInput) => Promise; storeReadSchedule: ( - inp: ReadOrCloseScheduleInput + inp: ReadOrCloseScheduleInput, ) => Promise; storeCloseSchedule: (inp: ReadOrCloseScheduleInput) => Promise; agentNextRun: (inp: NextRunInput) => Promise; @@ -72,20 +72,24 @@ export type MetaNS = { agentRenewLease: (inp: LeaseInput) => Promise; agentRemoveLease: (inp: LeaseInput) => Promise; metadataReadAll: ( - inp: ReadAllMetadataInput + inp: ReadAllMetadataInput, ) => Promise>; metadataAppend: (inp: AppendMetadataInput) => Promise; metadataWriteWorkflowLink: (inp: WriteLinkInput) => Promise; metadataReadWorkflowLinks: ( - inp: ReadWorkflowLinkInput + inp: ReadWorkflowLinkInput, ) => Promise>; metadataWriteParentChildLink: ( - inp: WriteParentChildLinkInput + inp: WriteParentChildLinkInput, ) => Promise; metadataEnumerateAllChildren: ( - inp: EnumerateAllChildrenInput + inp: EnumerateAllChildrenInput, ) => Promise>; }; + + python: { + executePython: (inp: PythonExecutionInput) => PythonExecutionOutput; + }; }; interface WasmInput { @@ -116,16 +120,16 @@ interface PrismaDevInp { } type PrismaApplyOut = | { - ResetRequired: { - reset_reason: string; - }; - } + ResetRequired: { + reset_reason: string; + }; + } | { - Ok: { - applied_migrations: Array; - reset_reason: string | undefined | null; - }; + Ok: { + applied_migrations: Array; + reset_reason: string | undefined | null; }; + }; interface PrismaDeployOut { migration_count: number; applied_migrations: Array; @@ -237,14 +241,14 @@ export type WitWireReq = { export type WitWireHandleError = | { - InstanceNotFound: string; - } + InstanceNotFound: string; + } | { - ModuleErr: string; - } + ModuleErr: string; + } | { - MatErr: string; - }; + MatErr: string; + }; export type WitWireMatInfo = { op_name: string; @@ -261,29 +265,29 @@ export type WitWireInitArgs = { export type WitWireInitResponse = object; export type WitWireInitError = | { - VersionMismatch: string; - } + VersionMismatch: string; + } | { - UnexpectedMat: string; - } + UnexpectedMat: string; + } | { - ModuleErr: string; - } + ModuleErr: string; + } | { - Other: string; - }; + Other: string; + }; export type WitWireHandleResponse = | { - Ok: string; - } + Ok: string; + } | "NoHandler" | { - InJsonErr: string; - } + InJsonErr: string; + } | { - HandlerErr: string; - }; + HandlerErr: string; + }; export type GrpcRegisterInput = { proto_file_content: string; @@ -301,20 +305,20 @@ export type Backend = | { type: "fs" } | { type: "memory" } | { - type: "redis"; - connection_string: string; - }; + type: "redis"; + connection_string: string; + }; export type OperationEvent = | { type: "Sleep"; id: number; start: string; end: string } | { - type: "Save"; - id: number; - value: - | { type: "Retry"; wait_until: string; counter: number } - | { type: "Resolved"; payload: unknown } - | { type: "Failed"; err: unknown }; - } + type: "Save"; + id: number; + value: + | { type: "Retry"; wait_until: string; counter: number } + | { type: "Resolved"; payload: unknown } + | { type: "Failed"; err: unknown }; + } | { type: "Send"; event_name: string; value: unknown } | { type: "Stop"; result: unknown } | { type: "Start"; kwargs: Record } @@ -420,3 +424,14 @@ export interface EnumerateAllChildrenInput { backend: Backend; parent_run_id: string; } + +export interface PythonExecutionInput { + python_module_path: string; + python_function_name: string; + executing_context: any; +} + +export interface PythonExecutionOutput { + execution_result: any; + execution_status: string; +} diff --git a/src/typegate/engine/src/python.rs b/src/typegate/engine/src/python.rs index fec10c4ca8..e1b234569d 100644 --- a/src/typegate/engine/src/python.rs +++ b/src/typegate/engine/src/python.rs @@ -91,6 +91,6 @@ pub fn op_execute_python_with_context( Ok(PythonExecutionOutput { execution_result: result_value, - execution_status: "success".to_string(), + execution_status: "SUCCESS".to_string(), }) } diff --git a/src/typegate/src/runtimes/substantial/worker.ts b/src/typegate/src/runtimes/substantial/worker.ts index 864c68bd9a..01e509edf6 100644 --- a/src/typegate/src/runtimes/substantial/worker.ts +++ b/src/typegate/src/runtimes/substantial/worker.ts @@ -10,59 +10,102 @@ self.onmessage = async function (event) { const { type, data } = event.data as WorkerData; switch (type) { case "START": { - const { modulePath, functionName, run, schedule, kwargs, internal } = - data as TaskData; + const { + modulePath, + functionName, + run, + schedule, + kwargs, + internal, + kind, + } = data as TaskData; // FIXME: handle case when script is missing and notify WorkerManager so it cleans up // its registry. const module = await import(modulePath); - // TODO: for python use the same strategy but instead call from native + runCtx = new Context(run, kwargs, internal); const workflowFn = module[functionName]; - if (typeof workflowFn !== "function") { - self.postMessage(Err(`Function "${functionName}" not found`)); - self.close(); - return; - } - - runCtx = new Context(run, kwargs, internal); + if (kind == "DENO") { + if (typeof workflowFn !== "function") { + self.postMessage(Err(`Function "${functionName}" not found`)); + self.close(); + return; + } + workflowFn(runCtx) + .then((wfResult: unknown) => { + self.postMessage( + Ok( + Msg( + type, + { + kind: "SUCCESS", + result: wfResult, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), + ); + }) + .catch((wfException: unknown) => { + self.postMessage( + Ok( + Msg( + type, + { + kind: "FAIL", + result: wfException instanceof Error + ? wfException.message + : JSON.stringify(wfException), + exception: wfException instanceof Error + ? wfException + : undefined, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), + ); + }); + } else if (kind == "PYTHON") { + try { + const result = Meta.python.executePython({ + python_module_path: modulePath, + python_function_name: functionName, + executing_context: runCtx, + }); - workflowFn(runCtx) - .then((wfResult: unknown) => { self.postMessage( - Ok( - Msg( - type, - { - kind: "SUCCESS", - result: wfResult, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult, - ), - ), + Ok(Msg( + type, + { + kind: "SUCCESS", + result: result.execution_result, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + )), ); - }) - .catch((wfException: unknown) => { + } catch (pyException) { self.postMessage( - Ok( - Msg( - type, - { - kind: "FAIL", - result: wfException instanceof Error - ? wfException.message - : JSON.stringify(wfException), - exception: wfException instanceof Error - ? wfException - : undefined, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult, - ), - ), + Ok(Msg( + type, + { + kind: "FAIL", + result: pyException instanceof Error + ? pyException.message + : JSON.stringify(pyException), + exception: pyException instanceof Error + ? pyException + : undefined, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + )), ); - }); + } + } break; } default: From a11f43f3bb62cc7679d96cf5996c14d8ee13c11b Mon Sep 17 00:00:00 2001 From: j03-dev <24nomeniavo@gmail.com> Date: Thu, 31 Oct 2024 07:15:51 +0300 Subject: [PATCH 5/7] full save method in rust --- src/substantial/src/converters.rs | 204 +++++++++++++++++++++++++++++- src/typegate/engine/src/python.rs | 3 + 2 files changed, 204 insertions(+), 3 deletions(-) diff --git a/src/substantial/src/converters.rs b/src/substantial/src/converters.rs index 3c146ca090..03d4457277 100644 --- a/src/substantial/src/converters.rs +++ b/src/substantial/src/converters.rs @@ -1,7 +1,8 @@ -use std::collections::HashMap; +use serde_json::Value; +use std::{collections::HashMap, fmt}; -use anyhow::{bail, Context, Ok, Result}; -use chrono::{DateTime, Utc}; +use anyhow::{bail, Context, Result}; +use chrono::{DateTime, TimeZone, Utc}; use protobuf::{ well_known_types::{ @@ -82,6 +83,111 @@ pub struct Run { pub operations: Vec, } +#[derive(Debug)] +pub enum Interupt { + Sleep, + Saveretry, + WaitReceiveEvent, + WaitHandleEvent, + WaitEnsureValue, +} + +impl Interupt { + const PREFIX: &'static str = "SUBSTANTIAL_INTERRUPT_"; +} + +impl fmt::Display for Interupt { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let variant = match self { + Self::Sleep => "SLEEP", + Self::Saveretry => "SAVE_RETRY", + Self::WaitReceiveEvent => "WAIT_RECEIVE_EVENT", + Self::WaitHandleEvent => "WAIT_HANDLE_EVENT", + Self::WaitEnsureValue => "WAIT_ENSURE_VALUE", + }; + write!(f, "{}{:?}", Self::PREFIX, variant) + } +} + +impl std::error::Error for Interupt {} + +pub enum Strategy { + Linear, +} + +pub struct Retry { + pub strategy: Option, + pub min_backoff_ms: i32, + pub max_backoff_ms: i32, + pub max_retries: i32, +} + +pub struct RetryStrategy { + min_backoff_ms: Option, + max_backoff_ms: Option, + max_retries: i32, +} + +impl RetryStrategy { + pub fn new( + max_retries: i32, + min_backoff_ms: Option, + max_backoff_ms: Option, + ) -> anyhow::Result { + if max_retries < 1 { + anyhow::bail!("maxRetries < 1"); + } + + let mut min_ms = min_backoff_ms; + let mut max_ms = max_backoff_ms; + + match (min_ms, max_ms) { + (Some(low), Some(high)) => { + if low >= high { + anyhow::bail!("minBackoffMs >= maxBackoffMs"); + } + if low < 0 { + anyhow::bail!("minBackoffMs < 0"); + } + } + (Some(low), None) => { + max_ms = Some(low + 10); + } + (None, Some(high)) => { + min_ms = Some(0.max(high - 10)); + } + (None, None) => {} + } + + Ok(Self { + min_backoff_ms: min_ms, + max_backoff_ms: max_ms, + max_retries, + }) + } + + pub fn eval(&self, strategy: Strategy, retries_left: i32) -> anyhow::Result { + match strategy { + Strategy::Linear => self.linear(retries_left), + // Add more strategy matches here + } + } + + fn linear(&self, retries_left: i32) -> anyhow::Result { + if retries_left <= 0 { + anyhow::bail!("retries left <= 0"); + } + + let dt = self.max_backoff_ms.unwrap_or(0) - self.min_backoff_ms.unwrap_or(0); + Ok(((self.max_retries - retries_left) * dt) / self.max_retries) + } +} + +pub struct Save { + pub timeout_ms: Option, + pub retry: Option, +} + impl Run { pub fn new(run_id: String) -> Self { Self { @@ -125,6 +231,98 @@ impl Run { pub fn reset(&mut self) { self.operations = vec![]; } + + fn _save(&mut self, func: F, option: Option) -> Result + where + F: Fn() -> Result, + { + let next_id = 1; + let mut current_retry_count: i32 = 1; + + for Operation { event, .. } in self.operations.iter() { + if let OperationEvent::Save { id, value } = event { + if *id == next_id { + if let SavedValue::Resolved { payload } = value { + return Ok(payload.clone()); + } else if let SavedValue::Retry { + counter, + wait_until, + } = value + { + let now = Utc::now(); + if wait_until > &now { + bail!(Interupt::Saveretry); + } else { + current_retry_count = *counter; + } + } + } + } + } + + current_retry_count += 1; + + let option = option.unwrap(); + + match func() { + Ok(result) => { + self.operations.push(Operation { + at: Utc::now(), + event: OperationEvent::Save { + id: next_id, + value: SavedValue::Resolved { + payload: result.clone(), + }, + }, + }); + Ok(result) + } + Err(err) => { + let retry = option.retry.unwrap(); + if retry.max_retries != 0 && current_retry_count < retry.max_retries { + let strategy = RetryStrategy { + min_backoff_ms: Some(retry.max_backoff_ms), + max_backoff_ms: Some(retry.max_backoff_ms), + max_retries: retry.max_retries, + }; + + let retries_left = (retry.max_retries - current_retry_count).max(0); + let delay_ms = strategy.eval(Strategy::Linear, retries_left)? as i64; + let wait_until_as_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as i64 + + delay_ms; + + self.operations.push(Operation { + at: Utc::now(), // TODO verify if it's good + event: OperationEvent::Save { + id: next_id, + value: SavedValue::Retry { + counter: current_retry_count, + wait_until: Utc.timestamp_millis_opt(wait_until_as_ms).unwrap(), + }, + }, + }); + bail!(Interupt::Saveretry); + } else { + self.operations.push(Operation { + at: Utc::now(), // TODO verify if it's good + event: OperationEvent::Save { + id: next_id, + value: SavedValue::Failed { + err: serde_json::json!({ + "retries": current_retry_count, + "message": err.to_string() + }), + }, + }, + }); + } + bail!(err) + } + } + } } impl TryFrom for Operation { diff --git a/src/typegate/engine/src/python.rs b/src/typegate/engine/src/python.rs index e1b234569d..fc91e1084e 100644 --- a/src/typegate/engine/src/python.rs +++ b/src/typegate/engine/src/python.rs @@ -1,3 +1,6 @@ +// Copyright Metatype OÜ, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + use anyhow::Result; use pyo3::prelude::*; use serde::{Deserialize, Serialize}; From 0efaa432253fe4dfaae8a8dcf0fb127f748a7455 Mon Sep 17 00:00:00 2001 From: j03-dev <24nomeniavo@gmail.com> Date: Thu, 31 Oct 2024 16:16:42 +0300 Subject: [PATCH 6/7] refact: deno context to native --- src/substantial/src/converters.rs | 137 +++++++++--------- src/typegate/engine/00_runtime.js | 7 +- src/typegate/engine/runtime.d.ts | 30 ++++ src/typegate/engine/src/ext.rs | 3 + .../engine/src/runtimes/substantial.rs | 52 ++++++- .../src/runtimes/substantial/deno_context.ts | 95 +++--------- .../src/runtimes/substantial/types.ts | 7 - .../src/runtimes/substantial/worker.ts | 114 +++++---------- 8 files changed, 221 insertions(+), 224 deletions(-) diff --git a/src/substantial/src/converters.rs b/src/substantial/src/converters.rs index 03d4457277..5c40979617 100644 --- a/src/substantial/src/converters.rs +++ b/src/substantial/src/converters.rs @@ -2,7 +2,7 @@ use serde_json::Value; use std::{collections::HashMap, fmt}; use anyhow::{bail, Context, Result}; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, Duration, Utc}; use protobuf::{ well_known_types::{ @@ -79,6 +79,7 @@ pub struct Operation { /// Each operation is produced from the workflow execution #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Run { + pub id: u32, pub run_id: String, pub operations: Vec, } @@ -188,9 +189,16 @@ pub struct Save { pub retry: Option, } +#[derive(Serialize)] +pub struct SaveOutput { + pub payload: Option, + pub current_retry_count: Option, +} + impl Run { pub fn new(run_id: String) -> Self { Self { + id: 0, run_id, operations: vec![], } @@ -232,18 +240,36 @@ impl Run { self.operations = vec![]; } - fn _save(&mut self, func: F, option: Option) -> Result - where - F: Fn() -> Result, - { - let next_id = 1; + pub fn next_id(&mut self) -> u32 { + self.id += 1; + self.id + } + + pub fn append_op(&mut self, op: OperationEvent) { + let has_stopped = self + .operations + .iter() + .any(|op| matches!(op.event, OperationEvent::Stop { .. })); + if !has_stopped { + self.operations.push(Operation { + at: Utc::now(), + event: op, + }); + } + } + + pub fn save(&mut self) -> Result { + let next_id = self.next_id(); let mut current_retry_count: i32 = 1; for Operation { event, .. } in self.operations.iter() { if let OperationEvent::Save { id, value } = event { if *id == next_id { if let SavedValue::Resolved { payload } = value { - return Ok(payload.clone()); + return Ok(SaveOutput { + payload: Some(payload.clone()), + current_retry_count: None, + }); } else if let SavedValue::Retry { counter, wait_until, @@ -260,68 +286,49 @@ impl Run { } } - current_retry_count += 1; - - let option = option.unwrap(); + Ok(SaveOutput { + payload: None, + current_retry_count: Some(current_retry_count), + }) + } - match func() { - Ok(result) => { - self.operations.push(Operation { - at: Utc::now(), - event: OperationEvent::Save { - id: next_id, - value: SavedValue::Resolved { - payload: result.clone(), - }, - }, - }); - Ok(result) - } - Err(err) => { - let retry = option.retry.unwrap(); - if retry.max_retries != 0 && current_retry_count < retry.max_retries { - let strategy = RetryStrategy { - min_backoff_ms: Some(retry.max_backoff_ms), - max_backoff_ms: Some(retry.max_backoff_ms), - max_retries: retry.max_retries, - }; - - let retries_left = (retry.max_retries - current_retry_count).max(0); - let delay_ms = strategy.eval(Strategy::Linear, retries_left)? as i64; - let wait_until_as_ms = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as i64 - + delay_ms; - - self.operations.push(Operation { - at: Utc::now(), // TODO verify if it's good - event: OperationEvent::Save { - id: next_id, - value: SavedValue::Retry { - counter: current_retry_count, - wait_until: Utc.timestamp_millis_opt(wait_until_as_ms).unwrap(), - }, - }, - }); - bail!(Interupt::Saveretry); - } else { - self.operations.push(Operation { - at: Utc::now(), // TODO verify if it's good - event: OperationEvent::Save { - id: next_id, - value: SavedValue::Failed { - err: serde_json::json!({ - "retries": current_retry_count, - "message": err.to_string() - }), - }, - }, - }); + pub fn sleep(&mut self, duration_ms: i32) -> Result<()> { + let next_id = self.next_id(); + for Operation { event, .. } in self.operations.iter() { + if let OperationEvent::Sleep { id, end, .. } = event { + if next_id == *id { + if end <= &Utc::now() { + return Ok(()); + } else { + bail!(Interupt::Sleep); + } } - bail!(err) } } + + let start = Utc::now(); + + let end = start + Duration::milliseconds(start.timestamp() + duration_ms as i64); + + self.operations.push(Operation { + at: start, + event: OperationEvent::Sleep { + id: next_id, + start, + end, + }, + }); + bail!(Interupt::Sleep); + } + + pub fn append_event(&mut self, event_name: String, payload: Value) { + self.operations.push(Operation { + at: Utc::now(), + event: OperationEvent::Send { + event_name, + value: payload, + }, + }); } } diff --git a/src/typegate/engine/00_runtime.js b/src/typegate/engine/00_runtime.js index 9ca51f4499..5cbf36de0c 100644 --- a/src/typegate/engine/00_runtime.js +++ b/src/typegate/engine/00_runtime.js @@ -79,15 +79,16 @@ const Meta = { metadataEnumerateAllChildren: getOp( "op_sub_metadata_enumerate_all_children", ), + contextSave: getOp("op_context_save"), + contextSleep: getOp("op_context_sleep"), + contextAppendEvent: getOp("op_context_append_event"), + contextAppendOp: getOp("op_context_append_op"), }, grpc: { register: getOp("op_grpc_register"), unregister: getOp("op_grpc_unregister"), callGrpcMethod: getOp("op_call_grpc_method"), }, - python: { - executePython: getOp("op_execute_python_with_context"), - }, }; globalThis.Meta = Meta; diff --git a/src/typegate/engine/runtime.d.ts b/src/typegate/engine/runtime.d.ts index 8b024d5a3b..869aa529bc 100644 --- a/src/typegate/engine/runtime.d.ts +++ b/src/typegate/engine/runtime.d.ts @@ -85,6 +85,10 @@ export type MetaNS = { metadataEnumerateAllChildren: ( inp: EnumerateAllChildrenInput, ) => Promise>; + contextSave: (inp: SaveInput) => SaveOutput; + contextSleep: (inp: SleepInput) => void; + contextAppendEvent: (inp: AppendEventInput) => void; + contextAppendOp: (inp: AppendOpInput) => void; }; python: { @@ -327,6 +331,7 @@ export type OperationEvent = export type Operation = { at: string; event: OperationEvent }; export interface Run { + id: number; run_id: string; operations: Array; } @@ -435,3 +440,28 @@ export interface PythonExecutionOutput { execution_result: any; execution_status: string; } + +export interface SaveInput { + run: Run; +} + +export interface SaveOutput { + payload?: any; + current_retry_count?: number; +} + +export interface SleepInput { + run: Run; + duration_ms: number; // in millisecond +} + +export interface AppendEventInput { + run: Run; + event_name: string; + payload: any; +} + +export interface AppendOpInput { + run: Run; + op: OperationEvent; +} diff --git a/src/typegate/engine/src/ext.rs b/src/typegate/engine/src/ext.rs index 2f2ca8de81..0b47234f16 100644 --- a/src/typegate/engine/src/ext.rs +++ b/src/typegate/engine/src/ext.rs @@ -59,6 +59,9 @@ deno_core::extension!( substantial::op_sub_metadata_write_workflow_link, substantial::op_sub_metadata_write_parent_child_link, substantial::op_sub_metadata_enumerate_all_children, + substantial::op_context_save, + substantial::op_context_sleep, + substantial::op_context_append_event, // FIXME(yohe): this test broke and has proven difficult to fix // #[cfg(test)] diff --git a/src/typegate/engine/src/runtimes/substantial.rs b/src/typegate/engine/src/runtimes/substantial.rs index 9420c61e34..4b7ff1d6e3 100644 --- a/src/typegate/engine/src/runtimes/substantial.rs +++ b/src/typegate/engine/src/runtimes/substantial.rs @@ -9,9 +9,10 @@ use chrono::{DateTime, Utc}; use common::typegraph::runtimes::substantial::SubstantialBackend; use dashmap::DashMap; use deno_core::OpState; +use serde_json::Value; use substantial::{ backends::{fs::FsBackend, memory::MemoryBackend, redis::RedisBackend, Backend, NextRun}, - converters::{MetadataEvent, Operation, Run}, + converters::{MetadataEvent, Operation, OperationEvent, Run, SaveOutput}, }; #[rustfmt::skip] @@ -452,3 +453,52 @@ pub async fn op_sub_metadata_enumerate_all_children( backend.enumerate_all_children(input.parent_run_id.clone()) } + +#[derive(Deserialize)] +pub struct SaveInput { + pub run: Run, +} + +#[deno_core::op2] +#[serde] +pub fn op_context_save(#[serde] input: SaveInput) -> Result { + let mut run = input.run; + run.save() +} + +#[derive(Deserialize)] +pub struct SleepInut { + pub run: Run, + pub duration_ms: i32, +} + +#[deno_core::op2] +pub fn op_context_sleep(#[serde] input: SleepInut) -> Result<()> { + let mut run = input.run; + run.sleep(input.duration_ms) +} + +#[derive(Deserialize)] +pub struct AppendEventInput { + pub run: Run, + pub event_name: String, + pub payload: Value, +} + +#[deno_core::op2] +pub fn op_context_append_event(#[serde] input: AppendEventInput) { + let mut run = input.run; + run.append_event(input.event_name, input.payload) +} + +#[derive(Deserialize)] +pub struct AppendOpInput { + pub run: Run, + pub op: OperationEvent, +} + +#[deno_core::op2] +pub fn op_context_append_op(#[serde] input: AppendOpInput) { + let mut run = input.run; + run.append_op(input.op); +} diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index 44a48f81e0..72ad29413b 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -6,7 +6,7 @@ import { make_internal } from "../../worker_utils.ts"; import { TaskContext } from "../deno/shared_types.ts"; -import { Interrupt, OperationEvent, Run, appendIfOngoing } from "./types.ts"; +import { Interrupt, OperationEvent, Run } from "./types.ts"; // const isTest = Deno.env.get("DENO_TESTING") === "true"; const testBaseUrl = Deno.env.get("TEST_OVERRIDE_GQL_ORIGIN"); @@ -14,50 +14,27 @@ const testBaseUrl = Deno.env.get("TEST_OVERRIDE_GQL_ORIGIN"); const additionalHeaders = { connection: "keep-alive" }; export class Context { - private id: number = 0; gql: ReturnType; constructor( private run: Run, private kwargs: Record, - private internal: TaskContext + private internal: TaskContext, ) { this.gql = createGQLClient(internal); } - #nextId() { - // IDEA: this scheme does not account the step provided - // Different args => potentially different step (notably for Save) - this.id += 1; - return this.id; - } - #appendOp(op: OperationEvent) { - appendIfOngoing(this.run, { at: new Date().toJSON(), event: op }); + Meta.substantial.contextAppendOp({ run: this.run, op }); } async save(fn: () => T | Promise, option?: SaveOption) { - const id = this.#nextId(); - - let currRetryCount = 1; - for (const { event } of this.run.operations) { - if (event.type == "Save" && id == event.id) { - if (event.value.type == "Resolved") { - return event.value.payload; - } else if (event.value.type == "Retry") { - const delay = new Date(event.value.wait_until); - if (delay.getTime() > new Date().getTime()) { - // Too soon! - throw Interrupt.Variant("SAVE_RETRY"); - } else { - currRetryCount = event.value.counter; - } - } - } - } + const { payload, current_retry_count } = Meta.substantial.contextSave({ + run: this.run, + }); - // current call already counts - currRetryCount += 1; + if (payload) return payload; + const currRetryCount = current_retry_count ?? 1; try { let result: any; @@ -69,7 +46,7 @@ export class Context { this.#appendOp({ type: "Save", - id, + id: this.run.id, value: { type: "Resolved", payload: result ?? null, @@ -86,7 +63,7 @@ export class Context { const strategy = new RetryStrategy( retry.maxRetries, retry.minBackoffMs, - retry.maxBackoffMs + retry.maxBackoffMs, ); const retriesLeft = Math.max(retry.maxRetries - currRetryCount, 0); @@ -95,7 +72,7 @@ export class Context { this.#appendOp({ type: "Save", - id, + id: this.run.id, value: { type: "Retry", wait_until: new Date(waitUntilAsMs).toJSON(), @@ -107,7 +84,7 @@ export class Context { } else { this.#appendOp({ type: "Save", - id, + id: this.run.id, value: { type: "Failed", err: { @@ -123,27 +100,7 @@ export class Context { } sleep(durationMs: number) { - const id = this.#nextId(); - for (const { event } of this.run.operations) { - if (event.type == "Sleep" && id == event.id) { - const end = new Date(event.end); - if (end.getTime() <= new Date().getTime()) { - return; - } else { - throw Interrupt.Variant("SLEEP"); - } - } - } - - const start = new Date(); - const end = new Date(start.getTime() + durationMs); - this.#appendOp({ - type: "Sleep", - id, - start: start.toJSON(), - end: end.toJSON(), - }); - throw Interrupt.Variant("SLEEP"); + Meta.substantial.contextSleep({ run: this.run, duration_ms: durationMs }); } getRun() { @@ -151,11 +108,7 @@ export class Context { } appendEvent(event_name: string, payload: unknown) { - this.#appendOp({ - type: "Send", - event_name, - value: payload, - }); + Meta.substantial.contextAppendEvent({ run: this.run, event_name, payload }); } receive(eventName: string) { @@ -170,7 +123,7 @@ export class Context { async handle( eventName: string, - fn: (received: unknown) => unknown | Promise + fn: (received: unknown) => unknown | Promise, ) { for (const { event } of this.run.operations) { if (event.type == "Send" && event.event_name == eventName) { @@ -208,7 +161,7 @@ export class Context { createWorkflowHandle(handleDef: SerializableWorkflowHandle) { if (!handleDef.runId) { throw new Error( - "Cannot create handle from a definition that was not run" + "Cannot create handle from a definition that was not run", ); } return new ChildWorkflowHandle(this, handleDef); @@ -227,11 +180,11 @@ interface SerializableWorkflowHandle { export class ChildWorkflowHandle { constructor( private ctx: Context, - public handleDef: SerializableWorkflowHandle + public handleDef: SerializableWorkflowHandle, ) {} async start(): Promise { - const { data } = await this.ctx.gql/**/ ` + const { data } = await this.ctx.gql /**/` mutation ($name: String!, $kwargs: String!) { _sub_internal_start(name: $name, kwargs: $kwargs) } @@ -243,7 +196,7 @@ export class ChildWorkflowHandle { this.handleDef.runId = (data as any)._sub_internal_start as string; this.#checkRunId(); - const { data: _ } = await this.ctx.gql/**/ ` + const { data: _ } = await this.ctx.gql /**/` mutation ($parent_run_id: String!, $child_run_id: String!) { _sub_internal_link_parent_child(parent_run_id: $parent_run_id, child_run_id: $child_run_id) } @@ -258,7 +211,7 @@ export class ChildWorkflowHandle { async result(): Promise { this.#checkRunId(); - const { data } = await this.ctx.gql/**/ ` + const { data } = await this.ctx.gql /**/` query ($name: String!) { _sub_internal_results(name: $name) { completed { @@ -292,7 +245,7 @@ export class ChildWorkflowHandle { async stop(): Promise { this.#checkRunId(); - const { data } = await this.ctx.gql/**/ ` + const { data } = await this.ctx.gql /**/` mutation ($run_id: String!) { _sub_internal_stop(run_id: $run_id) } @@ -306,7 +259,7 @@ export class ChildWorkflowHandle { async hasStopped(): Promise { this.#checkRunId(); - const { data } = await this.ctx.gql/**/ ` + const { data } = await this.ctx.gql /**/` query { _sub_internal_results(name: $name) { completed { @@ -329,7 +282,7 @@ export class ChildWorkflowHandle { #checkRunId() { if (!this.handleDef.runId) { throw new Error( - "Invalid state: run_id is not properly set, this could mean that the workflow was not started yet" + "Invalid state: run_id is not properly set, this could mean that the workflow was not started yet", ); } } @@ -371,7 +324,7 @@ class RetryStrategy { constructor( maxRetries: number, minBackoffMs?: number, - maxBackoffMs?: number + maxBackoffMs?: number, ) { this.maxRetries = maxRetries; this.minBackoffMs = minBackoffMs; diff --git a/src/typegate/src/runtimes/substantial/types.ts b/src/typegate/src/runtimes/substantial/types.ts index a331300017..7f9359d210 100644 --- a/src/typegate/src/runtimes/substantial/types.ts +++ b/src/typegate/src/runtimes/substantial/types.ts @@ -102,10 +102,3 @@ export class Interrupt extends Error { return new Interrupt(kind, cause); } } - -export function appendIfOngoing(run: Run, operation: Operation) { - const hasStopped = run.operations.some(({ event }) => event.type == "Stop"); - if (!hasStopped) { - run.operations.push(operation); - } -} diff --git a/src/typegate/src/runtimes/substantial/worker.ts b/src/typegate/src/runtimes/substantial/worker.ts index 01e509edf6..8a3e6433e0 100644 --- a/src/typegate/src/runtimes/substantial/worker.ts +++ b/src/typegate/src/runtimes/substantial/worker.ts @@ -17,7 +17,6 @@ self.onmessage = async function (event) { schedule, kwargs, internal, - kind, } = data as TaskData; // FIXME: handle case when script is missing and notify WorkerManager so it cleans up // its registry. @@ -26,86 +25,47 @@ self.onmessage = async function (event) { runCtx = new Context(run, kwargs, internal); const workflowFn = module[functionName]; - if (kind == "DENO") { - if (typeof workflowFn !== "function") { - self.postMessage(Err(`Function "${functionName}" not found`)); - self.close(); - return; - } - workflowFn(runCtx) - .then((wfResult: unknown) => { - self.postMessage( - Ok( - Msg( - type, - { - kind: "SUCCESS", - result: wfResult, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult, - ), - ), - ); - }) - .catch((wfException: unknown) => { - self.postMessage( - Ok( - Msg( - type, - { - kind: "FAIL", - result: wfException instanceof Error - ? wfException.message - : JSON.stringify(wfException), - exception: wfException instanceof Error - ? wfException - : undefined, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult, - ), - ), - ); - }); - } else if (kind == "PYTHON") { - try { - const result = Meta.python.executePython({ - python_module_path: modulePath, - python_function_name: functionName, - executing_context: runCtx, - }); - + if (typeof workflowFn !== "function") { + self.postMessage(Err(`Function "${functionName}" not found`)); + self.close(); + return; + } + workflowFn(runCtx) + .then((wfResult: unknown) => { self.postMessage( - Ok(Msg( - type, - { - kind: "SUCCESS", - result: result.execution_result, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult, - )), + Ok( + Msg( + type, + { + kind: "SUCCESS", + result: wfResult, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), ); - } catch (pyException) { + }) + .catch((wfException: unknown) => { self.postMessage( - Ok(Msg( - type, - { - kind: "FAIL", - result: pyException instanceof Error - ? pyException.message - : JSON.stringify(pyException), - exception: pyException instanceof Error - ? pyException - : undefined, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult, - )), + Ok( + Msg( + type, + { + kind: "FAIL", + result: wfException instanceof Error + ? wfException.message + : JSON.stringify(wfException), + exception: wfException instanceof Error + ? wfException + : undefined, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), ); - } - } + }); break; } default: From 4a8350edcfc789c60844e2e2647f6a92610216a0 Mon Sep 17 00:00:00 2001 From: j03-dev <24nomeniavo@gmail.com> Date: Thu, 7 Nov 2024 07:39:55 +0300 Subject: [PATCH 7/7] commit befor test worflow only save method --- Cargo.lock | 1 + src/substantial/src/converters.rs | 6 +- src/typegate/engine/00_runtime.js | 1 + src/typegate/engine/Cargo.toml | 2 + src/typegate/engine/runtime.d.ts | 20 +- src/typegate/engine/src/ext.rs | 1 + src/typegate/engine/src/lib.rs | 1 - src/typegate/engine/src/python.rs | 99 ----- .../{substantial.rs => substantial/mod.rs} | 4 + .../runtimes/substantial/python_context.rs | 375 ++++++++++++++++++ .../src/runtimes/deno/shared_types.ts | 2 +- .../src/runtimes/substantial/agent.ts | 13 +- .../src/runtimes/substantial/worker.ts | 102 +++-- tests/runtimes/substantial/common.ts | 60 +-- 14 files changed, 509 insertions(+), 178 deletions(-) delete mode 100644 src/typegate/engine/src/python.rs rename src/typegate/engine/src/runtimes/{substantial.rs => substantial/mod.rs} (99%) create mode 100644 src/typegate/engine/src/runtimes/substantial/python_context.rs diff --git a/Cargo.lock b/Cargo.lock index f8e49a5133..8589adc7cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12856,6 +12856,7 @@ dependencies = [ "query-engine-metrics", "regex", "request-handlers", + "reqwest", "schema-connector", "schema-core", "serde", diff --git a/src/substantial/src/converters.rs b/src/substantial/src/converters.rs index 5c40979617..217bc681af 100644 --- a/src/substantial/src/converters.rs +++ b/src/substantial/src/converters.rs @@ -87,7 +87,7 @@ pub struct Run { #[derive(Debug)] pub enum Interupt { Sleep, - Saveretry, + SaveRetry, WaitReceiveEvent, WaitHandleEvent, WaitEnsureValue, @@ -101,7 +101,7 @@ impl fmt::Display for Interupt { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let variant = match self { Self::Sleep => "SLEEP", - Self::Saveretry => "SAVE_RETRY", + Self::SaveRetry => "SAVE_RETRY", Self::WaitReceiveEvent => "WAIT_RECEIVE_EVENT", Self::WaitHandleEvent => "WAIT_HANDLE_EVENT", Self::WaitEnsureValue => "WAIT_ENSURE_VALUE", @@ -277,7 +277,7 @@ impl Run { { let now = Utc::now(); if wait_until > &now { - bail!(Interupt::Saveretry); + bail!(Interupt::SaveRetry); } else { current_retry_count = *counter; } diff --git a/src/typegate/engine/00_runtime.js b/src/typegate/engine/00_runtime.js index 5cbf36de0c..62a5c4403f 100644 --- a/src/typegate/engine/00_runtime.js +++ b/src/typegate/engine/00_runtime.js @@ -83,6 +83,7 @@ const Meta = { contextSleep: getOp("op_context_sleep"), contextAppendEvent: getOp("op_context_append_event"), contextAppendOp: getOp("op_context_append_op"), + executePythonWithContext: getOp("op_execute_python_with_context"), }, grpc: { register: getOp("op_grpc_register"), diff --git a/src/typegate/engine/Cargo.toml b/src/typegate/engine/Cargo.toml index d6693f9aac..b92e669d4a 100644 --- a/src/typegate/engine/Cargo.toml +++ b/src/typegate/engine/Cargo.toml @@ -72,6 +72,8 @@ protobuf-json-mapping.workspace = true # python pyo3 = { workspace = true, features = ["extension-module"] } +reqwest = { workspace = true } + [dev-dependencies] env_logger.workspace = true diff --git a/src/typegate/engine/runtime.d.ts b/src/typegate/engine/runtime.d.ts index 869aa529bc..ae41549b51 100644 --- a/src/typegate/engine/runtime.d.ts +++ b/src/typegate/engine/runtime.d.ts @@ -1,6 +1,8 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 +import { TaskContext } from "../src/runtimes/deno/shared_types.ts"; + declare global { const Meta: MetaNS; } @@ -89,10 +91,9 @@ export type MetaNS = { contextSleep: (inp: SleepInput) => void; contextAppendEvent: (inp: AppendEventInput) => void; contextAppendOp: (inp: AppendOpInput) => void; - }; - - python: { - executePython: (inp: PythonExecutionInput) => PythonExecutionOutput; + executePythonWithContext: ( + inp: PythonExecutionInput, + ) => PythonExecutionOutput; }; }; @@ -431,14 +432,15 @@ export interface EnumerateAllChildrenInput { } export interface PythonExecutionInput { - python_module_path: string; - python_function_name: string; - executing_context: any; + run: Run; + internal: TaskContext; + kwargs: Record; + module_path: string; + function_name: string; } export interface PythonExecutionOutput { - execution_result: any; - execution_status: string; + wfResult: any; } export interface SaveInput { diff --git a/src/typegate/engine/src/ext.rs b/src/typegate/engine/src/ext.rs index 0b47234f16..9d1ea4830e 100644 --- a/src/typegate/engine/src/ext.rs +++ b/src/typegate/engine/src/ext.rs @@ -62,6 +62,7 @@ deno_core::extension!( substantial::op_context_save, substantial::op_context_sleep, substantial::op_context_append_event, + substantial::op_execute_python_with_context, // FIXME(yohe): this test broke and has proven difficult to fix // #[cfg(test)] diff --git a/src/typegate/engine/src/lib.rs b/src/typegate/engine/src/lib.rs index 9dc8915895..5d6ba03fcc 100644 --- a/src/typegate/engine/src/lib.rs +++ b/src/typegate/engine/src/lib.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Elastic-2.0 mod ext; -mod python; mod runtimes; // mod snapshot; mod typegraph; diff --git a/src/typegate/engine/src/python.rs b/src/typegate/engine/src/python.rs deleted file mode 100644 index fc91e1084e..0000000000 --- a/src/typegate/engine/src/python.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright Metatype OÜ, licensed under the Elastic License 2.0. -// SPDX-License-Identifier: Elastic-2.0 - -use anyhow::Result; -use pyo3::prelude::*; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -#[rustfmt::skip] -use deno_core as deno_core; - -#[derive(Deserialize)] -#[serde(crate = "serde")] -pub struct PythonExecutionInput { - python_module_path: String, - python_function_name: String, - execution_context: Value, -} - -#[derive(Serialize)] -#[serde(crate = "serde")] -pub struct PythonExecutionOutput { - execution_result: Value, - execution_status: String, -} - -fn convert_json_to_python_object(py: Python, json_value: &Value) -> PyResult { - match json_value { - Value::Null => Ok(py.None()), - Value::Bool(boolean_value) => Ok(boolean_value.into_py(py)), - Value::Number(numeric_value) => { - if let Some(integer_value) = numeric_value.as_i64() { - Ok(integer_value.into_py(py)) - } else if let Some(float_value) = numeric_value.as_f64() { - Ok(float_value.into_py(py)) - } else { - Err(PyErr::new::( - "Unsupported numeric type in JSON", - )) - } - } - Value::String(string_value) => Ok(string_value.into_py(py)), - Value::Array(array_value) => { - let python_list = pyo3::types::PyList::empty_bound(py); - for item in array_value { - python_list.append(convert_json_to_python_object(py, item)?)?; - } - Ok(python_list.into()) - } - Value::Object(map_value) => { - let python_dict = pyo3::types::PyDict::new_bound(py); - for (key, value) in map_value { - python_dict.set_item(key, convert_json_to_python_object(py, value)?)?; - } - Ok(python_dict.into()) - } - } -} - -pub fn execute_python_function( - module_path: String, - function_name: String, - context: Value, -) -> Result { - Python::with_gil(|py| { - let python_code = std::fs::read_to_string(module_path.clone()) - .map_err(|e| PyErr::new::(e.to_string()))?; - - let python_module = - PyModule::from_code_bound(py, &python_code, &module_path, "dynamic_module")?; - let python_function = python_module.getattr(function_name.as_str())?; - - let python_context = convert_json_to_python_object(py, &context)?; - let execution_result = python_function.call1((python_context,))?; - - Ok(execution_result.into()) - }) -} - -#[deno_core::op2] -#[serde] -pub fn op_execute_python_with_context( - #[serde] input: PythonExecutionInput, -) -> Result { - let execution_result = execute_python_function( - input.python_module_path, - input.python_function_name, - input.execution_context, - )?; - - let result_value = Python::with_gil(|py| { - let result_str = execution_result.clone_ref(py).to_string(); - serde_json::from_str(&result_str).unwrap_or(Value::Null) - }); - - Ok(PythonExecutionOutput { - execution_result: result_value, - execution_status: "SUCCESS".to_string(), - }) -} diff --git a/src/typegate/engine/src/runtimes/substantial.rs b/src/typegate/engine/src/runtimes/substantial/mod.rs similarity index 99% rename from src/typegate/engine/src/runtimes/substantial.rs rename to src/typegate/engine/src/runtimes/substantial/mod.rs index 4b7ff1d6e3..e3206827b2 100644 --- a/src/typegate/engine/src/runtimes/substantial.rs +++ b/src/typegate/engine/src/runtimes/substantial/mod.rs @@ -3,6 +3,10 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 +mod python_context; + +pub use python_context::op_execute_python_with_context; + use crate::interlude::*; use chrono::{DateTime, Utc}; diff --git a/src/typegate/engine/src/runtimes/substantial/python_context.rs b/src/typegate/engine/src/runtimes/substantial/python_context.rs new file mode 100644 index 0000000000..39c3d357d3 --- /dev/null +++ b/src/typegate/engine/src/runtimes/substantial/python_context.rs @@ -0,0 +1,375 @@ +// Copyright Metatype OÜ, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +use anyhow::{bail, Result}; +// use common::graphql::Query; +use pyo3::{exceptions::PyException, prelude::*}; +use reqwest::{Client, Method, Request, RequestBuilder, Url}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::{collections::HashMap, str::FromStr}; +#[rustfmt::skip] +use deno_core as deno_core; + +use chrono::{TimeZone, Utc}; + +use substantial::converters::{Interupt, Operation, OperationEvent, Run, SaveOutput, SavedValue}; + +trait ToPyObject { + fn to_object(&self, python: Python) -> PyObject; +} + +impl ToPyObject for Value { + fn to_object(&self, py: Python) -> PyObject { + match self { + Value::Null => py.None(), + Value::Bool(b) => b.into_py(py), + Value::Number(n) => { + if let Some(i) = n.as_i64() { + i.into_py(py) + } else if let Some(f) = n.as_f64() { + f.into_py(py) + } else { + n.to_string().into_py(py) + } + } + Value::String(s) => s.into_py(py), + Value::Array(arr) => arr + .iter() + .map(|v| v.to_object(py)) + .collect::>() + .into_py(py), + Value::Object(map) => { + let dict = pyo3::types::PyDict::new_bound(py); + for (k, v) in map { + dict.set_item(k, v.to_object(py)).unwrap(); + } + dict.into_py(py) + } + } + } +} + +#[derive(Deserialize)] +struct Context; + +#[derive(Deserialize)] +struct Meta { + url: String, + token: String, +} + +#[derive(Deserialize)] +struct TaskContext { + #[allow(dead_code)] + parent: Option>, + #[allow(dead_code)] + context: Option, + #[allow(dead_code)] + secrets: HashMap, + meta: Meta, + #[allow(dead_code)] + headers: HashMap, +} + +fn create_gql_client(internal: TaskContext) -> Result { + let url = Url::from_str(&internal.meta.url)?; + let request = Request::new(Method::POST, url); + let gql_client = + RequestBuilder::from_parts(Client::new(), request).bearer_auth(internal.meta.token); + + Ok(gql_client) +} + +#[derive(Clone, Debug)] +enum Strategy { + #[allow(dead_code)] + Linear, +} + +#[pyclass(frozen)] +#[derive(Clone)] +struct Retry { + strategy: Option, + min_backoff_ms: i32, + max_backoff_ms: i32, + max_retries: i32, +} + +struct RetryStrategy { + min_backoff_ms: Option, + max_backoff_ms: Option, + max_retries: i32, +} + +impl RetryStrategy { + fn new( + max_retries: i32, + min_backoff_ms: Option, + max_backoff_ms: Option, + ) -> Result { + if max_retries < 1 { + bail!("max_retries < 1".to_string()); + } + + let mut strategy = RetryStrategy { + min_backoff_ms, + max_backoff_ms, + max_retries, + }; + + let low = strategy.min_backoff_ms; + let high = strategy.max_backoff_ms; + if let (Some(low), Some(high)) = (low, high) { + if low >= high { + bail!("min_backoff_ms >= max_backoff_ms".to_string()); + } + if low < 0 { + bail!("min_backoff_ms < 0".to_string()); + } + } else if low.is_some() && high.is_none() { + strategy.max_backoff_ms = Some(low.unwrap() + 10); + } else if low.is_none() && high.is_some() { + strategy.min_backoff_ms = Some(high.unwrap().saturating_sub(10)); + } + + Ok(strategy) + } + + fn eval(&self, strategy: Strategy, retries_left: i32) -> Result { + match strategy { + Strategy::Linear => Ok(self.linear(retries_left)?), + } + } + + fn linear(&self, retries_left: i32) -> Result { + if retries_left <= 0_i32 { + bail!("retries left <= 0"); + } + + let dt = self.max_backoff_ms.unwrap_or(0) - self.min_backoff_ms.unwrap_or(0); + Ok(((self.max_retries - retries_left) * dt) / self.max_retries) + } +} + +#[pyclass(frozen)] +#[derive(Clone)] +struct Save { + #[allow(dead_code)] + timeout_ms: Option, + retry: Option, +} + +#[pymethods] +impl Save { + #[getter] + fn get_retry(&self) -> Option { + self.retry.clone() + } +} + +#[pyclass] +struct PythonContext { + #[allow(dead_code)] + query: RequestBuilder, + run: Run, + #[allow(dead_code)] + kwargs: HashMap, +} + +#[pymethods] +impl PythonContext { + #[pyo3(signature = (func, option = None))] + fn save(&mut self, py: Python, func: PyObject, option: Option>) -> PyResult { + let SaveOutput { + payload, + current_retry_count, + } = self + .run + .save() + .map_err(|e| PyException::new_err(e.to_string()))?; + + if let Some(payload) = payload { + return Ok(payload.to_string()); + } + + let current_retry_count = current_retry_count.unwrap_or(1); + + match func.call0(py) { + Ok(result) => { + let op = OperationEvent::Save { + id: self.run.id, + value: SavedValue::Resolved { + payload: serde_json::json!(result.to_string()), + }, + }; + self.run.append_op(op); + + Ok(result.to_string()) + } + Err(err) => { + if let Some(option) = option { + if let Some(retry) = option.get().retry.clone() { + let Retry { + max_retries, + max_backoff_ms, + min_backoff_ms, + .. + } = retry; + if current_retry_count < max_retries { + let strategy = RetryStrategy::new( + max_retries, + Some(min_backoff_ms), + Some(max_backoff_ms), + ) + .map_err(|err| PyException::new_err(err.to_string()))?; + + let retries_left = + std::cmp::max(retry.max_retries - current_retry_count, 0); + let delay_ms = strategy + .eval(retry.strategy.unwrap(), retries_left) + .map_err(|err| PyException::new_err(err.to_string()))?; + let wait_until_as_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + as i32 + + delay_ms; + + let op = OperationEvent::Save { + id: self.run.id, + value: SavedValue::Retry { + counter: current_retry_count, + wait_until: Utc + .timestamp_millis_opt(wait_until_as_ms as i64) + .unwrap(), + }, + }; + self.run.append_op(op); + } + return Err(PyException::new_err(format!("{}", Interupt::SaveRetry))); + } else { + let op = OperationEvent::Save { + id: self.run.id, + value: SavedValue::Failed { + err: serde_json::json!({ + "retries": current_retry_count, + "message": err.to_string() + }), + }, + }; + self.run.append_op(op); + } + } + + Err(err) + } + } + } + + fn sleep(&mut self, durration_ms: i32) -> PyResult<()> { + self.run + .sleep(durration_ms) + .map_err(|err| PyException::new_err(err.to_string())) + } + + fn append_event(&mut self, event_name: String, payload: PyObject) -> PyResult<()> { + self.run + .append_event(event_name, serde_json::json!(payload.to_string())); + Ok(()) + } + + fn receive(&self, event_name: String) -> PyResult { + for Operation { event, .. } in self.run.operations.iter() { + if let OperationEvent::Send { + event_name: ref sent_name, + value, + } = event + { + if event_name == *sent_name { + return Ok(value.to_string()); + } + } + } + + Err(PyException::new_err(format!( + "{}", + Interupt::WaitReceiveEvent + ))) + } + + fn handle(&mut self, py: Python, event_name: String, func: PyObject) -> PyResult { + for Operation { event, .. } in self.run.operations.iter() { + if let OperationEvent::Send { + event_name: ref sent_name, + value, + } = event + { + if event_name == *sent_name { + let func = func.call1(py, (value.to_object(py),))?; + return self.save(py, func, None); + } + } + } + + Err(PyException::new_err(format!( + "{}", + Interupt::WaitHandleEvent + ))) + } +} + +fn execute_python_function( + module_path: String, + function_name: String, + python_context: PythonContext, +) -> Result { + Python::with_gil(|py| { + let python_code = std::fs::read_to_string(module_path.clone()) + .map_err(|e| PyErr::new::(e.to_string()))?; + + let python_module = + PyModule::from_code_bound(py, &python_code, &module_path, "dynamic_module")?; + + let python_function = python_module.getattr(function_name.as_str())?; + + let execution_result = python_function.call1((python_context,))?; + + Ok(execution_result.into()) + }) +} + +#[derive(Deserialize)] +#[serde(crate = "serde")] +pub struct PythonExecutionInput { + run: Run, + internal: TaskContext, + kwargs: HashMap, + module_path: String, + funciton_name: String, +} + +#[derive(Serialize)] +#[serde(crate = "serde")] +pub struct PythonExecutionOutput { + result: Value, +} + +#[deno_core::op2] +#[serde] +pub fn op_execute_python_with_context( + #[serde] input: PythonExecutionInput, +) -> Result { + // this is a python Class in pyO3 + let python_context = PythonContext { + query: create_gql_client(input.internal)?, + run: input.run, + kwargs: input.kwargs, + }; + + let output = execute_python_function(input.module_path, input.funciton_name, python_context)?; + + let result = serde_json::json!(output.to_string()); + + Ok(PythonExecutionOutput { result }) +} diff --git a/src/typegate/src/runtimes/deno/shared_types.ts b/src/typegate/src/runtimes/deno/shared_types.ts index 7bbb446fe0..d8e9b924b0 100644 --- a/src/typegate/src/runtimes/deno/shared_types.ts +++ b/src/typegate/src/runtimes/deno/shared_types.ts @@ -70,6 +70,6 @@ export interface TaskExec { ( args: Record, context: TaskContext, - helpers: Record + helpers: Record, ): unknown; } diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index a37f47e4bc..c97c76bc38 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -8,7 +8,6 @@ import { import { getLogger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; import { - appendIfOngoing, Interrupt, Kind, Result, @@ -224,7 +223,7 @@ export class Agent { } if (newEventOp) { - appendIfOngoing(run, newEventOp); + Meta.substantial.contextAppendOp({ run, op: newEventOp?.event }); } if (run.operations.length == 0) { @@ -325,7 +324,9 @@ export class Agent { default: logger.error( `Fatal: invalid type ${answer.type} sent by "${runId}": ${ - JSON.stringify(answer.data) + JSON.stringify( + answer.data, + ) }`, ); } @@ -401,9 +402,9 @@ export class Agent { // Note: run is a one-time value, thus can be mutated - appendIfOngoing(run, { - at: new Date().toJSON(), - event: { + Meta.substantial.contextAppendOp({ + run, + op: { type: "Stop", result: { [rustResult]: result ?? null, diff --git a/src/typegate/src/runtimes/substantial/worker.ts b/src/typegate/src/runtimes/substantial/worker.ts index 8a3e6433e0..857d571f70 100644 --- a/src/typegate/src/runtimes/substantial/worker.ts +++ b/src/typegate/src/runtimes/substantial/worker.ts @@ -17,21 +17,66 @@ self.onmessage = async function (event) { schedule, kwargs, internal, + kind, } = data as TaskData; // FIXME: handle case when script is missing and notify WorkerManager so it cleans up // its registry. const module = await import(modulePath); - runCtx = new Context(run, kwargs, internal); - const workflowFn = module[functionName]; + if (kind == "DENO") { + runCtx = new Context(run, kwargs, internal); + const workflowFn = module[functionName]; - if (typeof workflowFn !== "function") { - self.postMessage(Err(`Function "${functionName}" not found`)); - self.close(); - return; - } - workflowFn(runCtx) - .then((wfResult: unknown) => { + if (typeof workflowFn !== "function") { + self.postMessage(Err(`Function "${functionName}" not found`)); + self.close(); + return; + } + workflowFn(runCtx) + .then((wfResult: unknown) => { + self.postMessage( + Ok( + Msg( + type, + { + kind: "SUCCESS", + result: wfResult, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), + ); + }) + .catch((wfException: unknown) => { + self.postMessage( + Ok( + Msg( + type, + { + kind: "FAIL", + result: wfException instanceof Error + ? wfException.message + : JSON.stringify(wfException), + exception: wfException instanceof Error + ? wfException + : undefined, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), + ); + }); + } else if (kind == "PYTHON") { + try { + const { wfResult } = Meta.substantial.executePythonWithContext({ + run, + internal, + kwargs, + module_path: modulePath, + function_name: functionName, + }); self.postMessage( Ok( Msg( @@ -39,33 +84,32 @@ self.onmessage = async function (event) { { kind: "SUCCESS", result: wfResult, - run: runCtx!.getRun(), + run, schedule, } satisfies WorkflowResult, ), ), ); - }) - .catch((wfException: unknown) => { + } catch (wfException) { self.postMessage( - Ok( - Msg( - type, - { - kind: "FAIL", - result: wfException instanceof Error - ? wfException.message - : JSON.stringify(wfException), - exception: wfException instanceof Error - ? wfException - : undefined, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult, - ), - ), + Ok(Msg( + type, + { + kind: "FAIL", + result: wfException instanceof Error + ? wfException.message + : JSON.stringify(wfException), + exception: wfException instanceof Error + ? wfException + : undefined, + run, + schedule, + } satisfies WorkflowResult, + )), ); - }); + } + } + break; } default: diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index 3cc42b1f5d..92f78a0e55 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -1,4 +1,4 @@ -import { assertExists, assertEquals } from "@std/assert"; +import { assertEquals, assertExists } from "@std/assert"; import { connect, parseURL } from "redis"; import { gql, Meta, sleep } from "../../utils/mod.ts"; import { MetaTestCleanupFn } from "test-utils/test.ts"; @@ -33,7 +33,7 @@ export function basicTestTemplate( }; secrets?: Record; }, - cleanup?: MetaTestCleanupFn + cleanup?: MetaTestCleanupFn, ) { Meta.test( { @@ -63,11 +63,11 @@ export function basicTestTemplate( currentRunId = body.data?.start_sleep! as string; assertExists( currentRunId, - "Run id was not returned when workflow was started" + "Run id was not returned when workflow was started", ); }) .on(e); - } + }, ); // Let interrupts to do their jobs for a bit @@ -101,7 +101,7 @@ export function basicTestTemplate( }, }) .on(e); - } + }, ); await sleep(delays.awaitSleepCompleteSec * 1000); @@ -145,9 +145,9 @@ export function basicTestTemplate( }, }) .on(e); - } + }, ); - } + }, ); } @@ -162,7 +162,7 @@ export function concurrentWorkflowTestTemplate( }; secrets?: Record; }, - cleanup?: MetaTestCleanupFn + cleanup?: MetaTestCleanupFn, ) { Meta.test( { @@ -207,7 +207,7 @@ export function concurrentWorkflowTestTemplate( runIds.push(...[one, two, three]); }) .on(e); - } + }, ); // let's wait for a bit to make sure interrupts are doing their jobs @@ -243,7 +243,7 @@ export function concurrentWorkflowTestTemplate( three: [runIds[2]], }) .on(e); - } + }, ); // This is arbitrary, if ops are leaking that means it should be increased @@ -277,20 +277,20 @@ export function concurrentWorkflowTestTemplate( assertEquals( body?.data?.results?.ongoing?.count, 0, - `0 workflow currently running (${backendName})` + `0 workflow currently running (${backendName})`, ); assertEquals( body?.data?.results?.completed?.count, 3, - `3 workflows completed (${backendName})` + `3 workflows completed (${backendName})`, ); const localSorter = (a: any, b: any) => a.run_id.localeCompare(b.run_id); - const received = - body?.data?.results?.completed?.runs ?? ([] as Array); + const received = body?.data?.results?.completed?.runs ?? + ([] as Array); const expected = [ { result: { @@ -318,12 +318,12 @@ export function concurrentWorkflowTestTemplate( assertEquals( received.sort(localSorter), expected.sort(localSorter), - `All three workflows have completed, including the aborted one (${backendName})` + `All three workflows have completed, including the aborted one (${backendName})`, ); }) .on(e); }); - } + }, ); } @@ -338,7 +338,7 @@ export function retrySaveTestTemplate( }; secrets?: Record; }, - cleanup?: MetaTestCleanupFn + cleanup?: MetaTestCleanupFn, ) { Meta.test( { @@ -381,7 +381,7 @@ export function retrySaveTestTemplate( assertExists(retryAbortMeId, "retry_abort_me runId"); }) .on(e); - } + }, ); await sleep(1000); @@ -401,7 +401,7 @@ export function retrySaveTestTemplate( abort_retry: [retryAbortMeId], }) .on(e); - } + }, ); // Waiting for the retry to finish @@ -436,20 +436,20 @@ export function retrySaveTestTemplate( assertEquals( body?.data?.results?.ongoing?.count, 0, - `0 workflow currently running (${backendName})` + `0 workflow currently running (${backendName})`, ); assertEquals( body?.data?.results?.completed?.count, 4, - `4 workflows completed (${backendName})` + `4 workflows completed (${backendName})`, ); const localSorter = (a: any, b: any) => a.run_id.localeCompare(b.run_id); - const received = - body?.data?.results?.completed?.runs ?? ([] as Array); + const received = body?.data?.results?.completed?.runs ?? + ([] as Array); const expected = [ { result: { @@ -484,13 +484,13 @@ export function retrySaveTestTemplate( assertEquals( received.sort(localSorter), expected.sort(localSorter), - `All workflows have completed (${backendName})` + `All workflows have completed (${backendName})`, ); }) .on(e); - } + }, ); - } + }, ); } @@ -505,7 +505,7 @@ export function childWorkflowTestTemplate( }; secrets?: Record; }, - cleanup?: MetaTestCleanupFn + cleanup?: MetaTestCleanupFn, ) { Meta.test( { @@ -522,7 +522,7 @@ export function childWorkflowTestTemplate( "runtimes/substantial/substantial_child_workflow.py", { secrets, - } + }, ); const packages = [ @@ -543,7 +543,7 @@ export function childWorkflowTestTemplate( parentRunId = body.data?.start! as string; assertExists( parentRunId, - "Run id was not returned when workflow was started" + "Run id was not returned when workflow was started", ); }) .on(e); @@ -610,7 +610,7 @@ export function childWorkflowTestTemplate( }) .on(e); }); - } + }, ); }