diff --git a/Cargo.lock b/Cargo.lock index 193fe71cb9..9181304b80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9075,6 +9075,69 @@ dependencies = [ "unicase", ] +[[package]] +name = "pyo3" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d922163ba1f79c04bc49073ba7b32fd5a8d3b76a87c955921234b8e77333c51" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "memoffset 0.9.1", + "once_cell", + "portable-atomic", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc38c5feeb496c8321091edf3d63e9a6829eab4b863b4a6a65f26f3e9cc6b179" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94845622d88ae274d2729fcefc850e63d7a3ddff5e3ce11bd88486db9f1d357d" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e655aad15e09b94ffdb3ce3d217acf652e26bbc37697ef012f5e5e348c716e5e" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn 2.0.71", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1e3f09eecd94618f60a455a23def79f79eba4dc561a97324bf9ac8c6df30ce" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "pyo3-build-config", + "quote", + "syn 2.0.71", +] + [[package]] name = "quaint" version = "0.2.0-alpha.13" @@ -12788,14 +12851,17 @@ dependencies = [ "protobuf", "protobuf-json-mapping", "psl", + "pyo3", "query-connector", "query-core", "query-engine-metrics", "regex", "request-handlers", + "reqwest", "schema-connector", "schema-core", "serde", + "serde_json", "shadow-rs", "substantial", 
"tap", diff --git a/Cargo.toml b/Cargo.toml index 4deca359b8..9ca4016faf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ "src/typegate/standalone", "src/typegraph/core", "src/xtask", - "src/substantial" + "src/substantial", ] exclude = [ @@ -69,19 +69,19 @@ sha2 = "0.10.8" seahash = "4.1.0" # patterns -anyhow = "1.0.89" # FIXME: replace anyhow with eyre +anyhow = "1.0.89" # FIXME: replace anyhow with eyre color-eyre = "0.6.3" -eyre = "0.6.12" # NOTE: keep in sync with verison used by color-eyre +eyre = "0.6.12" # NOTE: keep in sync with verison used by color-eyre thiserror = "1.0.64" indoc = "2.0.5" unindent = "0.2.3" itertools = "0.13.0" -lazy_static = "1.5.0" # FIXME: replace with Lazy Cell +lazy_static = "1.5.0" # FIXME: replace with Lazy Cell crossbeam-channel = "0.5.13" enum_dispatch = "0.3.13" tap = "1.0.1" derive_more = { version = "1", features = ["from"] } -cached = "0.53.1" # FIXME: replace usage with a Lazy Cell + dashmap +cached = "0.53.1" # FIXME: replace usage with a Lazy Cell + dashmap garde = "0.20" paste = "1.0.15" @@ -116,7 +116,7 @@ indexmap = { version = "2.6.0", features = ["serde"] } semver = "1.0.23" dashmap = "6.1.0" connection-string = "0.2.0" -chrono = { version = "0.4.38", features = ["serde"] } +chrono = { version = "0.4.38", features = ["serde"] } tera = { version = "1.20", default-features = false } ordered-float = "4.3.0" graphql-parser = "0.4.0" @@ -150,7 +150,7 @@ tracing-unwrap = { version = "1.0.1", features = ["log-location"] } tracing-appender = "0.2.3" # async -futures = "=0.3.30" # pinned due to bug with .31 with zeromq (deno) +futures = "=0.3.30" # pinned due to bug with .31 with zeromq (deno) futures-concurrency = "7.6" futures-lite = "2.3" tokio = { version = "1", features = ["parking_lot"] } @@ -164,7 +164,9 @@ temporal-sdk-core-protos = { git = "https://github.com/temporalio/sdk-core", rev # prisma query-core = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } 
query-connector = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } -request-handlers = { git = "https://github.com/metatypedev/prisma-engines", features = ["all"], branch = "fix/version-compat" } +request-handlers = { git = "https://github.com/metatypedev/prisma-engines", features = [ + "all", +], branch = "fix/version-compat" } datamodel-renderer = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } user-facing-errors = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } query-engine-metrics = { git = "https://github.com/metatypedev/prisma-engines", branch = "fix/version-compat" } @@ -190,6 +192,9 @@ protobuf = "3.6.0" protobuf-json-mapping = "3.6.0" proto-parser = { git = "https://github.com/metatypedev/proto-parser", branch = "main" } +# python +pyo3 = { version = "0.22.5" } + # test assert_cmd = "2.0.16" pretty_assertions = "1.4.1" diff --git a/src/substantial/src/converters.rs b/src/substantial/src/converters.rs index 3c146ca090..217bc681af 100644 --- a/src/substantial/src/converters.rs +++ b/src/substantial/src/converters.rs @@ -1,7 +1,8 @@ -use std::collections::HashMap; +use serde_json::Value; +use std::{collections::HashMap, fmt}; -use anyhow::{bail, Context, Ok, Result}; -use chrono::{DateTime, Utc}; +use anyhow::{bail, Context, Result}; +use chrono::{DateTime, Duration, Utc}; use protobuf::{ well_known_types::{ @@ -78,13 +79,126 @@ pub struct Operation { /// Each operation is produced from the workflow execution #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Run { + pub id: u32, pub run_id: String, pub operations: Vec, } +#[derive(Debug)] +pub enum Interupt { + Sleep, + SaveRetry, + WaitReceiveEvent, + WaitHandleEvent, + WaitEnsureValue, +} + +impl Interupt { + const PREFIX: &'static str = "SUBSTANTIAL_INTERRUPT_"; +} + +impl fmt::Display for Interupt { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let variant = 
match self { + Self::Sleep => "SLEEP", + Self::SaveRetry => "SAVE_RETRY", + Self::WaitReceiveEvent => "WAIT_RECEIVE_EVENT", + Self::WaitHandleEvent => "WAIT_HANDLE_EVENT", + Self::WaitEnsureValue => "WAIT_ENSURE_VALUE", + }; + write!(f, "{}{}", Self::PREFIX, variant) + } +} + +impl std::error::Error for Interupt {} + +pub enum Strategy { + Linear, +} + +pub struct Retry { + pub strategy: Option, + pub min_backoff_ms: i32, + pub max_backoff_ms: i32, + pub max_retries: i32, +} + +pub struct RetryStrategy { + min_backoff_ms: Option, + max_backoff_ms: Option, + max_retries: i32, +} + +impl RetryStrategy { + pub fn new( + max_retries: i32, + min_backoff_ms: Option, + max_backoff_ms: Option, + ) -> anyhow::Result { + if max_retries < 1 { + anyhow::bail!("maxRetries < 1"); + } + + let mut min_ms = min_backoff_ms; + let mut max_ms = max_backoff_ms; + + match (min_ms, max_ms) { + (Some(low), Some(high)) => { + if low >= high { + anyhow::bail!("minBackoffMs >= maxBackoffMs"); + } + if low < 0 { + anyhow::bail!("minBackoffMs < 0"); + } + } + (Some(low), None) => { + max_ms = Some(low + 10); + } + (None, Some(high)) => { + min_ms = Some(0.max(high - 10)); + } + (None, None) => {} + } + + Ok(Self { + min_backoff_ms: min_ms, + max_backoff_ms: max_ms, + max_retries, + }) + } + + pub fn eval(&self, strategy: Strategy, retries_left: i32) -> anyhow::Result { + match strategy { + Strategy::Linear => self.linear(retries_left), + // Add more strategy matches here + } + } + + fn linear(&self, retries_left: i32) -> anyhow::Result { + if retries_left <= 0 { + anyhow::bail!("retries left <= 0"); + } + + let dt = self.max_backoff_ms.unwrap_or(0) - self.min_backoff_ms.unwrap_or(0); + Ok(((self.max_retries - retries_left) * dt) / self.max_retries) + } +} + +pub struct Save { + pub timeout_ms: Option, + pub retry: Option, +} + +#[derive(Serialize)] +pub struct SaveOutput { + pub payload: Option, + pub current_retry_count: Option, +} + impl Run { pub fn new(run_id: String) -> Self { Self { + 
id: 0, run_id, operations: vec![], } @@ -125,6 +239,97 @@ impl Run { pub fn reset(&mut self) { self.operations = vec![]; } + + pub fn next_id(&mut self) -> u32 { + self.id += 1; + self.id + } + + pub fn append_op(&mut self, op: OperationEvent) { + let has_stopped = self + .operations + .iter() + .any(|op| matches!(op.event, OperationEvent::Stop { .. })); + if !has_stopped { + self.operations.push(Operation { + at: Utc::now(), + event: op, + }); + } + } + + pub fn save(&mut self) -> Result { + let next_id = self.next_id(); + let mut current_retry_count: i32 = 1; + + for Operation { event, .. } in self.operations.iter() { + if let OperationEvent::Save { id, value } = event { + if *id == next_id { + if let SavedValue::Resolved { payload } = value { + return Ok(SaveOutput { + payload: Some(payload.clone()), + current_retry_count: None, + }); + } else if let SavedValue::Retry { + counter, + wait_until, + } = value + { + let now = Utc::now(); + if wait_until > &now { + bail!(Interupt::SaveRetry); + } else { + current_retry_count = *counter; + } + } + } + } + } + + Ok(SaveOutput { + payload: None, + current_retry_count: Some(current_retry_count), + }) + } + + pub fn sleep(&mut self, duration_ms: i32) -> Result<()> { + let next_id = self.next_id(); + for Operation { event, .. } in self.operations.iter() { + if let OperationEvent::Sleep { id, end, .. 
} = event { + if next_id == *id { + if end <= &Utc::now() { + return Ok(()); + } else { + bail!(Interupt::Sleep); + } + } + } + } + + let start = Utc::now(); + + let end = start + Duration::milliseconds(duration_ms as i64); + + self.operations.push(Operation { + at: start, + event: OperationEvent::Sleep { + id: next_id, + start, + end, + }, + }); + bail!(Interupt::Sleep); + } + + pub fn append_event(&mut self, event_name: String, payload: Value) { + self.operations.push(Operation { + at: Utc::now(), + event: OperationEvent::Send { + event_name, + value: payload, + }, + }); + } } impl TryFrom for Operation { diff --git a/src/typegate/engine/00_runtime.js b/src/typegate/engine/00_runtime.js index 412d71c056..62a5c4403f 100644 --- a/src/typegate/engine/00_runtime.js +++ b/src/typegate/engine/00_runtime.js @@ -73,8 +73,17 @@ const Meta = { metadataAppend: getOp("op_sub_metadata_append"), metadataWriteWorkflowLink: getOp("op_sub_metadata_write_workflow_link"), metadataReadWorkflowLinks: getOp("op_sub_metadata_read_workflow_links"), - metadataWriteParentChildLink: getOp("op_sub_metadata_write_parent_child_link"), - metadataEnumerateAllChildren: getOp("op_sub_metadata_enumerate_all_children"), + metadataWriteParentChildLink: getOp( + "op_sub_metadata_write_parent_child_link", + ), + metadataEnumerateAllChildren: getOp( + "op_sub_metadata_enumerate_all_children", + ), + contextSave: getOp("op_context_save"), + contextSleep: getOp("op_context_sleep"), + contextAppendEvent: getOp("op_context_append_event"), + contextAppendOp: getOp("op_context_append_op"), + executePythonWithContext: getOp("op_execute_python_with_context"), }, grpc: { register: getOp("op_grpc_register"), @@ -83,6 +92,4 @@ const Meta = { }, }; - - globalThis.Meta = Meta; diff --git a/src/typegate/engine/Cargo.toml b/src/typegate/engine/Cargo.toml index 6567b7cfb1..b92e669d4a 100644 --- a/src/typegate/engine/Cargo.toml +++ b/src/typegate/engine/Cargo.toml @@ -18,6 +18,7 @@ tracing.workspace 
= true # encoding serde.workspace = true +serde_json.workspace = true regex.workspace = true zstd.workspace = true base64.workspace = true @@ -68,6 +69,11 @@ bytes.workspace = true protobuf.workspace = true protobuf-json-mapping.workspace = true +# python +pyo3 = { workspace = true, features = ["extension-module"] } + +reqwest = { workspace = true } + [dev-dependencies] env_logger.workspace = true diff --git a/src/typegate/engine/runtime.d.ts b/src/typegate/engine/runtime.d.ts index 109491903d..ae41549b51 100644 --- a/src/typegate/engine/runtime.d.ts +++ b/src/typegate/engine/runtime.d.ts @@ -1,6 +1,8 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 +import { TaskContext } from "../src/runtimes/deno/shared_types.ts"; + declare global { const Meta: MetaNS; } @@ -17,7 +19,7 @@ export type MetaNS = { unregisterEngine: (engine_name: string) => Promise; query: (inp: PrismaQueryInp) => Promise; diff: ( - inp: PrismaDiffInp + inp: PrismaDiffInp, ) => Promise<[string, ParsedDiff[]] | undefined | null>; apply: (inp: PrismaDevInp) => Promise; deploy: (inp: PrismaDeployInp) => Promise; @@ -34,7 +36,7 @@ export type MetaNS = { workflowSignal: (inp: TemporalWorkflowSignalInput) => Promise; workflowQuery: (inp: TemporalWorkflowQueryInput) => Promise>; workflowDescribe: ( - inp: TemporalWorkflowDescribeInput + inp: TemporalWorkflowDescribeInput, ) => Promise; }; @@ -43,12 +45,12 @@ export type MetaNS = { componentPath: string, instanceId: string, args: WitWireInitArgs, - cb: (op_name: string, json: string) => Promise + cb: (op_name: string, json: string) => Promise, ) => Promise; destroy: (instanceId: string) => Promise; handle: ( instanceId: string, - args: WitWireReq + args: WitWireReq, ) => Promise; }; @@ -63,7 +65,7 @@ export type MetaNS = { storePersistRun: (inp: PersistRunInput) => Promise; storeAddSchedule: (inp: AddScheduleInput) => Promise; storeReadSchedule: ( - inp: ReadOrCloseScheduleInput + inp: 
ReadOrCloseScheduleInput, ) => Promise; storeCloseSchedule: (inp: ReadOrCloseScheduleInput) => Promise; agentNextRun: (inp: NextRunInput) => Promise; @@ -72,19 +74,26 @@ export type MetaNS = { agentRenewLease: (inp: LeaseInput) => Promise; agentRemoveLease: (inp: LeaseInput) => Promise; metadataReadAll: ( - inp: ReadAllMetadataInput + inp: ReadAllMetadataInput, ) => Promise>; metadataAppend: (inp: AppendMetadataInput) => Promise; metadataWriteWorkflowLink: (inp: WriteLinkInput) => Promise; metadataReadWorkflowLinks: ( - inp: ReadWorkflowLinkInput + inp: ReadWorkflowLinkInput, ) => Promise>; metadataWriteParentChildLink: ( - inp: WriteParentChildLinkInput + inp: WriteParentChildLinkInput, ) => Promise; metadataEnumerateAllChildren: ( - inp: EnumerateAllChildrenInput + inp: EnumerateAllChildrenInput, ) => Promise>; + contextSave: (inp: SaveInput) => SaveOutput; + contextSleep: (inp: SleepInput) => void; + contextAppendEvent: (inp: AppendEventInput) => void; + contextAppendOp: (inp: AppendOpInput) => void; + executePythonWithContext: ( + inp: PythonExecutionInput, + ) => PythonExecutionOutput; }; }; @@ -116,16 +125,16 @@ interface PrismaDevInp { } type PrismaApplyOut = | { - ResetRequired: { - reset_reason: string; - }; - } + ResetRequired: { + reset_reason: string; + }; + } | { - Ok: { - applied_migrations: Array; - reset_reason: string | undefined | null; - }; + Ok: { + applied_migrations: Array; + reset_reason: string | undefined | null; }; + }; interface PrismaDeployOut { migration_count: number; applied_migrations: Array; @@ -237,14 +246,14 @@ export type WitWireReq = { export type WitWireHandleError = | { - InstanceNotFound: string; - } + InstanceNotFound: string; + } | { - ModuleErr: string; - } + ModuleErr: string; + } | { - MatErr: string; - }; + MatErr: string; + }; export type WitWireMatInfo = { op_name: string; @@ -261,29 +270,29 @@ export type WitWireInitArgs = { export type WitWireInitResponse = object; export type WitWireInitError = | { - 
VersionMismatch: string; - } + VersionMismatch: string; + } | { - UnexpectedMat: string; - } + UnexpectedMat: string; + } | { - ModuleErr: string; - } + ModuleErr: string; + } | { - Other: string; - }; + Other: string; + }; export type WitWireHandleResponse = | { - Ok: string; - } + Ok: string; + } | "NoHandler" | { - InJsonErr: string; - } + InJsonErr: string; + } | { - HandlerErr: string; - }; + HandlerErr: string; + }; export type GrpcRegisterInput = { proto_file_content: string; @@ -301,20 +310,20 @@ export type Backend = | { type: "fs" } | { type: "memory" } | { - type: "redis"; - connection_string: string; - }; + type: "redis"; + connection_string: string; + }; export type OperationEvent = | { type: "Sleep"; id: number; start: string; end: string } | { - type: "Save"; - id: number; - value: - | { type: "Retry"; wait_until: string; counter: number } - | { type: "Resolved"; payload: unknown } - | { type: "Failed"; err: unknown }; - } + type: "Save"; + id: number; + value: + | { type: "Retry"; wait_until: string; counter: number } + | { type: "Resolved"; payload: unknown } + | { type: "Failed"; err: unknown }; + } | { type: "Send"; event_name: string; value: unknown } | { type: "Stop"; result: unknown } | { type: "Start"; kwargs: Record } @@ -323,6 +332,7 @@ export type OperationEvent = export type Operation = { at: string; event: OperationEvent }; export interface Run { + id: number; run_id: string; operations: Array; } @@ -420,3 +430,40 @@ export interface EnumerateAllChildrenInput { backend: Backend; parent_run_id: string; } + +export interface PythonExecutionInput { + run: Run; + internal: TaskContext; + kwargs: Record; + module_path: string; + function_name: string; +} + +export interface PythonExecutionOutput { + wfResult: any; +} + +export interface SaveInput { + run: Run; +} + +export interface SaveOutput { + payload?: any; + current_retry_count?: number; +} + +export interface SleepInput { + run: Run; + duration_ms: number; // in millisecond +} + 
+export interface AppendEventInput { + run: Run; + event_name: string; + payload: any; +} + +export interface AppendOpInput { + run: Run; + op: OperationEvent; +} diff --git a/src/typegate/engine/src/ext.rs b/src/typegate/engine/src/ext.rs index 2f2ca8de81..9d1ea4830e 100644 --- a/src/typegate/engine/src/ext.rs +++ b/src/typegate/engine/src/ext.rs @@ -59,6 +59,11 @@ deno_core::extension!( substantial::op_sub_metadata_write_workflow_link, substantial::op_sub_metadata_write_parent_child_link, substantial::op_sub_metadata_enumerate_all_children, + substantial::op_context_save, + substantial::op_context_sleep, + substantial::op_context_append_event, + substantial::op_context_append_op, + substantial::op_execute_python_with_context, // FIXME(yohe): this test broke and has proven difficult to fix // #[cfg(test)] diff --git a/src/typegate/engine/src/runtimes/substantial.rs b/src/typegate/engine/src/runtimes/substantial/mod.rs similarity index 90% rename from src/typegate/engine/src/runtimes/substantial.rs rename to src/typegate/engine/src/runtimes/substantial/mod.rs index 9420c61e34..e3206827b2 100644 --- a/src/typegate/engine/src/runtimes/substantial.rs +++ b/src/typegate/engine/src/runtimes/substantial/mod.rs @@ -3,15 +3,20 @@ // Copyright Metatype OÜ, 
// SPDX-License-Identifier: Elastic-2.0 +mod python_context; + +pub use python_context::op_execute_python_with_context; + use crate::interlude::*; use chrono::{DateTime, Utc}; use common::typegraph::runtimes::substantial::SubstantialBackend; use dashmap::DashMap; use deno_core::OpState; +use serde_json::Value; use substantial::{ backends::{fs::FsBackend, memory::MemoryBackend, redis::RedisBackend, Backend, NextRun}, - converters::{MetadataEvent, Operation, Run}, + converters::{MetadataEvent, Operation, OperationEvent, Run, SaveOutput}, }; #[rustfmt::skip] @@ -452,3 +457,52 @@ pub async fn op_sub_metadata_enumerate_all_children( backend.enumerate_all_children(input.parent_run_id.clone()) } + +#[derive(Deserialize)] +pub struct SaveInput { + pub run: Run, +} + +#[deno_core::op2] +#[serde] +pub fn op_context_save(#[serde] input: SaveInput) -> Result { + let mut run = input.run; + run.save() +} + +#[derive(Deserialize)] +pub struct SleepInut { + pub run: Run, + pub duration_ms: i32, +} + +#[deno_core::op2] +pub fn op_context_sleep(#[serde] input: SleepInut) -> Result<()> { + let mut run = input.run; + run.sleep(input.duration_ms) +} + +#[derive(Deserialize)] +pub struct AppendEventInput { + pub run: Run, + pub event_name: String, + pub payload: Value, +} + +#[deno_core::op2] +pub fn op_context_append_event(#[serde] input: AppendEventInput) { + let mut run = input.run; + run.append_event(input.event_name, input.payload) +} + +#[derive(Deserialize)] +pub struct AppendOpInput { + pub run: Run, + pub op: OperationEvent, +} + +#[deno_core::op2] +pub fn op_context_append_op(#[serde] input: AppendOpInput) { + let mut run = input.run; + run.append_op(input.op); +} diff --git a/src/typegate/engine/src/runtimes/substantial/python_context.rs b/src/typegate/engine/src/runtimes/substantial/python_context.rs new file mode 100644 index 0000000000..39c3d357d3 --- /dev/null +++ b/src/typegate/engine/src/runtimes/substantial/python_context.rs @@ -0,0 +1,375 @@ +// Copyright Metatype OÜ, 
licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +use anyhow::{bail, Result}; +// use common::graphql::Query; +use pyo3::{exceptions::PyException, prelude::*}; +use reqwest::{Client, Method, Request, RequestBuilder, Url}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::{collections::HashMap, str::FromStr}; +#[rustfmt::skip] +use deno_core as deno_core; + +use chrono::{TimeZone, Utc}; + +use substantial::converters::{Interupt, Operation, OperationEvent, Run, SaveOutput, SavedValue}; + +trait ToPyObject { + fn to_object(&self, python: Python) -> PyObject; +} + +impl ToPyObject for Value { + fn to_object(&self, py: Python) -> PyObject { + match self { + Value::Null => py.None(), + Value::Bool(b) => b.into_py(py), + Value::Number(n) => { + if let Some(i) = n.as_i64() { + i.into_py(py) + } else if let Some(f) = n.as_f64() { + f.into_py(py) + } else { + n.to_string().into_py(py) + } + } + Value::String(s) => s.into_py(py), + Value::Array(arr) => arr + .iter() + .map(|v| v.to_object(py)) + .collect::>() + .into_py(py), + Value::Object(map) => { + let dict = pyo3::types::PyDict::new_bound(py); + for (k, v) in map { + dict.set_item(k, v.to_object(py)).unwrap(); + } + dict.into_py(py) + } + } + } +} + +#[derive(Deserialize)] +struct Context; + +#[derive(Deserialize)] +struct Meta { + url: String, + token: String, +} + +#[derive(Deserialize)] +struct TaskContext { + #[allow(dead_code)] + parent: Option>, + #[allow(dead_code)] + context: Option, + #[allow(dead_code)] + secrets: HashMap, + meta: Meta, + #[allow(dead_code)] + headers: HashMap, +} + +fn create_gql_client(internal: TaskContext) -> Result { + let url = Url::from_str(&internal.meta.url)?; + let request = Request::new(Method::POST, url); + let gql_client = + RequestBuilder::from_parts(Client::new(), request).bearer_auth(internal.meta.token); + + Ok(gql_client) +} + +#[derive(Clone, Debug)] +enum Strategy { + #[allow(dead_code)] + Linear, +} + 
+#[pyclass(frozen)] +#[derive(Clone)] +struct Retry { + strategy: Option, + min_backoff_ms: i32, + max_backoff_ms: i32, + max_retries: i32, +} + +struct RetryStrategy { + min_backoff_ms: Option, + max_backoff_ms: Option, + max_retries: i32, +} + +impl RetryStrategy { + fn new( + max_retries: i32, + min_backoff_ms: Option, + max_backoff_ms: Option, + ) -> Result { + if max_retries < 1 { + bail!("max_retries < 1".to_string()); + } + + let mut strategy = RetryStrategy { + min_backoff_ms, + max_backoff_ms, + max_retries, + }; + + let low = strategy.min_backoff_ms; + let high = strategy.max_backoff_ms; + if let (Some(low), Some(high)) = (low, high) { + if low >= high { + bail!("min_backoff_ms >= max_backoff_ms".to_string()); + } + if low < 0 { + bail!("min_backoff_ms < 0".to_string()); + } + } else if low.is_some() && high.is_none() { + strategy.max_backoff_ms = Some(low.unwrap() + 10); + } else if low.is_none() && high.is_some() { + strategy.min_backoff_ms = Some(high.unwrap().saturating_sub(10)); + } + + Ok(strategy) + } + + fn eval(&self, strategy: Strategy, retries_left: i32) -> Result { + match strategy { + Strategy::Linear => Ok(self.linear(retries_left)?), + } + } + + fn linear(&self, retries_left: i32) -> Result { + if retries_left <= 0_i32 { + bail!("retries left <= 0"); + } + + let dt = self.max_backoff_ms.unwrap_or(0) - self.min_backoff_ms.unwrap_or(0); + Ok(((self.max_retries - retries_left) * dt) / self.max_retries) + } +} + +#[pyclass(frozen)] +#[derive(Clone)] +struct Save { + #[allow(dead_code)] + timeout_ms: Option, + retry: Option, +} + +#[pymethods] +impl Save { + #[getter] + fn get_retry(&self) -> Option { + self.retry.clone() + } +} + +#[pyclass] +struct PythonContext { + #[allow(dead_code)] + query: RequestBuilder, + run: Run, + #[allow(dead_code)] + kwargs: HashMap, +} + +#[pymethods] +impl PythonContext { + #[pyo3(signature = (func, option = None))] + fn save(&mut self, py: Python, func: PyObject, option: Option>) -> PyResult { + let SaveOutput { 
+ payload, + current_retry_count, + } = self + .run + .save() + .map_err(|e| PyException::new_err(e.to_string()))?; + + if let Some(payload) = payload { + return Ok(payload.to_string()); + } + + let current_retry_count = current_retry_count.unwrap_or(1); + + match func.call0(py) { + Ok(result) => { + let op = OperationEvent::Save { + id: self.run.id, + value: SavedValue::Resolved { + payload: serde_json::json!(result.to_string()), + }, + }; + self.run.append_op(op); + + Ok(result.to_string()) + } + Err(err) => { + if let Some(option) = option { + if let Some(retry) = option.get().retry.clone() { + let Retry { + max_retries, + max_backoff_ms, + min_backoff_ms, + .. + } = retry; + if current_retry_count < max_retries { + let strategy = RetryStrategy::new( + max_retries, + Some(min_backoff_ms), + Some(max_backoff_ms), + ) + .map_err(|err| PyException::new_err(err.to_string()))?; + + let retries_left = + std::cmp::max(retry.max_retries - current_retry_count, 0); + let delay_ms = strategy + .eval(retry.strategy.unwrap(), retries_left) + .map_err(|err| PyException::new_err(err.to_string()))?; + let wait_until_as_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + as i64 + + delay_ms as i64; + + let op = OperationEvent::Save { + id: self.run.id, + value: SavedValue::Retry { + counter: current_retry_count, + wait_until: Utc + .timestamp_millis_opt(wait_until_as_ms) + .unwrap(), + }, + }; + self.run.append_op(op); + } + return Err(PyException::new_err(format!("{}", Interupt::SaveRetry))); + } else { + let op = OperationEvent::Save { + id: self.run.id, + value: SavedValue::Failed { + err: serde_json::json!({ + "retries": current_retry_count, + "message": err.to_string() + }), + }, + }; + self.run.append_op(op); + } + } + + Err(err) + } + } + } + + fn sleep(&mut self, durration_ms: i32) -> PyResult<()> { + self.run + .sleep(durration_ms) + .map_err(|err| PyException::new_err(err.to_string())) + } + + fn append_event(&mut 
self, event_name: String, payload: PyObject) -> PyResult<()> { + self.run + .append_event(event_name, serde_json::json!(payload.to_string())); + Ok(()) + } + + fn receive(&self, event_name: String) -> PyResult { + for Operation { event, .. } in self.run.operations.iter() { + if let OperationEvent::Send { + event_name: ref sent_name, + value, + } = event + { + if event_name == *sent_name { + return Ok(value.to_string()); + } + } + } + + Err(PyException::new_err(format!( + "{}", + Interupt::WaitReceiveEvent + ))) + } + + fn handle(&mut self, py: Python, event_name: String, func: PyObject) -> PyResult { + for Operation { event, .. } in self.run.operations.iter() { + if let OperationEvent::Send { + event_name: ref sent_name, + value, + } = event + { + if event_name == *sent_name { + let func = func.call1(py, (value.to_object(py),))?; + return self.save(py, func, None); + } + } + } + + Err(PyException::new_err(format!( + "{}", + Interupt::WaitHandleEvent + ))) + } +} + +fn execute_python_function( + module_path: String, + function_name: String, + python_context: PythonContext, +) -> Result { + Python::with_gil(|py| { + let python_code = std::fs::read_to_string(module_path.clone()) + .map_err(|e| PyErr::new::(e.to_string()))?; + + let python_module = + PyModule::from_code_bound(py, &python_code, &module_path, "dynamic_module")?; + + let python_function = python_module.getattr(function_name.as_str())?; + + let execution_result = python_function.call1((python_context,))?; + + Ok(execution_result.into()) + }) +} + +#[derive(Deserialize)] +#[serde(crate = "serde")] +pub struct PythonExecutionInput { + run: Run, + internal: TaskContext, + kwargs: HashMap, + module_path: String, + function_name: String, +} + +#[derive(Serialize)] +#[serde(crate = "serde")] +pub struct PythonExecutionOutput { + result: Value, +} + +#[deno_core::op2] +#[serde] +pub fn op_execute_python_with_context( + #[serde] input: PythonExecutionInput, +) -> Result { + // this is a python Class in pyO3 + let 
python_context = PythonContext { + query: create_gql_client(input.internal)?, + run: input.run, + kwargs: input.kwargs, + }; + + let output = execute_python_function(input.module_path, input.function_name, python_context)?; + + let result = serde_json::json!(output.to_string()); + + Ok(PythonExecutionOutput { result }) +} diff --git a/src/typegate/src/runtimes/deno/shared_types.ts b/src/typegate/src/runtimes/deno/shared_types.ts index 7bbb446fe0..d8e9b924b0 100644 --- a/src/typegate/src/runtimes/deno/shared_types.ts +++ b/src/typegate/src/runtimes/deno/shared_types.ts @@ -70,6 +70,6 @@ export interface TaskExec { ( args: Record, context: TaskContext, - helpers: Record + helpers: Record, ): unknown; } diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index 966ce460f3..c97c76bc38 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -2,17 +2,17 @@ import { AddScheduleInput, Backend, NextRun, - Run, ReadOrCloseScheduleInput, + Run, } from "../../../engine/runtime.js"; import { getLogger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; import { Interrupt, + Kind, Result, WorkerData, WorkflowResult, - appendIfOngoing, } from "./types.ts"; import { RunId, WorkerManager } from "./workflow_worker_manager.ts"; @@ -21,7 +21,7 @@ const logger = getLogger(); export interface WorkflowDescription { name: string; path: string; - kind: "DENO" | "PYTHON"; + kind: Kind; } export interface AgentConfig { @@ -39,7 +39,7 @@ export class Agent { private backend: Backend, private queue: string, private config: AgentConfig, - private internalTCtx: TaskContext + private internalTCtx: TaskContext, ) {} async schedule(input: AddScheduleInput) { @@ -56,7 +56,7 @@ }); } catch (err) { logger.warn( - `Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}` + `Failed writing log metadata for 
schedule "${schedule}" (${runId}), skipping it: ${err}`, ); } } @@ -89,9 +89,11 @@ export class Agent { this.workflows = workflows; logger.warn( - `Initializing agent to handle ${workflows - .map(({ name }) => name) - .join(", ")}` + `Initializing agent to handle ${ + workflows + .map(({ name }) => name) + .join(", ") + }`, ); this.pollIntervalHandle = setInterval(async () => { @@ -132,7 +134,7 @@ export class Agent { for (const workflow of this.workflows) { const requests = replayRequests.filter( - ({ run_id }) => Agent.parseWorkflowName(run_id) == workflow.name + ({ run_id }) => Agent.parseWorkflowName(run_id) == workflow.name, ); while (requests.length > 0) { @@ -143,7 +145,7 @@ export class Agent { await this.#replay(next, workflow); } catch (err) { logger.error( - `Replay failed for ${workflow.name} => ${JSON.stringify(next)}` + `Replay failed for ${workflow.name} => ${JSON.stringify(next)}`, ); logger.error(err); } finally { @@ -189,7 +191,7 @@ export class Agent { // necessarily represent the state of what is actually running on the current typegate node if (this.workerManager.isOngoing(next.run_id)) { logger.warn( - `skip triggering ${next.run_id} for the current tick as it is still ongoing` + `skip triggering ${next.run_id} for the current tick as it is still ongoing`, ); return; @@ -221,7 +223,7 @@ export class Agent { } if (newEventOp) { - appendIfOngoing(run, newEventOp); + Meta.substantial.contextAppendOp({ run, op: newEventOp?.event }); } if (run.operations.length == 0) { @@ -234,9 +236,11 @@ export class Agent { // A consequence of the above, a workflow is always triggered by gql { start(..) } // This can also occur if an event is sent from gql under a runId that is not valid (e.g. due to typo) logger.warn( - `First item in the operation list is not a Start, got "${JSON.stringify( - first - )}" instead. Closing the underlying schedule.` + `First item in the operation list is not a Start, got "${ + JSON.stringify( + first, + ) + }" instead. 
Closing the underlying schedule.`, ); await Meta.substantial.storeCloseSchedule(schedDef); @@ -251,12 +255,13 @@ export class Agent { run, next.schedule_date, first.event.kwargs, - this.internalTCtx + this.internalTCtx, + workflow.kind, ); this.workerManager.listen( next.run_id, - this.#eventResultHandlerFor(workflow.name, next.run_id) + this.#eventResultHandlerFor(workflow.name, next.run_id), ); } catch (err) { throw err; @@ -279,7 +284,7 @@ export class Agent { // All Worker/Runner non-user issue should fall here // Note: Should never throw (typegate will panic), this will run in a worker logger.error( - `result error for "${runId}": ${JSON.stringify(result.payload)}` + `result error for "${runId}": ${JSON.stringify(result.payload)}`, ); return; } @@ -307,7 +312,7 @@ export class Agent { startedAt, workflowName, runId, - ret + ret, ); break; } @@ -318,9 +323,11 @@ export class Agent { } default: logger.error( - `Fatal: invalid type ${ - answer.type - } sent by "${runId}": ${JSON.stringify(answer.data)}` + `Fatal: invalid type ${answer.type} sent by "${runId}": ${ + JSON.stringify( + answer.data, + ) + }`, ); } }; @@ -329,7 +336,7 @@ export class Agent { async #workflowHandleInterrupts( workflowName: string, runId: string, - { result, schedule, run }: WorkflowResult + { result, schedule, run }: WorkflowResult, ) { this.workerManager.destroyWorker(workflowName, runId); // ! 
@@ -378,14 +385,16 @@ export class Agent { startedAt: Date, workflowName: string, runId: string, - { result, kind, schedule, run }: WorkflowResult + { result, kind, schedule, run }: WorkflowResult, ) { this.workerManager.destroyWorker(workflowName, runId); logger.info( - `gracefull completion of "${runId}" (${kind}): ${JSON.stringify( - result - )} started at "${startedAt}"` + `gracefull completion of "${runId}" (${kind}): ${ + JSON.stringify( + result, + ) + } started at "${startedAt}"`, ); logger.info(`Append Stop ${runId}`); @@ -393,9 +402,9 @@ export class Agent { // Note: run is a one-time value, thus can be mutated - appendIfOngoing(run, { - at: new Date().toJSON(), - event: { + Meta.substantial.contextAppendOp({ + run, + op: { type: "Stop", result: { [rustResult]: result ?? null, @@ -404,7 +413,7 @@ export class Agent { }); logger.info( - `Persist finalized records for "${workflowName}": ${result}" and closing everything..` + `Persist finalized records for "${workflowName}": ${result}" and closing everything..`, ); const _run = await Meta.substantial.storePersistRun({ @@ -451,13 +460,15 @@ function checkIfRunHasStopped(run: Run) { if (op.event.type == "Start") { if (life >= 1) { logger.error( - `bad logs: ${JSON.stringify( - run.operations.map(({ event }) => event.type) - )}` + `bad logs: ${ + JSON.stringify( + run.operations.map(({ event }) => event.type), + ) + }`, ); throw new Error( - `"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped` + `"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped`, ); } @@ -466,13 +477,15 @@ function checkIfRunHasStopped(run: Run) { } else if (op.event.type == "Stop") { if (life <= 0) { logger.error( - `bad logs: ${JSON.stringify( - run.operations.map(({ event }) => event.type) - )}` + `bad logs: ${ + JSON.stringify( + run.operations.map(({ event }) => event.type), + ) + }`, ); throw new Error( - `"${run.run_id}" has potentitally 
corrupted logs, attempted stopping already closed run, or run with a missing Start` + `"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start`, ); } diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index 44a48f81e0..72ad29413b 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -6,7 +6,7 @@ import { make_internal } from "../../worker_utils.ts"; import { TaskContext } from "../deno/shared_types.ts"; -import { Interrupt, OperationEvent, Run, appendIfOngoing } from "./types.ts"; +import { Interrupt, OperationEvent, Run } from "./types.ts"; // const isTest = Deno.env.get("DENO_TESTING") === "true"; const testBaseUrl = Deno.env.get("TEST_OVERRIDE_GQL_ORIGIN"); @@ -14,50 +14,27 @@ const testBaseUrl = Deno.env.get("TEST_OVERRIDE_GQL_ORIGIN"); const additionalHeaders = { connection: "keep-alive" }; export class Context { - private id: number = 0; gql: ReturnType; constructor( private run: Run, private kwargs: Record, - private internal: TaskContext + private internal: TaskContext, ) { this.gql = createGQLClient(internal); } - #nextId() { - // IDEA: this scheme does not account the step provided - // Different args => potentially different step (notably for Save) - this.id += 1; - return this.id; - } - #appendOp(op: OperationEvent) { - appendIfOngoing(this.run, { at: new Date().toJSON(), event: op }); + Meta.substantial.contextAppendOp({ run: this.run, op }); } async save(fn: () => T | Promise, option?: SaveOption) { - const id = this.#nextId(); - - let currRetryCount = 1; - for (const { event } of this.run.operations) { - if (event.type == "Save" && id == event.id) { - if (event.value.type == "Resolved") { - return event.value.payload; - } else if (event.value.type == "Retry") { - const delay = new Date(event.value.wait_until); - if (delay.getTime() > new 
Date().getTime()) { - // Too soon! - throw Interrupt.Variant("SAVE_RETRY"); - } else { - currRetryCount = event.value.counter; - } - } - } - } + const { payload, current_retry_count } = Meta.substantial.contextSave({ + run: this.run, + }); - // current call already counts - currRetryCount += 1; + if (payload) return payload; + const currRetryCount = current_retry_count ?? 1; try { let result: any; @@ -69,7 +46,7 @@ export class Context { this.#appendOp({ type: "Save", - id, + id: this.run.id, value: { type: "Resolved", payload: result ?? null, @@ -86,7 +63,7 @@ export class Context { const strategy = new RetryStrategy( retry.maxRetries, retry.minBackoffMs, - retry.maxBackoffMs + retry.maxBackoffMs, ); const retriesLeft = Math.max(retry.maxRetries - currRetryCount, 0); @@ -95,7 +72,7 @@ export class Context { this.#appendOp({ type: "Save", - id, + id: this.run.id, value: { type: "Retry", wait_until: new Date(waitUntilAsMs).toJSON(), @@ -107,7 +84,7 @@ export class Context { } else { this.#appendOp({ type: "Save", - id, + id: this.run.id, value: { type: "Failed", err: { @@ -123,27 +100,7 @@ export class Context { } sleep(durationMs: number) { - const id = this.#nextId(); - for (const { event } of this.run.operations) { - if (event.type == "Sleep" && id == event.id) { - const end = new Date(event.end); - if (end.getTime() <= new Date().getTime()) { - return; - } else { - throw Interrupt.Variant("SLEEP"); - } - } - } - - const start = new Date(); - const end = new Date(start.getTime() + durationMs); - this.#appendOp({ - type: "Sleep", - id, - start: start.toJSON(), - end: end.toJSON(), - }); - throw Interrupt.Variant("SLEEP"); + Meta.substantial.contextSleep({ run: this.run, duration_ms: durationMs }); } getRun() { @@ -151,11 +108,7 @@ export class Context { } appendEvent(event_name: string, payload: unknown) { - this.#appendOp({ - type: "Send", - event_name, - value: payload, - }); + Meta.substantial.contextAppendEvent({ run: this.run, event_name, payload }); } 
receive(eventName: string) { @@ -170,7 +123,7 @@ export class Context { async handle( eventName: string, - fn: (received: unknown) => unknown | Promise + fn: (received: unknown) => unknown | Promise, ) { for (const { event } of this.run.operations) { if (event.type == "Send" && event.event_name == eventName) { @@ -208,7 +161,7 @@ export class Context { createWorkflowHandle(handleDef: SerializableWorkflowHandle) { if (!handleDef.runId) { throw new Error( - "Cannot create handle from a definition that was not run" + "Cannot create handle from a definition that was not run", ); } return new ChildWorkflowHandle(this, handleDef); @@ -227,11 +180,11 @@ interface SerializableWorkflowHandle { export class ChildWorkflowHandle { constructor( private ctx: Context, - public handleDef: SerializableWorkflowHandle + public handleDef: SerializableWorkflowHandle, ) {} async start(): Promise { - const { data } = await this.ctx.gql/**/ ` + const { data } = await this.ctx.gql /**/` mutation ($name: String!, $kwargs: String!) { _sub_internal_start(name: $name, kwargs: $kwargs) } @@ -243,7 +196,7 @@ export class ChildWorkflowHandle { this.handleDef.runId = (data as any)._sub_internal_start as string; this.#checkRunId(); - const { data: _ } = await this.ctx.gql/**/ ` + const { data: _ } = await this.ctx.gql /**/` mutation ($parent_run_id: String!, $child_run_id: String!) { _sub_internal_link_parent_child(parent_run_id: $parent_run_id, child_run_id: $child_run_id) } @@ -258,7 +211,7 @@ export class ChildWorkflowHandle { async result(): Promise { this.#checkRunId(); - const { data } = await this.ctx.gql/**/ ` + const { data } = await this.ctx.gql /**/` query ($name: String!) { _sub_internal_results(name: $name) { completed { @@ -292,7 +245,7 @@ export class ChildWorkflowHandle { async stop(): Promise { this.#checkRunId(); - const { data } = await this.ctx.gql/**/ ` + const { data } = await this.ctx.gql /**/` mutation ($run_id: String!) 
{ _sub_internal_stop(run_id: $run_id) } @@ -306,7 +259,7 @@ export class ChildWorkflowHandle { async hasStopped(): Promise { this.#checkRunId(); - const { data } = await this.ctx.gql/**/ ` + const { data } = await this.ctx.gql /**/` query { _sub_internal_results(name: $name) { completed { @@ -329,7 +282,7 @@ export class ChildWorkflowHandle { #checkRunId() { if (!this.handleDef.runId) { throw new Error( - "Invalid state: run_id is not properly set, this could mean that the workflow was not started yet" + "Invalid state: run_id is not properly set, this could mean that the workflow was not started yet", ); } } @@ -371,7 +324,7 @@ class RetryStrategy { constructor( maxRetries: number, minBackoffMs?: number, - maxBackoffMs?: number + maxBackoffMs?: number, ) { this.maxRetries = maxRetries; this.minBackoffMs = minBackoffMs; diff --git a/src/typegate/src/runtimes/substantial/types.ts b/src/typegate/src/runtimes/substantial/types.ts index e7defcf5bd..7f9359d210 100644 --- a/src/typegate/src/runtimes/substantial/types.ts +++ b/src/typegate/src/runtimes/substantial/types.ts @@ -2,20 +2,43 @@ // SPDX-License-Identifier: Elastic-2.0 import { Operation, Run } from "../../../engine/runtime.js"; +import { TaskContext } from "../deno/shared_types.ts"; export type { + Backend, Operation, OperationEvent, Run, - Backend, } from "../../../engine/runtime.js"; export type AnyString = string & Record; export type WorkerEvent = "START" | AnyString; +export type Kind = "DENO" | "PYTHON"; + +export type TaskData = { + modulePath: string; + functionName: string; + run: Run; + kwargs: Record; + schedule: string; + internal: TaskContext; + kind: Kind; +}; + +export type ResultData = { + kind: string; + result: unknown; + run: Run; + schedule: string; + exception?: Error; +}; + +export type Data = TaskData | ResultData | string; + export type WorkerData = { type: WorkerEvent; - data: any; + data: Data; }; export type WorkerEventHandler = (message: Result) => Promise; @@ -33,7 +56,7 @@ export 
function Err(payload: E): Result { return { error: true, payload }; } -export function Msg(type: WorkerEvent, data: unknown): WorkerData { +export function Msg(type: WorkerEvent, data: Data): WorkerData { return { type, data }; } @@ -79,10 +102,3 @@ export class Interrupt extends Error { return new Interrupt(kind, cause); } } - -export function appendIfOngoing(run: Run, operation: Operation) { - const hasStopped = run.operations.some(({ event }) => event.type == "Stop"); - if (!hasStopped) { - run.operations.push(operation); - } -} diff --git a/src/typegate/src/runtimes/substantial/worker.ts b/src/typegate/src/runtimes/substantial/worker.ts index fcbc626421..857d571f70 100644 --- a/src/typegate/src/runtimes/substantial/worker.ts +++ b/src/typegate/src/runtimes/substantial/worker.ts @@ -1,9 +1,8 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 -import { errorToString } from "../../worker_utils.ts"; import { Context } from "./deno_context.ts"; -import { Err, Msg, Ok, WorkerData, WorkflowResult } from "./types.ts"; +import { Err, Msg, Ok, TaskData, WorkerData, WorkflowResult } from "./types.ts"; let runCtx: Context | undefined; @@ -11,50 +10,106 @@ self.onmessage = async function (event) { const { type, data } = event.data as WorkerData; switch (type) { case "START": { - const { modulePath, functionName, run, schedule, kwargs, internal } = - data; + const { + modulePath, + functionName, + run, + schedule, + kwargs, + internal, + kind, + } = data as TaskData; // FIXME: handle case when script is missing and notify WorkerManager so it cleans up // its registry. 
const module = await import(modulePath); - // TODO: for python use the same strategy but instead call from native - const workflowFn = module[functionName]; + if (kind == "DENO") { + runCtx = new Context(run, kwargs, internal); + const workflowFn = module[functionName]; - if (typeof workflowFn !== "function") { - self.postMessage(Err(`Function "${functionName}" not found`)); - self.close(); - return; - } - - runCtx = new Context(run, kwargs, internal); - - workflowFn(runCtx) - .then((wfResult: unknown) => { + if (typeof workflowFn !== "function") { + self.postMessage(Err(`Function "${functionName}" not found`)); + self.close(); + return; + } + workflowFn(runCtx) + .then((wfResult: unknown) => { + self.postMessage( + Ok( + Msg( + type, + { + kind: "SUCCESS", + result: wfResult, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), + ); + }) + .catch((wfException: unknown) => { + self.postMessage( + Ok( + Msg( + type, + { + kind: "FAIL", + result: wfException instanceof Error + ? wfException.message + : JSON.stringify(wfException), + exception: wfException instanceof Error + ? wfException + : undefined, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowResult, + ), + ), + ); + }); + } else if (kind == "PYTHON") { + try { + const { wfResult } = Meta.substantial.executePythonWithContext({ + run, + internal, + kwargs, + module_path: modulePath, + function_name: functionName, + }); self.postMessage( Ok( - Msg(type, { - kind: "SUCCESS", - result: wfResult, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult) - ) + Msg( + type, + { + kind: "SUCCESS", + result: wfResult, + run, + schedule, + } satisfies WorkflowResult, + ), + ), ); - }) - .catch((wfException: unknown) => { + } catch (wfException) { self.postMessage( - Ok( - Msg(type, { + Ok(Msg( + type, + { kind: "FAIL", - result: errorToString(wfException), - exception: - wfException instanceof Error ? 
wfException : undefined, - run: runCtx!.getRun(), + result: wfException instanceof Error + ? wfException.message + : JSON.stringify(wfException), + exception: wfException instanceof Error + ? wfException + : undefined, + run, schedule, - } satisfies WorkflowResult) - ) + } satisfies WorkflowResult, + )), ); - }); + } + } + break; } default: diff --git a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts index 795d20975a..236be7ecc3 100644 --- a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts +++ b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts @@ -5,7 +5,9 @@ import { envSharedWithWorkers } from "../../config/shared.ts"; import { getLogger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; import { + Data, Err, + Kind, Msg, Result, Run, @@ -48,7 +50,7 @@ export class WorkflowRecorder { name: WorkflowName, runId: RunId, worker: WorkerRecord, - startedAt: Date + startedAt: Date, ) { if (!this.workflowRuns.has(name)) { this.workflowRuns.set(name, new Set()); @@ -83,7 +85,7 @@ export class WorkflowRecorder { if (this.workflowRuns.has(name)) { if (!record) { logger.warn( - `"${runId}" associated with "${name}" does not exist or has been already destroyed` + `"${runId}" associated with "${name}" does not exist or has been already destroyed`, ); return false; } @@ -140,7 +142,7 @@ export class WorkerManager { modulePath, worker, }, - new Date() + new Date(), ); } @@ -151,10 +153,12 @@ export class WorkerManager { destroyAllWorkers() { this.recorder.destroyAllWorkers(); logger.warn( - `Destroyed workers for ${this.recorder - .getRegisteredWorkflowNames() - .map((w) => `"${w}"`) - .join(", ")}` + `Destroyed workers for ${ + this.recorder + .getRegisteredWorkflowNames() + .map((w) => `"${w}"`) + .join(", ") + }`, ); } @@ -181,7 +185,7 @@ export class WorkerManager { const rec = this.recorder.startedAtRecords.get(runId); if (!rec) { 
throw new Error( - `Invalid state: cannot find initial time for run "${runId}"` + `Invalid state: cannot find initial time for run "${runId}"`, ); } return rec; @@ -209,7 +213,7 @@ export class WorkerManager { worker.onerror = /*async*/ (event) => handlerFn(Err(event)); } - trigger(type: WorkerEvent, runId: RunId, data: unknown) { + trigger(type: WorkerEvent, runId: RunId, data: Data) { const { worker } = this.recorder.getWorkerRecord(runId); worker.postMessage(Msg(type, data)); logger.info(`trigger ${type} for ${runId}`); @@ -222,7 +226,8 @@ export class WorkerManager { storedRun: Run, schedule: string, kwargs: Record, - internalTCtx: TaskContext + internalTCtx: TaskContext, + kind: Kind, ) { this.#createWorker(name, workflowModPath, runId); this.trigger("START", runId, { @@ -232,6 +237,7 @@ export class WorkerManager { kwargs, schedule, internal: internalTCtx, + kind, }); } } diff --git a/src/typegraph/python/typegraph/runtimes/substantial.py b/src/typegraph/python/typegraph/runtimes/substantial.py index 836d35ccdd..065707b68d 100644 --- a/src/typegraph/python/typegraph/runtimes/substantial.py +++ b/src/typegraph/python/typegraph/runtimes/substantial.py @@ -30,12 +30,15 @@ class Backend: + @staticmethod def dev_memory(): return SubstantialBackendMemory() + @staticmethod def dev_fs(): return SubstantialBackendFs() + @staticmethod def redis(connection_string_secret: str): return SubstantialBackendRedis(value=RedisBackend(connection_string_secret)) @@ -47,7 +50,13 @@ def __init__( file_descriptions: List[WorkflowFileDescription], ): data = SubstantialRuntimeData(backend, file_descriptions) - super().__init__(runtimes.register_substantial_runtime(store, data)) + + runtime_id = runtimes.register_substantial_runtime(store, data) + if isinstance(runtime_id, Err): + raise Exception(runtime_id.value) + + super().__init__(runtime_id.value) + self.backend = backend def _generic_substantial_func( @@ -61,7 +70,7 @@ def _generic_substantial_func( func_out=None if func_out is 
None else func_out._id, operation=operation, ) - func_data = runtimes.generate_substantial_operation(store, self.id.value, data) + func_data = runtimes.generate_substantial_operation(store, self.id, data) if isinstance(func_data, Err): raise Exception(func_data.value) diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index 3cc42b1f5d..92f78a0e55 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -1,4 +1,4 @@ -import { assertExists, assertEquals } from "@std/assert"; +import { assertEquals, assertExists } from "@std/assert"; import { connect, parseURL } from "redis"; import { gql, Meta, sleep } from "../../utils/mod.ts"; import { MetaTestCleanupFn } from "test-utils/test.ts"; @@ -33,7 +33,7 @@ export function basicTestTemplate( }; secrets?: Record; }, - cleanup?: MetaTestCleanupFn + cleanup?: MetaTestCleanupFn, ) { Meta.test( { @@ -63,11 +63,11 @@ export function basicTestTemplate( currentRunId = body.data?.start_sleep! 
as string; assertExists( currentRunId, - "Run id was not returned when workflow was started" + "Run id was not returned when workflow was started", ); }) .on(e); - } + }, ); // Let interrupts to do their jobs for a bit @@ -101,7 +101,7 @@ export function basicTestTemplate( }, }) .on(e); - } + }, ); await sleep(delays.awaitSleepCompleteSec * 1000); @@ -145,9 +145,9 @@ export function basicTestTemplate( }, }) .on(e); - } + }, ); - } + }, ); } @@ -162,7 +162,7 @@ export function concurrentWorkflowTestTemplate( }; secrets?: Record; }, - cleanup?: MetaTestCleanupFn + cleanup?: MetaTestCleanupFn, ) { Meta.test( { @@ -207,7 +207,7 @@ export function concurrentWorkflowTestTemplate( runIds.push(...[one, two, three]); }) .on(e); - } + }, ); // let's wait for a bit to make sure interrupts are doing their jobs @@ -243,7 +243,7 @@ export function concurrentWorkflowTestTemplate( three: [runIds[2]], }) .on(e); - } + }, ); // This is arbitrary, if ops are leaking that means it should be increased @@ -277,20 +277,20 @@ export function concurrentWorkflowTestTemplate( assertEquals( body?.data?.results?.ongoing?.count, 0, - `0 workflow currently running (${backendName})` + `0 workflow currently running (${backendName})`, ); assertEquals( body?.data?.results?.completed?.count, 3, - `3 workflows completed (${backendName})` + `3 workflows completed (${backendName})`, ); const localSorter = (a: any, b: any) => a.run_id.localeCompare(b.run_id); - const received = - body?.data?.results?.completed?.runs ?? ([] as Array); + const received = body?.data?.results?.completed?.runs ?? 
+ ([] as Array); const expected = [ { result: { @@ -318,12 +318,12 @@ export function concurrentWorkflowTestTemplate( assertEquals( received.sort(localSorter), expected.sort(localSorter), - `All three workflows have completed, including the aborted one (${backendName})` + `All three workflows have completed, including the aborted one (${backendName})`, ); }) .on(e); }); - } + }, ); } @@ -338,7 +338,7 @@ export function retrySaveTestTemplate( }; secrets?: Record; }, - cleanup?: MetaTestCleanupFn + cleanup?: MetaTestCleanupFn, ) { Meta.test( { @@ -381,7 +381,7 @@ export function retrySaveTestTemplate( assertExists(retryAbortMeId, "retry_abort_me runId"); }) .on(e); - } + }, ); await sleep(1000); @@ -401,7 +401,7 @@ export function retrySaveTestTemplate( abort_retry: [retryAbortMeId], }) .on(e); - } + }, ); // Waiting for the retry to finish @@ -436,20 +436,20 @@ export function retrySaveTestTemplate( assertEquals( body?.data?.results?.ongoing?.count, 0, - `0 workflow currently running (${backendName})` + `0 workflow currently running (${backendName})`, ); assertEquals( body?.data?.results?.completed?.count, 4, - `4 workflows completed (${backendName})` + `4 workflows completed (${backendName})`, ); const localSorter = (a: any, b: any) => a.run_id.localeCompare(b.run_id); - const received = - body?.data?.results?.completed?.runs ?? ([] as Array); + const received = body?.data?.results?.completed?.runs ?? 
+ ([] as Array); const expected = [ { result: { @@ -484,13 +484,13 @@ export function retrySaveTestTemplate( assertEquals( received.sort(localSorter), expected.sort(localSorter), - `All workflows have completed (${backendName})` + `All workflows have completed (${backendName})`, ); }) .on(e); - } + }, ); - } + }, ); } @@ -505,7 +505,7 @@ export function childWorkflowTestTemplate( }; secrets?: Record; }, - cleanup?: MetaTestCleanupFn + cleanup?: MetaTestCleanupFn, ) { Meta.test( { @@ -522,7 +522,7 @@ export function childWorkflowTestTemplate( "runtimes/substantial/substantial_child_workflow.py", { secrets, - } + }, ); const packages = [ @@ -543,7 +543,7 @@ export function childWorkflowTestTemplate( parentRunId = body.data?.start! as string; assertExists( parentRunId, - "Run id was not returned when workflow was started" + "Run id was not returned when workflow was started", ); }) .on(e); @@ -610,7 +610,7 @@ export function childWorkflowTestTemplate( }) .on(e); }); - } + }, ); }