From 2ea7a48c2e416c135c5c01eb3c6236daf1e0e565 Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Mon, 9 Dec 2024 10:54:33 +0300 Subject: [PATCH] feat(subs,deno): advanced filters + context util + replay aware logger (#930) Solves [MET-720](https://linear.app/metatypedev/issue/MET-720/subs-advanced-filters-for-workflows), [MET-749](https://linear.app/metatypedev/issue/MET-749/subs-durable-logger) and [MET-760](https://linear.app/metatypedev/issue/MET-760/gate-allow-user-to-fetchdebug-the-context-easily). #### Basic overview Given an expression tree, a field can represent either an operator (e.g., and, or, lte, etc.) or a 'special' field (started_at, ended_at, status). We can now answer queries such as: 'List all failed runs that started between x and y but did not end at z, where the value is not null, or alternatively completed but returned null' Example: ```js { or: [ { and: [ { status: { contains: '"ERROR"' } }, { started_at: { gte: x } }, { started_at: { lt: y } }, { not: { ended_at: { eq: z } } } ] }, { and: [ { status: { eq: '"COMPLETED"' } }, { eq: 'null' } ] } ] } ``` #### Migration notes None - [x] The change comes with new or modified tests - [x] Hard-to-understand functions have explanatory comments - [ ] End-user documentation is updated to reflect the change --- src/typegate/src/runtimes/substantial.ts | 19 +- .../src/runtimes/substantial/deno_context.ts | 52 +++ .../src/runtimes/substantial/filter_utils.ts | 296 ++++++++++++++++++ .../{substantial.rs => substantial/mod.rs} | 103 +++--- .../src/runtimes/substantial/type_utils.rs | 162 ++++++++++ src/typegraph/core/src/t.rs | 2 +- src/typegraph/core/wit/typegraph.wit | 4 +- src/typegraph/deno/src/runtimes/deno.ts | 10 + .../deno/src/runtimes/substantial.ts | 6 + .../python/typegraph/runtimes/deno.py | 17 +- .../python/typegraph/runtimes/substantial.py | 4 + tests/auth/auth.py | 17 + tests/auth/auth_test.ts | 12 + tests/runtimes/substantial/common.ts | 44 ++- .../runtimes/substantial/filter_utils_test.ts | 212 
+++++++++++++ .../substantial/imports/common_types.ts | 8 + tests/runtimes/substantial/substantial.py | 8 +- .../substantial/substantial_child_workflow.py | 5 +- .../{ => workflows}/child_workflow.ts | 2 +- .../substantial/{ => workflows}/workflow.ts | 7 +- 20 files changed, 909 insertions(+), 81 deletions(-) create mode 100644 src/typegate/src/runtimes/substantial/filter_utils.ts rename src/typegraph/core/src/runtimes/{substantial.rs => substantial/mod.rs} (77%) create mode 100644 src/typegraph/core/src/runtimes/substantial/type_utils.rs create mode 100644 tests/runtimes/substantial/filter_utils_test.ts rename tests/runtimes/substantial/{ => workflows}/child_workflow.ts (97%) rename tests/runtimes/substantial/{ => workflows}/workflow.ts (92%) diff --git a/src/typegate/src/runtimes/substantial.ts b/src/typegate/src/runtimes/substantial.ts index 35c366b0d..8fada6ea2 100644 --- a/src/typegate/src/runtimes/substantial.ts +++ b/src/typegate/src/runtimes/substantial.ts @@ -20,6 +20,7 @@ import { } from "./substantial/agent.ts"; import { closestWord } from "../utils.ts"; import { InternalAuth } from "../services/auth/protocols/internal.ts"; +import { applyFilter, type Expr, type ExecutionStatus } from "./substantial/filter_utils.ts"; const logger = getLogger(import.meta); @@ -28,7 +29,7 @@ interface QueryCompletedWorkflowResult { started_at: string; ended_at: string; result: { - status: "COMPLETED" | "COMPLETED_WITH_ERROR" | "UNKNOWN"; + status: ExecutionStatus; value: unknown; // hinted by the user }; } @@ -218,9 +219,11 @@ export class SubstantialRuntime extends Runtime { return JSON.parse(JSON.stringify(res)); }; case "results": - return this.#resultsResover(false); + return this.#resultsResolver(false); case "results_raw": - return this.#resultsResover(true); + return this.#resultsResolver(true); + case "advanced_filters": + return this.#advancedFiltersResolver(); case "internal_link_parent_child": return this.#linkerResolver(); default: @@ -285,7 +288,7 @@ export 
class SubstantialRuntime extends Runtime { }; } - #resultsResover(enableGenerics: boolean): Resolver { + #resultsResolver(enableGenerics: boolean): Resolver { return async ({ name: workflowName }) => { this.#checkWorkflowExistOrThrow(workflowName); @@ -407,6 +410,14 @@ export class SubstantialRuntime extends Runtime { }; } + #advancedFiltersResolver(): Resolver { + return async ({ name: workflowName, filter }) => { + this.#checkWorkflowExistOrThrow(workflowName); + // console.log("workflow", workflowName, "Filter", filter); + return await applyFilter(workflowName, this.agent, filter as Expr); + }; + } + #linkerResolver(): Resolver { return async ({ parent_run_id, child_run_id }) => { await Meta.substantial.metadataWriteParentChildLink({ diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index a9a871db8..73090e156 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -14,10 +14,12 @@ export class Context { private id = 0; public kwargs = {}; gql: ReturnType; + logger: SubLogger; constructor(private run: Run, private internal: TaskContext) { this.gql = createGQLClient(internal); this.kwargs = getKwargsCopy(run); + this.logger = new SubLogger(this); } #nextId() { @@ -414,6 +416,56 @@ class RetryStrategy { } } + +class SubLogger { + constructor(private ctx: Context) {} + + async #log(kind: "warn" | "error" | "info", ...args: unknown[]) { + await this.ctx.save(() => { + const prefix = `[${kind.toUpperCase()}: ${this.ctx.getRun().run_id}]`; + switch(kind) { + case "warn": { + console.warn(prefix, ...args); + break; + } + case "error": { + console.error(prefix,...args); + break; + } + default: { + console.info(prefix, ...args); + break; + } + } + + const message = args.map((arg) => { + try { + const json = JSON.stringify(arg); + // Functions are omitted, + // For example, JSON.stringify(() => 1234) => undefined (no throw) + 
return json === undefined ? String(arg) : json; + } catch(_) { + return String(arg); + } + }).join(" "); + + return `${prefix}: ${message}`; + }); + } + + async warn(...payload: unknown[]) { + await this.#log("warn", ...payload); + } + + async info(...payload: unknown[]) { + await this.#log("info", ...payload); + } + + async error(...payload: unknown[]) { + await this.#log("error", ...payload); + } +} + function createGQLClient(internal: TaskContext) { const tgLocal = new URL(internal.meta.url); if (testBaseUrl) { diff --git a/src/typegate/src/runtimes/substantial/filter_utils.ts b/src/typegate/src/runtimes/substantial/filter_utils.ts new file mode 100644 index 000000000..149b4cf15 --- /dev/null +++ b/src/typegate/src/runtimes/substantial/filter_utils.ts @@ -0,0 +1,296 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +import { Agent } from "./agent.ts"; + +export type ExecutionStatus = + | "COMPLETED" + | "COMPLETED_WITH_ERROR" + | "ONGOING" + | "UNKNOWN"; + +type KVHelper = { + // should be a union type but it is not very helpful with autocomplete + [K in T[number]]?: V; +}; +type AND = { and?: Array }; +type OR = { or?: Array }; +type NOT = { not?: Expr }; + +// JSON string is currently the only way to do generics on a typegraph +type ORD = KVHelper<["eq", "lt", "lte", "gt", "gte"], string>; +type INCL = KVHelper<["in", "contains"], string>; +type Terms = ORD & INCL; + +type SpecialTerms = KVHelper<["started_at", "ended_at", "status"], Terms>; + +export type Expr = Terms & SpecialTerms & AND & OR & NOT; + +export class SearchItem { + constructor( + public readonly run_id: string, + public readonly started_at: string | null, + public readonly ended_at: string | null, + public readonly status: ExecutionStatus, + public readonly value?: unknown, + ) {} + + toSearchResult() { + return { + run_id: this.run_id, + started_at: this.started_at, + ended_at: this.ended_at, + status: this.status, + 
value: this.value === undefined ? undefined : JSON.stringify(this.value), + }; + } +} + +export async function buildSearchableItems( + workflowName: string, + agent: Agent, +): Promise> { + const relatedRuns = await agent.retrieveLinks(workflowName); + const searchList = [] as Array; + for (const runId of relatedRuns) { + const run = await agent.retrieveEvents(runId); + const startedAt = run.operations[0]?.at; + let endedAt, result: any; + + let hasStopped = false; + for (const op of run.operations) { + if (op.event.type == "Stop") { + endedAt = op.at; + result = op.event.result; + hasStopped = true; + break; + } + } + + const isOk = "Ok" in result; + const kind = isOk ? "Ok" : "Err"; + const stoppedStatus = isOk ? "COMPLETED" : "COMPLETED_WITH_ERROR"; + + searchList.push( + new SearchItem( + runId, + startedAt ?? null, + endedAt ?? null, + hasStopped ? stoppedStatus : "ONGOING", + hasStopped ? result[kind] : undefined, + ), + ); + } + + return searchList; +} + +export async function applyFilter( + workflowName: string, + agent: Agent, + filter: Expr, +) { + const searchableItems = await buildSearchableItems(workflowName, agent); + const searchResults = []; + for (const item of searchableItems) { + if (evalExpr(item, filter, [""])) { + searchResults.push(item.toSearchResult()); + } + } + + return searchResults; +} + + +export function evalExpr( + sResult: SearchItem, + filter: Expr, + path: Array, +) { + const keys = Object.keys(filter) as Array; + if (keys.length != 1) { + throw new Error(`Invalid expression at ${path.join(".")}`); + } + const op = keys[0]; + const newPath = [...path, op]; + + switch (op) { + // Expr + case "and": + case "or": { + const exprList = filter[op]; + if (!Array.isArray(exprList)) { + // should be unreachable since filter is validated at push + throw new Error( + `Fatal: array expected at ${path.join(".")}`, + ); + } + const fn = op == "or" ? 
"some" : "every"; + if ( + !exprList[fn]((subFilter, index) => + evalExpr(sResult, subFilter, [...newPath, `#${index}`]) + ) + ) { + return false; + } + break; + } + case "not": { + if (evalExpr(sResult, filter["not"]!, newPath)) { + return false; + } + break; + } + // Special + case "status": + case "started_at": + case "ended_at": { + const discriminator = sResult[op]; + const repr = new SearchItem( + sResult.run_id, + null, + null, + sResult.status, + discriminator, + ); + return evalTerm(repr, filter[op]!, newPath); + } + // Term + default: { + if (!evalTerm(sResult, filter, path)) { + return false; + } + } + } + + return true; +} + +function evalTerm(sResult: SearchItem, terms: Terms, path: Array) { + const value = sResult.value; + const keys = Object.keys(terms) as Array; + if (keys.length != 1) { + throw new Error(`Invalid expression at ${path.join(".")}`); + } + + const op = keys[0]; + const newPath = [...path, op]; + switch (op) { + case "eq": { + // term can never compare (null at worst) + if (value === undefined) { + return false; + } + + if (!testCompare(value, toJS(terms[op]))) { + return false; + } + + break; + } + case "lt": + case "lte": + case "gt": + case "gte": { + if (!ord(value, toJS(terms[op]), op, newPath)) { + return false; + } + break; + } + case "contains": + case "in": { + if ( + !inclusion(value, toJS(terms[op]), op, newPath) + ) { + return false; + } + break; + } + default: { + throw new Error( + `Unknown operator "${op}" at ${path.join(".")}`, + ); + } + } + + return true; +} + +function toJS(val: string | undefined) { + // TODO: impl generic JSON on typegate + // ideally this should be an identity fn + return JSON.parse(val ?? 
"null"); +} + +function testCompare(value: unknown, testValue: unknown) { + const easy = ["number", "boolean", "string"]; + if (easy.includes(typeof value)) { + return value === testValue; + } + + return JSON.stringify(value) == JSON.stringify(testValue); +} + +function comparable(a: unknown, b: unknown) { + return typeof a == typeof b; +} + +function ord(l: unknown, r: unknown, cp: keyof ORD, path: Array) { + if (!comparable(l, r)) { + return false; + } + + if ( + typeof l == "string" && typeof r == "string" || + typeof l == "number" && typeof r == "number" + ) { + switch (cp) { + case "lt": + return l < r; + case "lte": + return l <= r; + case "gt": + return l > r; + case "gte": + return l >= r; + default: { + throw new Error( + `Unknown comparison operator "${cp}" at ${path.join(",")}`, + ); + } + } + } + + return false; +} + +function inclusion( + l: unknown, + r: unknown, + cp: keyof INCL, + _newPath: Array, +) { + const [left, right] = cp == "in" ? [l, r] : [r, l]; + if (Array.isArray(right)) { + // Note: Array.prototype.includes compare item references, not the values + const leftComp = JSON.stringify(left); + return right.some((inner) => JSON.stringify(inner) === leftComp); + } else if (typeof left == "string" && typeof right == "string") { + return right.includes(left); + } else if ( + typeof left == typeof right && typeof left == "object" && left != null + ) { + // Example: { a: { b: 1 } } in { a: { b: 1 }, c: 2 } => true + const rightV = (right ?? 
{}) as Record; + for (const [k, leftVal] of Object.entries(left)) { + const rightVal = rightV[k]; + if (!testCompare(leftVal, rightVal)) { + return false; + } + } + + return true; + } + + return false; +} diff --git a/src/typegraph/core/src/runtimes/substantial.rs b/src/typegraph/core/src/runtimes/substantial/mod.rs similarity index 77% rename from src/typegraph/core/src/runtimes/substantial.rs rename to src/typegraph/core/src/runtimes/substantial/mod.rs index 23a824553..72b650feb 100644 --- a/src/typegraph/core/src/runtimes/substantial.rs +++ b/src/typegraph/core/src/runtimes/substantial/mod.rs @@ -6,7 +6,6 @@ use crate::errors::Result; use crate::global_store::Store; use crate::t::{self, TypeBuilder}; use crate::typegraph::TypegraphContext; -use crate::types::WithRuntimeConfig; use crate::wit::core::FuncParams; use crate::wit::{ core::RuntimeId, runtimes::Effect as WitEffect, runtimes::SubstantialOperationData, @@ -14,6 +13,8 @@ use crate::wit::{ use common::typegraph::Materializer; use serde_json::json; +mod type_utils; + #[derive(Debug)] pub enum SubstantialMaterializer { Start { secrets: Vec }, @@ -25,6 +26,7 @@ pub enum SubstantialMaterializer { Results, ResultsRaw, InternalLinkParentChild, + AdvancedFilters, } impl MaterializerConverter for SubstantialMaterializer { @@ -55,6 +57,7 @@ impl MaterializerConverter for SubstantialMaterializer { SubstantialMaterializer::InternalLinkParentChild => { ("internal_link_parent_child".to_string(), json!({})) } + SubstantialMaterializer::AdvancedFilters => ("advanced_filters".to_string(), json!({})), }; Ok(Materializer { @@ -70,9 +73,9 @@ pub fn substantial_operation( runtime: RuntimeId, data: SubstantialOperationData, ) -> Result { - let mut inp = t::struct_(); - let (effect, mat_data, out_ty) = match data { + let (effect, mat_data, inp_ty, out_ty) = match data { SubstantialOperationData::Start(data) => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); inp.prop( "kwargs", @@ -86,27 +89,32 @@ pub fn 
substantial_operation( SubstantialMaterializer::Start { secrets: data.secrets, }, + inp.build()?, t::string().build()?, ) } SubstantialOperationData::StartRaw(data) => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); - inp.prop("kwargs", t_json_string()?.into()); + inp.prop("kwargs", t::string().format("json").build()?); ( WitEffect::Create(true), SubstantialMaterializer::StartRaw { secrets: data.secrets, }, + inp.build()?, t::string().build()?, ) } SubstantialOperationData::Stop => { + let mut inp = t::struct_(); inp.prop("run_id", t::string().build()?); ( WitEffect::Create(false), SubstantialMaterializer::Stop, + inp.build()?, t::list(t::string().build()?).build()?, ) } @@ -116,31 +124,36 @@ pub fn substantial_operation( .prop("payload", data.into()) .build()?; + let mut inp = t::struct_(); inp.prop("run_id", t::string().build()?); inp.prop("event", event); ( WitEffect::Create(false), SubstantialMaterializer::Send, + inp.build()?, t::string().build()?, ) } SubstantialOperationData::SendRaw => { let event = t::struct_() .prop("name", t::string().build()?) - .prop("payload", t_json_string()?.into()) + .prop("payload", t::string().format("json").build()?) .build()?; + let mut inp = t::struct_(); inp.prop("run_id", t::string().build()?); inp.prop("event", event); ( WitEffect::Create(false), SubstantialMaterializer::SendRaw, + inp.build()?, t::string().build()?, ) } SubstantialOperationData::Resources => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); let row = t::struct_() @@ -157,96 +170,58 @@ pub fn substantial_operation( .prop("running", t::list(row).build()?) 
.build()?; - (WitEffect::Read, SubstantialMaterializer::Resources, out) + ( + WitEffect::Read, + SubstantialMaterializer::Resources, + inp.build()?, + out, + ) } SubstantialOperationData::Results(data) => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); ( WitEffect::Read, SubstantialMaterializer::Results, - results_op_results_ty(data)?, + inp.build()?, + type_utils::results_op_results_ty(data)?, ) } SubstantialOperationData::ResultsRaw => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); ( WitEffect::Read, SubstantialMaterializer::ResultsRaw, - results_op_results_ty(t_json_string()?)?, + inp.build()?, + type_utils::results_op_results_ty(t::string().format("json").build()?.into())?, ) } SubstantialOperationData::InternalLinkParentChild => { + let mut inp = t::struct_(); inp.prop("parent_run_id", t::string().build()?); inp.prop("child_run_id", t::string().build()?); ( WitEffect::Create(true), SubstantialMaterializer::InternalLinkParentChild, + inp.build()?, t::boolean().build()?, ) } + SubstantialOperationData::AdvancedFilters => ( + WitEffect::Read, + SubstantialMaterializer::AdvancedFilters, + type_utils::filter_expr_ty()?, + type_utils::search_results_ty()?, + ), }; let mat = super::Materializer::substantial(runtime, mat_data, effect); let mat_id = Store::register_materializer(mat); Ok(FuncParams { - inp: inp.build()?.into(), + inp: inp_ty.into(), out: out_ty.into(), mat: mat_id, }) } - -fn t_json_string() -> Result { - t::string() - .build() - .and_then(|r| { - r.with_config(json!({ - "format": "json" - })) - }) - .map(|r| r.id().into()) -} - -fn results_op_results_ty(out: u32) -> Result { - let count = t::integer().build()?; - - let result = t::struct_() - .prop("status", t::string().build()?) - .prop("value", t::optional(out.into()).build()?) - .build()?; - - let ongoing_runs = t::list( - t::struct_() - .prop("run_id", t::string().build()?) - .prop("started_at", t::string().build()?) 
- .build()?, - ) - .build()?; - - let completed_runs = t::list( - t::struct_() - .prop("run_id", t::string().build()?) - .prop("started_at", t::string().build()?) - .prop("ended_at", t::string().build()?) - .prop("result", result) - .build()?, - ) - .build()?; - - t::struct_() - .prop( - "ongoing", - t::struct_() - .prop("count", count) - .prop("runs", ongoing_runs) - .build()?, - ) - .prop( - "completed", - t::struct_() - .prop("count", count) - .prop("runs", completed_runs) - .build()?, - ) - .build() -} diff --git a/src/typegraph/core/src/runtimes/substantial/type_utils.rs b/src/typegraph/core/src/runtimes/substantial/type_utils.rs new file mode 100644 index 000000000..fe3f74d92 --- /dev/null +++ b/src/typegraph/core/src/runtimes/substantial/type_utils.rs @@ -0,0 +1,162 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +use std::cell::RefCell; + +use indexmap::IndexSet; + +use crate::errors::Result; +use crate::t::{self, TypeBuilder}; + +pub fn results_op_results_ty(out: u32) -> Result { + let count = t::integer().build()?; + + let result = t::struct_() + .prop("status", t::string().build()?) + .prop("value", t::optional(out.into()).build()?) + .build()?; + + let ongoing_runs = t::list( + t::struct_() + .prop("run_id", t::string().build()?) + .prop("started_at", t::string().build()?) + .build()?, + ) + .build()?; + + let completed_runs = t::list( + t::struct_() + .prop("run_id", t::string().build()?) + .prop("started_at", t::string().build()?) + .prop("ended_at", t::string().build()?) + .prop("result", result) + .build()?, + ) + .build()?; + + t::struct_() + .prop( + "ongoing", + t::struct_() + .prop("count", count) + .prop("runs", ongoing_runs) + .build()?, + ) + .prop( + "completed", + t::struct_() + .prop("count", count) + .prop("runs", completed_runs) + .build()?, + ) + .build() +} + +thread_local! 
{ + static RECORDER: RefCell> = Default::default(); +} + +fn loc_ref(name: &str) -> Result { + let name = format!("s_{name}").to_lowercase(); + t::ref_(name, Default::default()).build() +} + +fn save( + name: &str, + builder: impl FnOnce(&str) -> Result, +) -> Result { + let name = format!("s_{name}").to_lowercase(); + + let has_name = RECORDER.with_borrow(|cache| cache.contains(&name)); + if has_name { + return t::ref_(name, Default::default()).build(); + } + + RECORDER.with_borrow_mut(|cache| { + let id = builder(&name)?; + cache.insert(name.to_string()); + Ok(id) + }) +} + +/// Term: `{ op: value } | { special: { op: value } }` +/// * op: "eq", "lt", "contains", .. +/// * special: "started_at", "status", .. +fn filter_term_variants() -> Result> { + // FIXME: a generic json would have been helpful here vs json string + // let any_scalar = save("any_scalar", |n| { + // t::eitherx!(t::integer(), t::string(), t::boolean(), t::struct_()).build_named(n) + // })?; + // let value_to_comp_against = t::eitherx!(any_scalar, t::listx(any_scalar)).build()?; + + let value_to_comp_against = t::string().format("json").build()?; + let ops = ["eq", "lt", "lte", "gt", "gte", "in", "contains"] + .into_iter() + .map(|op| { + save(op, |n| { + t::struct_().prop(op, value_to_comp_against).build_named(n) + }) + }) + .collect::>>()?; + + let op_value = save("op", |n| t::either(ops.clone().into_iter()).build_named(n))?; + + let special = ["started_at", "ended_at", "status"] + .into_iter() + .map(|sp| save(sp, |n| t::struct_().prop(sp, op_value).build_named(n))) + .collect::>>()?; + + let mut variants = vec![]; + variants.extend(ops.iter()); + variants.extend(special.iter()); + + Ok(variants) +} + +/// Expr: `{ op: [...] 
} | Term` +/// * op: "and" or "or" +/// * ...: may contain itself, or a term +pub fn filter_expr_ty() -> Result { + let mut op_expr_variants = filter_term_variants()?; + op_expr_variants.extend([ + loc_ref("and_expr")?, + loc_ref("or_expr")?, + loc_ref("not_expr")?, + ]); + + let expr = save("expr", |n| t::either(op_expr_variants).build_named(n))?; + let expr_list = save("list_expr", |n| t::listx(expr).build_named(n))?; + + let _and = save("and_expr", |n| { + t::struct_().prop("and", expr_list).build_named(n) + })?; + + let _or = save("or_expr", |n| { + t::struct_().prop("or", expr_list).build_named(n) + })?; + + let _not = save("not_expr", |n| { + t::struct_().prop("not", expr).build_named(n) + })?; + + t::struct_() + .prop("name", t::string().build()?) + .prop( + "filter", expr, + // save("Filter", |n| t::unionx!(and, or, expr).build_named(n))?, + ) + .build() +} + +pub fn search_results_ty() -> Result { + t::list( + t::struct_() + .prop("run_id", t::string().build()?) + .prop("status", t::string().build()?) + .prop("started_at", t::string().optional().build()?) + .prop("ended_at", t::string().optional().build()?) + .prop("value", t::string().format("json").optional().build()?) 
+ .build()?, + ) + .build() +} diff --git a/src/typegraph/core/src/t.rs b/src/typegraph/core/src/t.rs index 40e06b1fa..f39b6c713 100644 --- a/src/typegraph/core/src/t.rs +++ b/src/typegraph/core/src/t.rs @@ -338,7 +338,7 @@ pub(crate) use unionx; #[derive(Default)] pub struct EitherBuilder { - data: TypeEither, + pub data: TypeEither, } #[allow(clippy::derivable_impls)] diff --git a/src/typegraph/core/wit/typegraph.wit b/src/typegraph/core/wit/typegraph.wit index 763ce9b2f..cda10f336 100644 --- a/src/typegraph/core/wit/typegraph.wit +++ b/src/typegraph/core/wit/typegraph.wit @@ -529,7 +529,9 @@ interface runtimes { // type of result results(type-id), results-raw, - internal-link-parent-child + internal-link-parent-child, + // filters + advanced-filters } register-substantial-runtime: func(data: substantial-runtime-data) -> result; diff --git a/src/typegraph/deno/src/runtimes/deno.ts b/src/typegraph/deno/src/runtimes/deno.ts index f72cc9b6e..9eb821093 100644 --- a/src/typegraph/deno/src/runtimes/deno.ts +++ b/src/typegraph/deno/src/runtimes/deno.ts @@ -130,6 +130,16 @@ export class DenoRuntime extends Runtime { return t.func(t.struct({}), out, mat); } + /** Utility for fetching the current request context */ + fetchContext(outputShape?: C): t.Func { + const returnValue = outputShape ? `context` : "JSON.stringify(context)"; + return this.func( + t.struct({}), + outputShape ?? 
t.json(), + { code: `(_, { context }) => ${returnValue}` } + ); + } + policy(name: string, _code: string): Policy; policy(name: string, data: Omit): Policy; policy(name: string, data: string | Omit): Policy { diff --git a/src/typegraph/deno/src/runtimes/substantial.ts b/src/typegraph/deno/src/runtimes/substantial.ts index 7e9c81f85..9d2dc1259 100644 --- a/src/typegraph/deno/src/runtimes/substantial.ts +++ b/src/typegraph/deno/src/runtimes/substantial.ts @@ -100,6 +100,12 @@ export class SubstantialRuntime extends Runtime { }); } + advancedFilters(): Func { + return this._genericSubstantialFunc({ + tag: "advanced-filters" + }); + } + #internalLinkParentChild(): Func { return this._genericSubstantialFunc({ tag: "internal-link-parent-child", diff --git a/src/typegraph/python/typegraph/runtimes/deno.py b/src/typegraph/python/typegraph/runtimes/deno.py index 58657df4a..08dc00512 100644 --- a/src/typegraph/python/typegraph/runtimes/deno.py +++ b/src/typegraph/python/typegraph/runtimes/deno.py @@ -4,7 +4,7 @@ import json import re from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, List, Optional +from typing import TYPE_CHECKING, Any, List, Optional, Union from typegraph.gen.exports.runtimes import ( Effect, @@ -129,6 +129,21 @@ def identity(self, inp: "t.struct") -> "t.func": PredefinedFunMat(id=res.value, name="identity", effect=EffectRead()), ) + def fetch_context(self, output_shape: Union["t.struct", None] = None): + """ + Utility for fetching the current request context + """ + return_value = ( + "context" if output_shape is not None else "JSON.stringify(context)" + ) + from typegraph import t + + return self.func( + inp=t.struct(), + out=output_shape or t.json(), + code=f"(_, {{ context }}) => {return_value}", + ) + def policy( self, name: str, code: str, secrets: Optional[List[str]] = None ) -> Policy: diff --git a/src/typegraph/python/typegraph/runtimes/substantial.py b/src/typegraph/python/typegraph/runtimes/substantial.py index 
f0cb7a80c..ecd9ab164 100644 --- a/src/typegraph/python/typegraph/runtimes/substantial.py +++ b/src/typegraph/python/typegraph/runtimes/substantial.py @@ -19,6 +19,7 @@ SubstantialOperationDataStart, SubstantialOperationDataStartRaw, SubstantialOperationDataStop, + SubstantialOperationDataAdvancedFilters, SubstantialRuntimeData, SubstantialStartData, WorkflowFileDescription, @@ -85,6 +86,9 @@ def query_results(self, output: "t.typedef"): def query_results_raw(self): return self._generic_substantial_func(SubstantialOperationDataResultsRaw()) + def advanced_filters(self): + return self._generic_substantial_func(SubstantialOperationDataAdvancedFilters()) + def _internal_link_parent_child(self): return self._generic_substantial_func( SubstantialOperationDataInternalLinkParentChild() diff --git a/tests/auth/auth.py b/tests/auth/auth.py index 78ea96dd5..e3a1703de 100644 --- a/tests/auth/auth.py +++ b/tests/auth/auth.py @@ -60,4 +60,21 @@ def auth(g: Graph): ), auth_token_field="token", ).with_policy(public), + # context_raw=deno.fetch_context().with_policy(public) # no args if shape is unknown + context=deno.fetch_context( + t.struct( + { + "provider": t.string(), + "accessToken": t.string(), + "refreshAt": t.integer(), + "profile": t.struct( + { + "id": t.integer(), + } + ), + "exp": t.integer(), + "iat": t.integer(), + } + ) + ).with_policy(public), ) diff --git a/tests/auth/auth_test.ts b/tests/auth/auth_test.ts index 1ee2f7502..15743659f 100644 --- a/tests/auth/auth_test.ts +++ b/tests/auth/auth_test.ts @@ -331,6 +331,12 @@ Meta.test("Auth", async (t) => { token(x: 1) { x } + context { + provider + profile { + id + } + } } ` .withHeaders({ authorization: `bearer ${jwt}` }) @@ -347,6 +353,12 @@ Meta.test("Auth", async (t) => { token: { x: 1, }, + context: { + provider: "github", + profile: { + id: 123 + } + } }) .on(e); }); diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index 9dca65020..ffd012a41 100644 --- 
a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -5,6 +5,7 @@ import { assertEquals, assertExists } from "@std/assert"; import { connect, parseURL } from "redis"; import { gql, Meta, sleep } from "../../utils/mod.ts"; import { MetaTestCleanupFn } from "test-utils/test.ts"; +import { Expr } from "@metatype/typegate/runtimes/substantial/filter_utils.ts"; export type BackendName = "fs" | "memory" | "redis"; @@ -563,7 +564,7 @@ export function childWorkflowTestTemplate( await sleep(delays.awaitCompleteSec * 1000); - t.should(`complete parent and all child workflows`, async () => { + await t.should(`complete parent and all child workflows`, async () => { await gql` query { children: results_raw(name: "bumpPackage") { @@ -622,6 +623,43 @@ export function childWorkflowTestTemplate( }) .on(e); }); + + + await t.should(`filter the runs given a nested expr (${backendName})`, async () => { + await gql` + query { + search(name: "bumpPackage", filter: $filter) { + # started_at + # ended_at + status + value + } + } + ` + .withVars({ + filter: { + or: [ + { + and: [ + { status: { contains: JSON.stringify("COMPL") }}, + { contains: JSON.stringify("substantial") } + ] + }, + { not: { not: { eq: JSON.stringify("Bump typegraph v3 => v4") } } } + ] + } satisfies Expr + }) + .expectBody((body) => { + const sorted = body.data.search.sort((a: any, b: any) => a.value.localeCompare(b.value)); + assertEquals(sorted, + [ + { status: "COMPLETED", value: '"Bump substantial v2 => v3"' }, + { status: "COMPLETED", value: '"Bump typegraph v3 => v4"' } + ] + ); + }) + .on(e); + }); }, ); } @@ -660,7 +698,7 @@ export function inputMutationTemplate( let currentRunId: string | null = null; await t.should( - `start accidentialInputMutation workflow and return its run id (${backendName})`, + `start accidentalInputMutation workflow and return its run id (${backendName})`, async () => { await gql` mutation { @@ -688,7 +726,7 @@ export function inputMutationTemplate( 
/**
 * Returns a new `Date` shifted by `days` calendar days; `date` itself is
 * left untouched. Negative values move backwards.
 *
 * The shift is applied to the UTC calendar fields: the fixtures below are
 * created from UTC date strings and compared through `toJSON()` (always
 * UTC), so local-time arithmetic could drift by an hour across a DST change
 * in the host time zone.
 */
function addDays(date: Date, days: number): Date {
  const shifted = new Date(date.getTime());
  shifted.setUTCDate(shifted.getUTCDate() + days);
  return shifted;
}

/**
 * Serializes `x` to its JSON text, the form in which filter operands are
 * written in these tests (e.g. `{ eq: val(1) }`).
 *
 * Note: `JSON.stringify(undefined)` yields `undefined` at runtime even
 * though the declared return type is `string`.
 */
function val(x: unknown): string {
  return JSON.stringify(x);
}

/**
 * Builds the deterministic list of `SearchItem`s used by the filter tests.
 *
 * Each sample is a single-entry object `{ <status>: <value> }`. Item `i` gets
 * the id `fakeUUID#i`, starts at the current `start` date and ends one day
 * later, except for `ONGOING` items whose end is `null`. The start date only
 * advances after even indices, so consecutive items may share a start date.
 */
export function testData() {
  const samples = [
    { "COMPLETED_WITH_ERROR": "Fatal: error" },
    { "COMPLETED": true },
    { "ONGOING": undefined },
    { "COMPLETED": [1, 2, ["three"]] },
    { "ONGOING": undefined },
    { "COMPLETED_WITH_ERROR": { nested: { object: 1234 }, b: 4 } },
    { "COMPLETED": null },
    { "COMPLETED": 1 },
    { "COMPLETED_WITH_ERROR": 2 },
    { "COMPLETED": 3 },
  ] satisfies Array<{ [K in ExecutionStatus]?: unknown }>;

  const dataset: SearchItem[] = [];

  let start = new Date("2024-01-01");
  for (let i = 0; i < samples.length; i += 1) {
    const end = addDays(start, 1);
    // Every sample holds exactly one `status -> value` pair.
    const [status, value] = Object.entries(samples[i])[0] as [
      ExecutionStatus,
      unknown,
    ];

    dataset.push(
      new SearchItem(
        `fakeUUID#${i}`,
        start.toJSON(),
        // A run that is still ongoing has no end date yet.
        status === "ONGOING" ? null : end.toJSON(),
        status,
        value,
      ),
    );

    // Advance the clock every other item so that some runs overlap.
    if (i % 2 === 0) {
      start = end;
    }
  }

  return dataset;
}
"2024-01-04T00:00:00.000Z", + status: "COMPLETED", + value: '[1,2,["three"]]', + }, + { + ended_at: "2024-01-05T00:00:00.000Z", + run_id: "fakeUUID#5", + started_at: "2024-01-04T00:00:00.000Z", + status: "COMPLETED_WITH_ERROR", + value: '{"nested":{"object":1234},"b":4}', + }, + ], + }); + + await testShould( + "be able to compare numbers and strings on all kinds of terms (special + simple) ", + { + filter: { + or: [ + { + and: [ + { started_at: { gte: val("2024-01-02") } }, + { not: { not: { ended_at: { eq: val(null) } } } }, + ], + }, + { lte: val(1) }, + ], + }, + expected: [ + { + run_id: "fakeUUID#2", + started_at: "2024-01-02T00:00:00.000Z", + ended_at: null, + status: "ONGOING", + value: undefined, + }, + { + run_id: "fakeUUID#4", + started_at: "2024-01-03T00:00:00.000Z", + ended_at: null, + status: "ONGOING", + value: undefined, + }, + { + run_id: "fakeUUID#7", + started_at: "2024-01-05T00:00:00.000Z", + ended_at: "2024-01-06T00:00:00.000Z", + status: "COMPLETED", + value: "1", + }, + ], + }, + ); +}); + +// TODO: bench? +// Each case should be relatively close as we traverse from start to end without any back and forth +// 1. benchmark(listTrasersal, bigNoopExpr, items=100000) +// 2. 
/**
 * Logger handed to workflows through `Context.logger`.
 *
 * Every method takes an arbitrary list of values and returns a promise that
 * workflows await before continuing; the promise carries no value.
 *
 * NOTE(review): the commit introducing this type calls the logger
 * "replay aware"; how entries are persisted or deduplicated on replay is not
 * visible from this declaration — confirm against the runtime side.
 */
interface SubLogger {
  /** Logs the given values at warning level. */
  warn: (...args: unknown[]) => Promise<void>;
  /** Logs the given values at info level. */
  info: (...args: unknown[]) => Promise<void>;
  /** Logs the given values at error level. */
  error: (...args: unknown[]) => Promise<void>;
}
WorkflowFile.deno(file="child_workflow.ts", deps=["imports/common_types.ts"]) + WorkflowFile.deno( + file="workflows/child_workflow.ts", deps=["imports/common_types.ts"] + ) .import_(["bumpPackage", "bumpAll"]) .build() ) @@ -38,5 +40,6 @@ def substantial_child_workflow(g: Graph): start=sub.start(t.struct({"packages": t.list(package)})).reduce( {"name": "bumpAll"} ), + search=sub.advanced_filters(), **sub.internals(), ) diff --git a/tests/runtimes/substantial/child_workflow.ts b/tests/runtimes/substantial/workflows/child_workflow.ts similarity index 97% rename from tests/runtimes/substantial/child_workflow.ts rename to tests/runtimes/substantial/workflows/child_workflow.ts index 161dc40e0..4a16d3685 100644 --- a/tests/runtimes/substantial/child_workflow.ts +++ b/tests/runtimes/substantial/workflows/child_workflow.ts @@ -1,7 +1,7 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 -import { Context } from "./imports/common_types.ts"; +import { Context } from "../imports/common_types.ts"; function apply(pkg: string, oldVersion: string, newVersion: string) { console.info( diff --git a/tests/runtimes/substantial/workflow.ts b/tests/runtimes/substantial/workflows/workflow.ts similarity index 92% rename from tests/runtimes/substantial/workflow.ts rename to tests/runtimes/substantial/workflows/workflow.ts index 51a227938..b625ec99d 100644 --- a/tests/runtimes/substantial/workflow.ts +++ b/tests/runtimes/substantial/workflows/workflow.ts @@ -7,18 +7,21 @@ import { sendSubscriptionEmail, sleep, Workflow, -} from "./imports/common_types.ts"; +} from "../imports/common_types.ts"; export const eventsAndExceptionExample: Workflow = async ( ctx: Context ) => { const { to } = ctx.kwargs; const messageDialog = await ctx.save(() => sendSubscriptionEmail(to)); + await ctx.logger.info("Will send to", to); + await ctx.logger.warn("Will now wait on an event"); // This will wait until a `confirmation` event is sent to 
THIS workflow const confirmation = ctx.receive("confirmation"); if (!confirmation) { + await ctx.logger.error("Denial", to); throw new Error(`${to} has denied the subscription`); } @@ -109,7 +112,7 @@ export const secretsExample: Workflow = (_, { secrets }) => { return Promise.resolve(); }; -export async function accidentialInputMutation(ctx: Context) { +export async function accidentalInputMutation(ctx: Context) { const { items } = ctx.kwargs; const copy = [];