From 91e9e18ec3baeb6e6b4c06d587ff844442f39ef1 Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Thu, 28 Nov 2024 18:16:47 +0300 Subject: [PATCH 1/8] feat: fetch_context utils --- src/typegraph/deno/src/runtimes/deno.ts | 10 ++++++++++ src/typegraph/python/typegraph/runtimes/deno.py | 17 ++++++++++++++++- tests/auth/auth.py | 17 +++++++++++++++++ tests/auth/auth_test.ts | 12 ++++++++++++ 4 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/typegraph/deno/src/runtimes/deno.ts b/src/typegraph/deno/src/runtimes/deno.ts index f72cc9b6e..9eb821093 100644 --- a/src/typegraph/deno/src/runtimes/deno.ts +++ b/src/typegraph/deno/src/runtimes/deno.ts @@ -130,6 +130,16 @@ export class DenoRuntime extends Runtime { return t.func(t.struct({}), out, mat); } + /** Utility for fetching the current request context */ + fetchContext(outputShape?: C): t.Func { + const returnValue = outputShape ? `context` : "JSON.stringify(context)"; + return this.func( + t.struct({}), + outputShape ?? t.json(), + { code: `(_, { context }) => ${returnValue}` } + ); + } + policy(name: string, _code: string): Policy; policy(name: string, data: Omit): Policy; policy(name: string, data: string | Omit): Policy { diff --git a/src/typegraph/python/typegraph/runtimes/deno.py b/src/typegraph/python/typegraph/runtimes/deno.py index 58657df4a..08dc00512 100644 --- a/src/typegraph/python/typegraph/runtimes/deno.py +++ b/src/typegraph/python/typegraph/runtimes/deno.py @@ -4,7 +4,7 @@ import json import re from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, List, Optional +from typing import TYPE_CHECKING, Any, List, Optional, Union from typegraph.gen.exports.runtimes import ( Effect, @@ -129,6 +129,21 @@ def identity(self, inp: "t.struct") -> "t.func": PredefinedFunMat(id=res.value, name="identity", effect=EffectRead()), ) + def fetch_context(self, output_shape: Union["t.struct", None] = None): + """ + Utility for fetching the current request context + """ + return_value = ( + "context" if output_shape is not None else "JSON.stringify(context)" + ) + from typegraph import t + + return self.func( + inp=t.struct(), + out=output_shape or t.json(), + code=f"(_, {{ context }}) => {return_value}", + ) + def policy( self, name: str, code: str, secrets: Optional[List[str]] = None ) -> Policy: diff --git a/tests/auth/auth.py b/tests/auth/auth.py index 78ea96dd5..e3a1703de 100644 --- a/tests/auth/auth.py +++ b/tests/auth/auth.py @@ -60,4 +60,21 @@ def auth(g: Graph): ), auth_token_field="token", ).with_policy(public), + # context_raw=deno.fetch_context().with_policy(public) # no args if shape is unknown + context=deno.fetch_context( + t.struct( + { + "provider": t.string(), + "accessToken": t.string(), + "refreshAt": t.integer(), + "profile": t.struct( + { + "id": t.integer(), + } + ), + "exp": t.integer(), + "iat": t.integer(), + } + ) + ).with_policy(public), ) diff --git a/tests/auth/auth_test.ts b/tests/auth/auth_test.ts index 1ee2f7502..15743659f 100644 --- a/tests/auth/auth_test.ts +++ b/tests/auth/auth_test.ts @@ -331,6 +331,12 @@ Meta.test("Auth", async (t) => { token(x: 1) { x } + context { + provider + profile { + id + } + } } ` .withHeaders({ authorization: `bearer ${jwt}` }) @@ -347,6 +353,12 @@ Meta.test("Auth", async (t) => { token: { x: 1, }, + context: { + provider: "github", + profile: { + id: 123 + } + } }) .on(e); }); From 890c8fba66fc0774f782726dbceafb8ae7372191 Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Thu, 28 Nov 2024 19:25:46 +0300 Subject: [PATCH 2/8] feat: durable logger --- .../src/runtimes/substantial/deno_context.ts | 49 +++++++++++++++++++ .../substantial/imports/common_types.ts | 8 +++ tests/runtimes/substantial/workflow.ts | 3 ++ 3 files changed, 60 insertions(+) diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index a9a871db8..302ad8ce8 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -14,10 +14,12 @@ export class Context { private id = 0; public kwargs = {}; gql: ReturnType; + logger: SubLogger; constructor(private run: Run, private internal: TaskContext) { this.gql = createGQLClient(internal); this.kwargs = getKwargsCopy(run); + this.logger = new SubLogger(this); } #nextId() { @@ -414,6 +416,53 @@ class RetryStrategy { } } + +class SubLogger { + constructor(private ctx: Context) {} + + async #log(kind: "warn" | "error" | "info", ...args: unknown[]) { + await this.ctx.save(() => { + const prefix = `[${kind.toUpperCase()}: ${this.ctx.getRun().run_id}]`; + switch(kind) { + case "warn": { + console.warn(prefix, ...args); + break; + } + case "error": { + console.error(prefix,...args); + break; + } + default: { + console.info(prefix, ...args); + break; + } + } + + const message = args.map((arg) => { + try { + return JSON.stringify(arg); + } catch(_) { + return String(arg); + } + }).join(" "); + + return `${prefix}: ${message}`; + }); + } + + async warn(...payload: unknown[]) { + await this.#log("warn", ...payload); + } + + async info(...payload: unknown[]) { + await this.#log("info", ...payload); + } + + async error(...payload: unknown[]) { + await this.#log("error", ...payload); + } +} + function createGQLClient(internal: TaskContext) { const tgLocal = new URL(internal.meta.url); if (testBaseUrl) { diff --git a/tests/runtimes/substantial/imports/common_types.ts b/tests/runtimes/substantial/imports/common_types.ts index 3d7fe58dc..8eedc9bef 100644 --- a/tests/runtimes/substantial/imports/common_types.ts +++ b/tests/runtimes/substantial/imports/common_types.ts @@ -46,6 +46,14 @@ export interface Context { createWorkflowHandle( handleDef: SerializableWorkflowHandle, ): ChildWorkflowHandle; + + logger: SubLogger +} + +interface SubLogger { + warn: (...args: unknown[]) => Promise; + info: (...args: unknown[]) => Promise; + error: (...args: unknown[]) => Promise; } export type TaskCtx = { diff --git a/tests/runtimes/substantial/workflow.ts b/tests/runtimes/substantial/workflow.ts index 51a227938..a39ee93a9 100644 --- a/tests/runtimes/substantial/workflow.ts +++ b/tests/runtimes/substantial/workflow.ts @@ -14,11 +14,14 @@ export const eventsAndExceptionExample: Workflow = async ( ) => { const { to } = ctx.kwargs; const messageDialog = await ctx.save(() => sendSubscriptionEmail(to)); + await ctx.logger.info("Will send to", to); + await ctx.logger.warn("Will now wait on an event"); // This will wait until a `confirmation` event is sent to THIS workflow const confirmation = ctx.receive("confirmation"); if (!confirmation) { + await ctx.logger.error("Denial", to); throw new Error(`${to} has denied the subscription`); } From 0ac826174319c40ac1030597ae8c17714e2422d5 Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Fri, 29 Nov 2024 22:09:00 +0300 Subject: [PATCH 3/8] wip: filter types --- src/typegate/src/runtimes/substantial.ts | 26 +++- .../src/runtimes/substantial/deno_context.ts | 5 +- .../{substantial.rs => substantial/mod.rs} | 103 ++++++-------- .../src/runtimes/substantial/type_utils.rs | 132 ++++++++++++++++++ src/typegraph/core/src/t.rs | 15 +- src/typegraph/core/wit/typegraph.wit | 4 +- .../deno/src/runtimes/substantial.ts | 6 + .../python/typegraph/runtimes/substantial.py | 4 + 8 files changed, 225 insertions(+), 70 deletions(-) rename src/typegraph/core/src/runtimes/{substantial.rs => substantial/mod.rs} (77%) create mode 100644 src/typegraph/core/src/runtimes/substantial/type_utils.rs diff --git a/src/typegate/src/runtimes/substantial.ts b/src/typegate/src/runtimes/substantial.ts index 35c366b0d..a3bc5596b 100644 --- a/src/typegate/src/runtimes/substantial.ts +++ b/src/typegate/src/runtimes/substantial.ts @@ -218,9 +218,11 @@ export class SubstantialRuntime extends Runtime { return JSON.parse(JSON.stringify(res)); }; case "results": - return this.#resultsResover(false); + return this.#resultsResolver(false); case "results_raw": - return this.#resultsResover(true); + return this.#resultsResolver(true); + case "advanced_filters": + return this.#advancedFiltersResolver(); case "internal_link_parent_child": return this.#linkerResolver(); default: @@ -285,7 +287,7 @@ export class SubstantialRuntime extends Runtime { }; } - #resultsResover(enableGenerics: boolean): Resolver { + #resultsResolver(enableGenerics: boolean): Resolver { return async ({ name: workflowName }) => { this.#checkWorkflowExistOrThrow(workflowName); @@ -407,6 +409,24 @@ export class SubstantialRuntime extends Runtime { }; } + #advancedFiltersResolver(): Resolver { + return ({ filter }) => { + console.log("Filter", filter); + + const dummySearchResult = { + run_id: "fake", + started_at: new Date().toJSON(), + ended_at: new Date().toJSON(), + status: "COMPLETED", + value: JSON.stringify(filter) + }; + + return [ + dummySearchResult + ]; + }; + } + #linkerResolver(): Resolver { return async ({ parent_run_id, child_run_id }) => { await Meta.substantial.metadataWriteParentChildLink({ diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index 302ad8ce8..73090e156 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -440,7 +440,10 @@ class SubLogger { const message = args.map((arg) => { try { - return JSON.stringify(arg); + const json = JSON.stringify(arg); + // Functions are omitted, + // For example, JSON.stringify(() => 1234) => undefined (no throw) + return json === undefined ? String(arg) : json; } catch(_) { return String(arg); } diff --git a/src/typegraph/core/src/runtimes/substantial.rs b/src/typegraph/core/src/runtimes/substantial/mod.rs similarity index 77% rename from src/typegraph/core/src/runtimes/substantial.rs rename to src/typegraph/core/src/runtimes/substantial/mod.rs index 23a824553..e71fb58ea 100644 --- a/src/typegraph/core/src/runtimes/substantial.rs +++ b/src/typegraph/core/src/runtimes/substantial/mod.rs @@ -6,7 +6,6 @@ use crate::errors::Result; use crate::global_store::Store; use crate::t::{self, TypeBuilder}; use crate::typegraph::TypegraphContext; -use crate::types::WithRuntimeConfig; use crate::wit::core::FuncParams; use crate::wit::{ core::RuntimeId, runtimes::Effect as WitEffect, runtimes::SubstantialOperationData, @@ -14,6 +13,8 @@ use crate::wit::{ use common::typegraph::Materializer; use serde_json::json; +mod type_utils; + #[derive(Debug)] pub enum SubstantialMaterializer { Start { secrets: Vec }, @@ -25,6 +26,7 @@ pub enum SubstantialMaterializer { Results, ResultsRaw, InternalLinkParentChild, + AdvancedFilters, } impl MaterializerConverter for SubstantialMaterializer { @@ -55,6 +57,7 @@ impl MaterializerConverter for SubstantialMaterializer { SubstantialMaterializer::InternalLinkParentChild => { ("internal_link_parent_child".to_string(), json!({})) } + SubstantialMaterializer::AdvancedFilters => ("advanced_filters".to_string(), json!({})), }; Ok(Materializer { @@ -70,9 +73,9 @@ pub fn substantial_operation( runtime: RuntimeId, data: SubstantialOperationData, ) -> Result { - let mut inp = t::struct_(); - let (effect, mat_data, out_ty) = match data { + let (effect, mat_data, inp_ty, out_ty) = match data { SubstantialOperationData::Start(data) => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); inp.prop( "kwargs", @@ -86,27 +89,32 @@ pub fn substantial_operation( SubstantialMaterializer::Start { secrets: data.secrets, }, + inp.build()?, t::string().build()?, ) } SubstantialOperationData::StartRaw(data) => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); - inp.prop("kwargs", t_json_string()?.into()); + inp.prop("kwargs", t::json_str()?); ( WitEffect::Create(true), SubstantialMaterializer::StartRaw { secrets: data.secrets, }, + inp.build()?, t::string().build()?, ) } SubstantialOperationData::Stop => { + let mut inp = t::struct_(); inp.prop("run_id", t::string().build()?); ( WitEffect::Create(false), SubstantialMaterializer::Stop, + inp.build()?, t::list(t::string().build()?).build()?, ) } @@ -116,31 +124,36 @@ pub fn substantial_operation( .prop("payload", data.into()) .build()?; + let mut inp = t::struct_(); inp.prop("run_id", t::string().build()?); inp.prop("event", event); ( WitEffect::Create(false), SubstantialMaterializer::Send, + inp.build()?, t::string().build()?, ) } SubstantialOperationData::SendRaw => { let event = t::struct_() .prop("name", t::string().build()?) - .prop("payload", t_json_string()?.into()) + .prop("payload", t::json_str()?) .build()?; + let mut inp = t::struct_(); inp.prop("run_id", t::string().build()?); inp.prop("event", event); ( WitEffect::Create(false), SubstantialMaterializer::SendRaw, + inp.build()?, t::string().build()?, ) } SubstantialOperationData::Resources => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); let row = t::struct_() @@ -157,96 +170,58 @@ pub fn substantial_operation( .prop("running", t::list(row).build()?) .build()?; - (WitEffect::Read, SubstantialMaterializer::Resources, out) + ( + WitEffect::Read, + SubstantialMaterializer::Resources, + inp.build()?, + out, + ) } SubstantialOperationData::Results(data) => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); ( WitEffect::Read, SubstantialMaterializer::Results, - results_op_results_ty(data)?, + inp.build()?, + type_utils::results_op_results_ty(data)?, ) } SubstantialOperationData::ResultsRaw => { + let mut inp = t::struct_(); inp.prop("name", t::string().build()?); ( WitEffect::Read, SubstantialMaterializer::ResultsRaw, - results_op_results_ty(t_json_string()?)?, + inp.build()?, + type_utils::results_op_results_ty(t::json_str()?.into())?, ) } SubstantialOperationData::InternalLinkParentChild => { + let mut inp = t::struct_(); inp.prop("parent_run_id", t::string().build()?); inp.prop("child_run_id", t::string().build()?); ( WitEffect::Create(true), SubstantialMaterializer::InternalLinkParentChild, + inp.build()?, t::boolean().build()?, ) } + SubstantialOperationData::AdvancedFilters => ( + WitEffect::Read, + SubstantialMaterializer::AdvancedFilters, + type_utils::filter_expr_ty()?, + type_utils::search_results_ty()?, + ), }; let mat = super::Materializer::substantial(runtime, mat_data, effect); let mat_id = Store::register_materializer(mat); Ok(FuncParams { - inp: inp.build()?.into(), + inp: inp_ty.into(), out: out_ty.into(), mat: mat_id, }) } - -fn t_json_string() -> Result { - t::string() - .build() - .and_then(|r| { - r.with_config(json!({ - "format": "json" - })) - }) - .map(|r| r.id().into()) -} - -fn results_op_results_ty(out: u32) -> Result { - let count = t::integer().build()?; - - let result = t::struct_() - .prop("status", t::string().build()?) - .prop("value", t::optional(out.into()).build()?) - .build()?; - - let ongoing_runs = t::list( - t::struct_() - .prop("run_id", t::string().build()?) - .prop("started_at", t::string().build()?) - .build()?, - ) - .build()?; - - let completed_runs = t::list( - t::struct_() - .prop("run_id", t::string().build()?) - .prop("started_at", t::string().build()?) - .prop("ended_at", t::string().build()?) - .prop("result", result) - .build()?, - ) - .build()?; - - t::struct_() - .prop( - "ongoing", - t::struct_() - .prop("count", count) - .prop("runs", ongoing_runs) - .build()?, - ) - .prop( - "completed", - t::struct_() - .prop("count", count) - .prop("runs", completed_runs) - .build()?, - ) - .build() -} diff --git a/src/typegraph/core/src/runtimes/substantial/type_utils.rs b/src/typegraph/core/src/runtimes/substantial/type_utils.rs new file mode 100644 index 000000000..c089e86bc --- /dev/null +++ b/src/typegraph/core/src/runtimes/substantial/type_utils.rs @@ -0,0 +1,132 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +use crate::errors::Result; +use crate::t::{self, TypeBuilder}; + +pub fn results_op_results_ty(out: u32) -> Result { + let count = t::integer().build()?; + + let result = t::struct_() + .prop("status", t::string().build()?) + .prop("value", t::optional(out.into()).build()?) + .build()?; + + let ongoing_runs = t::list( + t::struct_() + .prop("run_id", t::string().build()?) + .prop("started_at", t::string().build()?) + .build()?, + ) + .build()?; + + let completed_runs = t::list( + t::struct_() + .prop("run_id", t::string().build()?) + .prop("started_at", t::string().build()?) + .prop("ended_at", t::string().build()?) + .prop("result", result) + .build()?, + ) + .build()?; + + t::struct_() + .prop( + "ongoing", + t::struct_() + .prop("count", count) + .prop("runs", ongoing_runs) + .build()?, + ) + .prop( + "completed", + t::struct_() + .prop("count", count) + .prop("runs", completed_runs) + .build()?, + ) + .build() +} + +/// Term `{ op: value } | { special: { op: value } }` +/// * op: "eq", "lt", "contains", .. +/// * special: "started_at", "status", .. +fn filter_term_variants() -> Result> { + // FIXME: a generic json would have been helpful here vs json string + // let any = t::eitherx!(t::integer(), t::string(), t::boolean(), ...).build_named("AnyValue")?; + let value_to_comp_against = t::json_str()?; + let ops = ["eq", "lt", "lte", "gt", "gte", "in", "contains"] + .into_iter() + .map(|op| t::struct_().prop(op, value_to_comp_against).build()) + .collect::>>()?; + + let op_value = t::either(ops.clone().into_iter()).build()?; + let special = ["started_at", "ended_at", "status"] + .into_iter() + .map(|sp| t::struct_().prop(sp, op_value).build()) + .collect::>>()?; + + let mut variants = vec![]; + variants.extend(ops.iter()); + variants.extend(special.iter()); + + Ok(variants) +} + +/// Expr `{ op: [...] }` +/// * op: "and" or "or" +/// * ...: may contain itself, or a term +pub fn filter_expr_ty() -> Result { + let mut and = t::struct_(); + let mut or = t::struct_(); + + let op_term_variants = filter_term_variants()?; + + let mut expr = t::eitherx!(and, or); + expr.data + .variants + .extend(op_term_variants.into_iter().map(|ty| { + let id: u32 = ty.into(); + id + })); + + let expr_id = expr.build()?; + + and.prop("and", t::listx(expr_id).build()?); + or.prop("or", t::listx(expr_id).build()?); + + /* + query { + search(filter:{ + and: [ + { started_at: { eq: "a" } }, + { status: { contains: "STO" } }, + { in: "abc" } + # { or: []} # FIXME: broken ref + ] + }) { + started_at + ended_at + status + value + } + } + + */ + t::struct_() + .prop("filter", t::eitherx!(expr_id, and, or).build()?) + .build() +} + +pub fn search_results_ty() -> Result { + t::list( + t::struct_() + .prop("run_id", t::string().build()?) + .prop("started_at", t::string().build()?) + .prop("ended_at", t::string().build()?) + .prop("status", t::string().build()?) + .prop("value", t::json_str()?) + .build()?, + ) + .build() +} diff --git a/src/typegraph/core/src/t.rs b/src/typegraph/core/src/t.rs index 40e06b1fa..4bdcafbc7 100644 --- a/src/typegraph/core/src/t.rs +++ b/src/typegraph/core/src/t.rs @@ -5,6 +5,7 @@ use crate::errors::Result; use crate::errors::TgError; use crate::types::RefAttr; use crate::types::TypeRefBuilder; +use crate::types::WithRuntimeConfig; use crate::types::{Named as _, TypeId, TypeRef}; use crate::wit::core::{ @@ -334,11 +335,12 @@ macro_rules! unionx { crate::t::unionx![$($ty),*] }; } +use serde_json::json; pub(crate) use unionx; #[derive(Default)] pub struct EitherBuilder { - data: TypeEither, + pub data: TypeEither, } #[allow(clippy::derivable_impls)] @@ -391,6 +393,17 @@ pub fn struct_() -> StructBuilder { Default::default() } +pub fn json_str() -> Result { + string() + .build() + .and_then(|r| { + r.with_config(json!({ + "format": "json" + })) + }) + .map(|r| r.id()) +} + pub fn struct_extends(ty: TypeId) -> Result { Ok(StructBuilder { data: TypeStruct { diff --git a/src/typegraph/core/wit/typegraph.wit b/src/typegraph/core/wit/typegraph.wit index 763ce9b2f..cda10f336 100644 --- a/src/typegraph/core/wit/typegraph.wit +++ b/src/typegraph/core/wit/typegraph.wit @@ -529,7 +529,9 @@ interface runtimes { // type of result results(type-id), results-raw, - internal-link-parent-child + internal-link-parent-child, + // filters + advanced-filters } register-substantial-runtime: func(data: substantial-runtime-data) -> result; diff --git a/src/typegraph/deno/src/runtimes/substantial.ts b/src/typegraph/deno/src/runtimes/substantial.ts index 7e9c81f85..9d2dc1259 100644 --- a/src/typegraph/deno/src/runtimes/substantial.ts +++ b/src/typegraph/deno/src/runtimes/substantial.ts @@ -100,6 +100,12 @@ export class SubstantialRuntime extends Runtime { }); } + advancedFilters(): Func { + return this._genericSubstantialFunc({ + tag: "advanced-filters" + }); + } + #internalLinkParentChild(): Func { return this._genericSubstantialFunc({ tag: "internal-link-parent-child", diff --git a/src/typegraph/python/typegraph/runtimes/substantial.py b/src/typegraph/python/typegraph/runtimes/substantial.py index f0cb7a80c..ecd9ab164 100644 --- a/src/typegraph/python/typegraph/runtimes/substantial.py +++ b/src/typegraph/python/typegraph/runtimes/substantial.py @@ -19,6 +19,7 @@ SubstantialOperationDataStart, SubstantialOperationDataStartRaw, SubstantialOperationDataStop, + SubstantialOperationDataAdvancedFilters, SubstantialRuntimeData, SubstantialStartData, WorkflowFileDescription, @@ -85,6 +86,9 @@ def query_results(self, output: "t.typedef"): def query_results_raw(self): return self._generic_substantial_func(SubstantialOperationDataResultsRaw()) + def advanced_filters(self): + return self._generic_substantial_func(SubstantialOperationDataAdvancedFilters()) + def _internal_link_parent_child(self): return self._generic_substantial_func( SubstantialOperationDataInternalLinkParentChild() From 14d7003498aca1775b588e7c17c2b9ef24191378 Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Mon, 2 Dec 2024 17:47:51 +0300 Subject: [PATCH 4/8] feat: type typegraph side --- .../src/runtimes/substantial/type_utils.rs | 95 +++++++++++-------- 1 file changed, 56 insertions(+), 39 deletions(-) diff --git a/src/typegraph/core/src/runtimes/substantial/type_utils.rs b/src/typegraph/core/src/runtimes/substantial/type_utils.rs index c089e86bc..ea68e7741 100644 --- a/src/typegraph/core/src/runtimes/substantial/type_utils.rs +++ b/src/typegraph/core/src/runtimes/substantial/type_utils.rs @@ -1,6 +1,10 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 +use std::cell::RefCell; + +use indexmap::IndexSet; + use crate::errors::Result; use crate::t::{self, TypeBuilder}; @@ -48,6 +52,33 @@ pub fn results_op_results_ty(out: u32) -> Result { .build() } +thread_local! { + static RECORDER: RefCell> = Default::default(); +} + +fn loc_ref(name: &str) -> Result { + let name = format!("s_{name}").to_lowercase(); + t::ref_(name, Default::default()).build() +} + +fn save( + name: &str, + builder: impl FnOnce(&str) -> Result, +) -> Result { + let name = format!("s_{name}").to_lowercase(); + + let has_name = RECORDER.with_borrow(|cache| cache.contains(&name)); + if has_name { + return t::ref_(name, Default::default()).build(); + } + + RECORDER.with_borrow_mut(|cache| { + let id = builder(&name)?; + cache.insert(name.to_string()); + Ok(id) + }) +} + /// Term `{ op: value } | { special: { op: value } }` /// * op: "eq", "lt", "contains", .. /// * special: "started_at", "status", .. @@ -57,13 +88,18 @@ fn filter_term_variants() -> Result> { let value_to_comp_against = t::json_str()?; let ops = ["eq", "lt", "lte", "gt", "gte", "in", "contains"] .into_iter() - .map(|op| t::struct_().prop(op, value_to_comp_against).build()) + .map(|op| { + save(op, |n| { + t::struct_().prop(op, value_to_comp_against).build_named(n) + }) + }) .collect::>>()?; - let op_value = t::either(ops.clone().into_iter()).build()?; + let op_value = save("Op", |n| t::either(ops.clone().into_iter()).build_named(n))?; + let special = ["started_at", "ended_at", "status"] .into_iter() - .map(|sp| t::struct_().prop(sp, op_value).build()) + .map(|sp| save(sp, |n| t::struct_().prop(sp, op_value).build_named(n))) .collect::>>()?; let mut variants = vec![]; @@ -77,44 +113,25 @@ fn filter_term_variants() -> Result> { /// * op: "and" or "or" /// * ...: may contain itself, or a term pub fn filter_expr_ty() -> Result { - let mut and = t::struct_(); - let mut or = t::struct_(); - - let op_term_variants = filter_term_variants()?; - - let mut expr = t::eitherx!(and, or); - expr.data - .variants - .extend(op_term_variants.into_iter().map(|ty| { - let id: u32 = ty.into(); - id - })); - - let expr_id = expr.build()?; - - and.prop("and", t::listx(expr_id).build()?); - or.prop("or", t::listx(expr_id).build()?); - - /* - query { - search(filter:{ - and: [ - { started_at: { eq: "a" } }, - { status: { contains: "STO" } }, - { in: "abc" } - # { or: []} # FIXME: broken ref - ] - }) { - started_at - ended_at - status - value - } - } + let mut op_term_variants = filter_term_variants()?; + op_term_variants.extend([loc_ref("and_expr")?, loc_ref("or_expr")?]); + + let expr = save("expr", |n| t::either(op_term_variants).build_named(n))?; + let expr_list = save("list_expr", |n| t::listx(expr).build_named(n))?; + + let _and = save("and_expr", |n| { + t::struct_().prop("and", expr_list).build_named(n) + })?; + + let _or = save("or_expr", |n| { + t::struct_().prop("or", expr_list).build_named(n) + })?; - */ t::struct_() - .prop("filter", t::eitherx!(expr_id, and, or).build()?) + .prop( + "filter", expr, + // save("Filter", |n| t::unionx!(and, or, expr).build_named(n))?, + ) .build() } From 3d5eb04609330818914027ac5bbde5c48ff9fb7e Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Mon, 2 Dec 2024 21:22:06 +0300 Subject: [PATCH 5/8] feat: filter --- src/typegate/src/runtimes/substantial.ts | 23 +- .../src/runtimes/substantial/filter_utils.ts | 256 ++++++++++++++++++ .../core/src/runtimes/substantial/mod.rs | 6 +- .../src/runtimes/substantial/type_utils.rs | 29 +- src/typegraph/core/src/t.rs | 13 - 5 files changed, 286 insertions(+), 41 deletions(-) create mode 100644 src/typegate/src/runtimes/substantial/filter_utils.ts diff --git a/src/typegate/src/runtimes/substantial.ts b/src/typegate/src/runtimes/substantial.ts index a3bc5596b..2c692baa0 100644 --- a/src/typegate/src/runtimes/substantial.ts +++ b/src/typegate/src/runtimes/substantial.ts @@ -20,15 +20,18 @@ import { } from "./substantial/agent.ts"; import { closestWord } from "../utils.ts"; import { InternalAuth } from "../services/auth/protocols/internal.ts"; +import { applyFilter, Expr } from "./substantial/filter_utils.ts"; const logger = getLogger(import.meta); +export type ExecutionStatus = "COMPLETED" | "COMPLETED_WITH_ERROR" | "ONGOING" | "UNKNOWN"; + interface QueryCompletedWorkflowResult { run_id: string; started_at: string; ended_at: string; result: { - status: "COMPLETED" | "COMPLETED_WITH_ERROR" | "UNKNOWN"; + status: ExecutionStatus; value: unknown; // hinted by the user }; } @@ -410,20 +413,10 @@ export class SubstantialRuntime extends Runtime { } #advancedFiltersResolver(): Resolver { - return ({ filter }) => { - console.log("Filter", filter); - - const dummySearchResult = { - run_id: "fake", - started_at: new Date().toJSON(), - ended_at: new Date().toJSON(), - status: "COMPLETED", - value: JSON.stringify(filter) - }; - - return [ - dummySearchResult - ]; + return async ({ name: workflowName, filter }) => { + this.#checkWorkflowExistOrThrow(workflowName); + // console.log("workflow", workflowName, "Filter", filter); + return await applyFilter(workflowName, this.agent, filter as Expr); }; } diff --git a/src/typegate/src/runtimes/substantial/filter_utils.ts b/src/typegate/src/runtimes/substantial/filter_utils.ts new file mode 100644 index 000000000..0adc3f274 --- /dev/null +++ b/src/typegate/src/runtimes/substantial/filter_utils.ts @@ -0,0 +1,256 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +import { ExecutionStatus } from "../substantial.ts"; +import { Agent } from "./agent.ts"; + +type KVHelper = { + // should be a union type but it is not very helpful with autocomplete + [K in T[number]]?: V; +}; +type AND = { and?: Array }; +type OR = { or?: Array }; +type NOT = { not?: Expr }; + +// JSON string is currently the only way to do generics on a typegraph +type ORD = KVHelper<["eq", "lt", "lte", "gt", "gte"], string>; +type INCL = KVHelper<["in", "contains"], string>; +type Terms = ORD & INCL; + +type SpecialTerms = KVHelper<["started_at", "ended_at", "status"], Terms>; + +export type Expr = Terms & SpecialTerms & AND & OR & NOT; + +export class SearchItem { + constructor( + public readonly run_id: string, + public readonly started_at: string | null, + public readonly ended_at: string | null, + public readonly status: ExecutionStatus, + public readonly value?: unknown, + ) {} + + toSearchResult() { + return { + run_id: this.run_id, + started_at: this.started_at, + ended_at: this.ended_at, + status: this.status, + value: this.value === undefined ? undefined : JSON.stringify(this.value), + }; + } +} + +export async function buildSearchableItems( + workflowName: string, + agent: Agent, +): Promise> { + const relatedRuns = await agent.retrieveLinks(workflowName); + const searchList = [] as Array; + for (const runId of relatedRuns) { + const run = await agent.retrieveEvents(runId); + const startedAt = run.operations[0]?.at; + let endedAt, result: any; + + let hasStopped = false; + for (const op of run.operations) { + if (op.event.type == "Stop") { + endedAt = op.at; + result = op.event.result; + hasStopped = true; + break; + } + } + + const isOk = "Ok" in result; + const kind = isOk ? "Ok" : "Err"; + const stoppedStatus = isOk ? "COMPLETED_WITH_ERROR" : "COMPLETED"; + + searchList.push( + new SearchItem( + runId, + startedAt ?? null, + endedAt ?? null, + hasStopped ? stoppedStatus : "ONGOING", + hasStopped ? result[kind] : undefined, + ), + ); + } + + return searchList; +} + +export async function applyFilter( + workflowName: string, + agent: Agent, + filter: Expr, +) { + const searchableItems = await buildSearchableItems(workflowName, agent); + const searchResults = []; + for (const item of searchableItems) { + if (evalExpr(item, filter, [""])) { + searchResults.push(item.toSearchResult()); + } + } + + return searchResults; +} + +function evalExpr(sResult: SearchItem, filter: Expr, path: Array) { + for (const k in filter) { + const op = k as unknown as keyof Expr; + const newPath = [...path, op]; + switch (op) { + // Expr + case "and": + case "or": { + const exprList = filter[op]; + if (!Array.isArray(exprList)) { + // should be unreachable since filter is validated at push + throw new Error(`Fatal: array expected at ${path.join(".")}`); + } + const fn = op == "or" ? "some" : "every"; + if ( + !exprList[fn]((subFilter) => evalExpr(sResult, subFilter, newPath)) + ) { + return false; + } + break; + } + case "not": { + if (evalExpr(sResult, filter["not"]!, newPath)) { + return false; + } + break; + } + // special + case "status": + case "started_at": + case "ended_at": { + const discriminator = sResult[op]; + const repr = new SearchItem( + sResult.run_id, + sResult.started_at, + sResult.ended_at, + sResult.status, + discriminator, + ); + return evalTerm(repr, filter[op]!, newPath); + } + // Term + default: { + if (!evalTerm(sResult, filter, newPath)) { + return false; + } + } + } + } + + return true; +} + +function evalTerm(sResult: SearchItem, terms: Terms, path: Array) { + const value = sResult.value; + + for (const k in terms) { + const op = k as unknown as keyof Terms; + const term = JSON.parse(terms[op] ?? "null"); // TODO: impl generic JSON on typegate + const newPath = [...path, op]; + switch (op) { + case "eq": { + if (value != term) { + return false; + } + break; + } + case "lt": + case "lte": + case "gt": + case "gte": { + if (!ord(value, term, op, newPath)) { + return false; + } + break; + } + case "contains": + case "in": { + if ( + !inclusion(value, term, op, newPath) + ) { + return false; + } + break; + } + default: { + throw new Error(`Unknown operator at ${newPath.join(".")}`); + } + } + } + + return true; +} + +function comparable(a: unknown, b: unknown) { + return typeof a == typeof b; +} + +function ord(l: unknown, r: unknown, cp: keyof ORD, path: Array) { + if (!comparable(l, r)) { + return false; + } + + if ( + typeof l == "string" && typeof r == "string" || + typeof l == "number" && typeof r == "number" + ) { + switch (cp) { + case "lt": + return l < r; + case "lte": + return l <= r; + case "gt": + return l > r; + case "gte": + return l >= r; + default: { + throw new Error( + `Unknown comparison operator "${cp}" at ${path.join(",")}`, + ); + } + } + } + + return false; +} + +function inclusion( + l: unknown, + r: unknown, + cp: keyof INCL, + _newPath: Array, +) { + if (!comparable(l, r)) { + return false; + } + + const [left, right] = cp == "in" ? [l, r] : [r, l]; + if (Array.isArray(right)) { + // FIXME: does not work with [ [[1]] ].includes([[1]]) + return right.includes(left); + } else if ( + typeof left == typeof right && typeof left == "object" && left != null + ) { + // { a: { b: 1 } } in { a: { b: 1 }, c: 2 } := true + const rightV = (right ?? {}) as Record; + for (const [k, leftVal] of Object.entries(left)) { + const rightVal = rightV[k]; + if (leftVal != rightVal) { + return false; + } + } + + return true; + } + + return false; +} diff --git a/src/typegraph/core/src/runtimes/substantial/mod.rs b/src/typegraph/core/src/runtimes/substantial/mod.rs index e71fb58ea..72b650feb 100644 --- a/src/typegraph/core/src/runtimes/substantial/mod.rs +++ b/src/typegraph/core/src/runtimes/substantial/mod.rs @@ -96,7 +96,7 @@ pub fn substantial_operation( SubstantialOperationData::StartRaw(data) => { let mut inp = t::struct_(); inp.prop("name", t::string().build()?); - inp.prop("kwargs", t::json_str()?); + inp.prop("kwargs", t::string().format("json").build()?); ( WitEffect::Create(true), @@ -138,7 +138,7 @@ pub fn substantial_operation( SubstantialOperationData::SendRaw => { let event = t::struct_() .prop("name", t::string().build()?) - .prop("payload", t::json_str()?) + .prop("payload", t::string().format("json").build()?) .build()?; let mut inp = t::struct_(); @@ -194,7 +194,7 @@ pub fn substantial_operation( WitEffect::Read, SubstantialMaterializer::ResultsRaw, inp.build()?, - type_utils::results_op_results_ty(t::json_str()?.into())?, + type_utils::results_op_results_ty(t::string().format("json").build()?.into())?, ) } SubstantialOperationData::InternalLinkParentChild => { diff --git a/src/typegraph/core/src/runtimes/substantial/type_utils.rs b/src/typegraph/core/src/runtimes/substantial/type_utils.rs index ea68e7741..339c6045c 100644 --- a/src/typegraph/core/src/runtimes/substantial/type_utils.rs +++ b/src/typegraph/core/src/runtimes/substantial/type_utils.rs @@ -79,13 +79,13 @@ fn save( }) } -/// Term `{ op: value } | { special: { op: value } }` +/// Term: `{ op: value } | { special: { op: value } }` /// * op: "eq", "lt", "contains", .. /// * special: "started_at", "status", .. fn filter_term_variants() -> Result> { // FIXME: a generic json would have been helpful here vs json string // let any = t::eitherx!(t::integer(), t::string(), t::boolean(), ...).build_named("AnyValue")?; - let value_to_comp_against = t::json_str()?; + let value_to_comp_against = t::string().format("json").build()?; let ops = ["eq", "lt", "lte", "gt", "gte", "in", "contains"] .into_iter() .map(|op| { @@ -109,14 +109,18 @@ fn filter_term_variants() -> Result> { Ok(variants) } -/// Expr `{ op: [...] }` +/// Expr: `{ op: [...] } | Term` /// * op: "and" or "or" /// * ...: may contain itself, or a term pub fn filter_expr_ty() -> Result { - let mut op_term_variants = filter_term_variants()?; - op_term_variants.extend([loc_ref("and_expr")?, loc_ref("or_expr")?]); - - let expr = save("expr", |n| t::either(op_term_variants).build_named(n))?; + let mut op_expr_variants = filter_term_variants()?; + op_expr_variants.extend([ + loc_ref("and_expr")?, + loc_ref("or_expr")?, + loc_ref("not_expr")?, + ]); + + let expr = save("expr", |n| t::either(op_expr_variants).build_named(n))?; let expr_list = save("list_expr", |n| t::listx(expr).build_named(n))?; let _and = save("and_expr", |n| { @@ -127,7 +131,12 @@ pub fn filter_expr_ty() -> Result { t::struct_().prop("or", expr_list).build_named(n) })?; + let _not = save("not_expr", |n| { + t::struct_().prop("not", expr).build_named(n) + })?; + t::struct_() + .prop("name", t::string().build()?) .prop( "filter", expr, // save("Filter", |n| t::unionx!(and, or, expr).build_named(n))?, @@ -139,10 +148,10 @@ pub fn search_results_ty() -> Result { t::list( t::struct_() .prop("run_id", t::string().build()?) - .prop("started_at", t::string().build()?) - .prop("ended_at", t::string().build()?) .prop("status", t::string().build()?) - .prop("value", t::json_str()?) + .prop("started_at", t::string().optional().build()?) + .prop("ended_at", t::string().optional().build()?) + .prop("value", t::string().format("json").optional().build()?) .build()?, ) .build() diff --git a/src/typegraph/core/src/t.rs b/src/typegraph/core/src/t.rs index 4bdcafbc7..f39b6c713 100644 --- a/src/typegraph/core/src/t.rs +++ b/src/typegraph/core/src/t.rs @@ -5,7 +5,6 @@ use crate::errors::Result; use crate::errors::TgError; use crate::types::RefAttr; use crate::types::TypeRefBuilder; -use crate::types::WithRuntimeConfig; use crate::types::{Named as _, TypeId, TypeRef}; use crate::wit::core::{ @@ -335,7 +334,6 @@ macro_rules! unionx { crate::t::unionx![$($ty),*] }; } -use serde_json::json; pub(crate) use unionx; #[derive(Default)] @@ -393,17 +391,6 @@ pub fn struct_() -> StructBuilder { Default::default() } -pub fn json_str() -> Result { - string() - .build() - .and_then(|r| { - r.with_config(json!({ - "format": "json" - })) - }) - .map(|r| r.id()) -} - pub fn struct_extends(ty: TypeId) -> Result { Ok(StructBuilder { data: TypeStruct { From f9da645e9d95b146468b95eefdd4bd93fed5ebc9 Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Tue, 3 Dec 2024 18:50:26 +0300 Subject: [PATCH 6/8] handle more edgecases + test --- src/typegate/src/runtimes/substantial.ts | 4 +- .../src/runtimes/substantial/filter_utils.ts | 196 ++++++++++------- .../runtimes/substantial/filter_utils_test.ts | 198 ++++++++++++++++++ 3 files changed, 317 insertions(+), 81 deletions(-) create mode 100644 tests/runtimes/substantial/filter_utils_test.ts diff --git a/src/typegate/src/runtimes/substantial.ts b/src/typegate/src/runtimes/substantial.ts index 2c692baa0..8fada6ea2 100644 --- a/src/typegate/src/runtimes/substantial.ts +++ b/src/typegate/src/runtimes/substantial.ts @@ -20,12 +20,10 @@ import { } from "./substantial/agent.ts"; import { closestWord } from "../utils.ts"; import { InternalAuth } from "../services/auth/protocols/internal.ts"; -import { applyFilter, Expr } from "./substantial/filter_utils.ts"; +import { applyFilter, type Expr, type ExecutionStatus } from "./substantial/filter_utils.ts"; const logger = getLogger(import.meta); -export type ExecutionStatus = "COMPLETED" | "COMPLETED_WITH_ERROR" | "ONGOING" | "UNKNOWN"; - interface QueryCompletedWorkflowResult { run_id: string; started_at: string; diff --git a/src/typegate/src/runtimes/substantial/filter_utils.ts b/src/typegate/src/runtimes/substantial/filter_utils.ts index 0adc3f274..5a14efb82 100644 --- a/src/typegate/src/runtimes/substantial/filter_utils.ts +++ b/src/typegate/src/runtimes/substantial/filter_utils.ts @@ -1,9 +1,14 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 -import { ExecutionStatus } from "../substantial.ts"; import { Agent } from "./agent.ts"; +export type ExecutionStatus = + | "COMPLETED" + | "COMPLETED_WITH_ERROR" + | "ONGOING" + | "UNKNOWN"; + type KVHelper = { // should be a union type but it is not very helpful with autocomplete [K in T[number]]?: V; @@ -64,7 +69,7 @@ export async function buildSearchableItems( const isOk = "Ok" in result; const kind = isOk ? "Ok" : "Err"; - const stoppedStatus = isOk ? "COMPLETED_WITH_ERROR" : "COMPLETED"; + const stoppedStatus = isOk ? "COMPLETED" : "COMPLETED_WITH_ERROR"; searchList.push( new SearchItem( @@ -96,52 +101,64 @@ export async function applyFilter( return searchResults; } -function evalExpr(sResult: SearchItem, filter: Expr, path: Array) { - for (const k in filter) { - const op = k as unknown as keyof Expr; - const newPath = [...path, op]; - switch (op) { - // Expr - case "and": - case "or": { - const exprList = filter[op]; - if (!Array.isArray(exprList)) { - // should be unreachable since filter is validated at push - throw new Error(`Fatal: array expected at ${path.join(".")}`); - } - const fn = op == "or" ? "some" : "every"; - if ( - !exprList[fn]((subFilter) => evalExpr(sResult, subFilter, newPath)) - ) { - return false; - } - break; + +export function evalExpr( + sResult: SearchItem, + filter: Expr, + path: Array, +) { + const keys = Object.keys(filter) as Array; + if (keys.length != 1) { + throw new Error(`Invalid expression at ${path.join(".")}`); + } + const op = keys[0]; + const newPath = [...path, op]; + + switch (op) { + // Expr + case "and": + case "or": { + const exprList = filter[op]; + if (!Array.isArray(exprList)) { + // should be unreachable since filter is validated at push + throw new Error( + `Fatal: array expected at ${path.join(".")}`, + ); } - case "not": { - if (evalExpr(sResult, filter["not"]!, newPath)) { - return false; - } - break; + const fn = op == "or" ? "some" : "every"; + if ( + !exprList[fn]((subFilter, index) => + evalExpr(sResult, subFilter, [...newPath, `#${index}`]) + ) + ) { + return false; } - // special - case "status": - case "started_at": - case "ended_at": { - const discriminator = sResult[op]; - const repr = new SearchItem( - sResult.run_id, - sResult.started_at, - sResult.ended_at, - sResult.status, - discriminator, - ); - return evalTerm(repr, filter[op]!, newPath); + break; + } + case "not": { + if (evalExpr(sResult, filter["not"]!, newPath)) { + return false; } - // Term - default: { - if (!evalTerm(sResult, filter, newPath)) { - return false; - } + break; + } + // Special + case "status": + case "started_at": + case "ended_at": { + const discriminator = sResult[op]; + const repr = new SearchItem( + sResult.run_id, + null, + null, + sResult.status, + discriminator, + ); + return evalTerm(repr, filter[op]!, newPath); + } + // Term + default: { + if (!evalTerm(sResult, filter, path)) { + return false; } } } @@ -151,45 +168,69 @@ function evalExpr(sResult: SearchItem, filter: Expr, path: Array) { function evalTerm(sResult: SearchItem, terms: Terms, path: Array) { const value = sResult.value; + const keys = Object.keys(terms) as Array; + if (keys.length != 1) { + throw new Error(`Invalid expression at ${path.join(".")}`); + } - for (const k in terms) { - const op = k as unknown as keyof Terms; - const term = JSON.parse(terms[op] ?? "null"); // TODO: impl generic JSON on typegate - const newPath = [...path, op]; - switch (op) { - case "eq": { - if (value != term) { - return false; - } - break; + const op = keys[0]; + const newPath = [...path, op]; + switch (op) { + case "eq": { + // term can never compare (null at worst) + if (value === undefined) { + return false; } - case "lt": - case "lte": - case "gt": - case "gte": { - if (!ord(value, term, op, newPath)) { - return false; - } - break; + + if (!testCompare(value, toJS(terms[op]))) { + return false; } - case "contains": - case "in": { - if ( - !inclusion(value, term, op, newPath) - ) { - return false; - } - break; + + break; + } + case "lt": + case "lte": + case "gt": + case "gte": { + if (!ord(value, toJS(terms[op]), op, newPath)) { + return false; } - default: { - throw new Error(`Unknown operator at ${newPath.join(".")}`); + break; + } + case "contains": + case "in": { + if ( + !inclusion(value, toJS(terms[op]), op, newPath) + ) { + return false; } + break; + } + default: { + throw new Error( + `Unknown operator "${op}" at ${path.join(".")}`, + ); } } return true; } +function toJS(val: string | undefined) { + // TODO: impl generic JSON on typegate + // ideally this should be an identity fn + return JSON.parse(val ?? "null"); +} + +function testCompare(value: unknown, testValue: unknown) { + const easy = ["number", "boolean", "string"]; + if (easy.includes(typeof value)) { + return value === testValue; + } + + return JSON.stringify(value) == JSON.stringify(testValue); +} + function comparable(a: unknown, b: unknown) { return typeof a == typeof b; } @@ -229,13 +270,12 @@ function inclusion( cp: keyof INCL, _newPath: Array, ) { - if (!comparable(l, r)) { - return false; - } - const [left, right] = cp == "in" ? [l, r] : [r, l]; if (Array.isArray(right)) { - // FIXME: does not work with [ [[1]] ].includes([[1]]) + // Note: Array.prototype.includes compare inner references + const leftComp = JSON.stringify(left); + return right.some((inner) => JSON.stringify(inner) === leftComp); + } else if (typeof left == "string" && typeof right == "string") { return right.includes(left); } else if ( typeof left == typeof right && typeof left == "object" && left != null diff --git a/tests/runtimes/substantial/filter_utils_test.ts b/tests/runtimes/substantial/filter_utils_test.ts new file mode 100644 index 000000000..72612c12b --- /dev/null +++ b/tests/runtimes/substantial/filter_utils_test.ts @@ -0,0 +1,198 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +import { assertEquals } from "@std/assert"; +import { Meta } from "../../utils/mod.ts"; +import { + evalExpr, + ExecutionStatus, + Expr, + SearchItem, +} from "@metatype/typegate/runtimes/substantial/filter_utils.ts"; + +function addDays(date: Date, days: number) { + const ret = new Date(date); + ret.setDate(ret.getDate() + days); + return ret; +} + +function val(x: unknown) { + return JSON.stringify(x); +} + +export function testData() { + const samples = [ + { "COMPLETED_WITH_ERROR": "Fatal: error" }, + { "COMPLETED": true }, + { "ONGOING": undefined }, + { "COMPLETED": [1, 2, ["three"]] }, + { "ONGOING": undefined }, + { "COMPLETED_WITH_ERROR": { nested: { object: 1234 }, b: 4 } }, + { "COMPLETED": null }, + { "COMPLETED": 1 }, + { "COMPLETED_WITH_ERROR": 2 }, + { "COMPLETED": 3 }, + ] satisfies Array<{ [K in ExecutionStatus]?: unknown }>; + + const dataset = []; + + let start = new Date("2024-01-01"), end = null; + for (let i = 0; i < samples.length; i += 1) { + end = addDays(start, 1); + const [status, value] = Object.entries(samples[i])[0] as [ + ExecutionStatus, + unknown, + ]; + + dataset.push( + new SearchItem( + `fakeUUID#${i}`, + start.toJSON(), + status == "ONGOING" ? null : end.toJSON(), + status, + value, + ), + ); + + if (i % 2 == 0) { + start = end; + } + } + + return dataset; +} + +Meta.test("base filter logic", async (t) => { + const testShould = async ( + fact: string, + data: { filter: Expr; expected: Array }, + ) => { + await t.should(fact, () => { + const items = testData(); + const searchResults = []; + for (const item of items) { + if (evalExpr(item, data.filter, [""])) { + searchResults.push(item.toSearchResult()); + } + } + assertEquals(searchResults, data.expected); + }); + }; + + // ------------------ + await testShould("be able discriminate truthy values and 1)", { + filter: { eq: val(1) }, + expected: [ + { + ended_at: "2024-01-06T00:00:00.000Z", + run_id: "fakeUUID#7", + started_at: "2024-01-05T00:00:00.000Z", + status: "COMPLETED", + value: "1", + }, + ] + }); + + await testShould('work with null and special values (e.g. "status")', { + filter: { + or: [ + { status: { eq: val("ONGOING") } }, + { eq: val(null) }, + ], + }, + expected: [ + { + run_id: "fakeUUID#2", + started_at: "2024-01-02T00:00:00.000Z", + ended_at: null, + status: "ONGOING", + value: undefined, + }, + { + run_id: "fakeUUID#4", + started_at: "2024-01-03T00:00:00.000Z", + ended_at: null, + status: "ONGOING", + value: undefined, + }, + { + ended_at: "2024-01-05T00:00:00.000Z", + run_id: "fakeUUID#6", + started_at: "2024-01-04T00:00:00.000Z", + status: "COMPLETED", + value: "null", + } + ], + }); + + await testShould('work with "in" and "contains" operators', { + filter: { + or: [ + { + and: [ + { contains: val(1) }, + { contains: val(["three"]) }, + ], + }, + { contains: val({ nested: { object: 1234 } })}, + { in: val("Fatal: error+ some other string") }, + ], + }, + expected: [ + { + run_id: "fakeUUID#0", + started_at: "2024-01-01T00:00:00.000Z", + ended_at: "2024-01-02T00:00:00.000Z", + status: "COMPLETED_WITH_ERROR", + value: '"Fatal: error"', + }, + { + run_id: "fakeUUID#3", + started_at: "2024-01-03T00:00:00.000Z", + ended_at: "2024-01-04T00:00:00.000Z", + status: "COMPLETED", + value: '[1,2,["three"]]', + }, + ], + }); + + await testShould( + "be able to compare numbers and strings on all kinds of terms (special + simple) ", + { + filter: { + or: [ + { + and: [ + { started_at: { gte: val("2024-01-02") } }, + { not: { not: { ended_at: { eq: val(null) } } } }, + ], + }, + { lte: val(1) }, + ], + }, + expected: [ + { + run_id: "fakeUUID#2", + started_at: "2024-01-02T00:00:00.000Z", + ended_at: null, + status: "ONGOING", + value: undefined, + }, + { + run_id: "fakeUUID#4", + started_at: "2024-01-03T00:00:00.000Z", + ended_at: null, + status: "ONGOING", + value: undefined, + }, + { + run_id: "fakeUUID#7", + started_at: "2024-01-05T00:00:00.000Z", + ended_at: "2024-01-06T00:00:00.000Z", + status: "COMPLETED", + value: "1", + }, + ], + }, + ); +}); From 8fea55d0977c8c6c34679efe765d2752d6d4c49a Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Tue, 3 Dec 2024 21:13:50 +0300 Subject: [PATCH 7/8] test: full integration --- .../src/runtimes/substantial/filter_utils.ts | 6 +-- .../src/runtimes/substantial/type_utils.rs | 8 +++- tests/runtimes/substantial/common.ts | 41 +++++++++++++++++-- .../runtimes/substantial/filter_utils_test.ts | 28 +++++++++---- tests/runtimes/substantial/substantial.py | 8 ++-- .../substantial/substantial_child_workflow.py | 5 ++- .../{ => workflows}/child_workflow.ts | 2 +- .../substantial/{ => workflows}/workflow.ts | 4 +- 8 files changed, 80 insertions(+), 22 deletions(-) rename tests/runtimes/substantial/{ => workflows}/child_workflow.ts (97%) rename tests/runtimes/substantial/{ => workflows}/workflow.ts (97%) diff --git a/src/typegate/src/runtimes/substantial/filter_utils.ts b/src/typegate/src/runtimes/substantial/filter_utils.ts index 5a14efb82..149b4cf15 100644 --- a/src/typegate/src/runtimes/substantial/filter_utils.ts +++ b/src/typegate/src/runtimes/substantial/filter_utils.ts @@ -272,7 +272,7 @@ function inclusion( ) { const [left, right] = cp == "in" ? [l, r] : [r, l]; if (Array.isArray(right)) { - // Note: Array.prototype.includes compare inner references + // Note: Array.prototype.includes compare item references, not the values const leftComp = JSON.stringify(left); return right.some((inner) => JSON.stringify(inner) === leftComp); } else if (typeof left == "string" && typeof right == "string") { @@ -280,11 +280,11 @@ function inclusion( } else if ( typeof left == typeof right && typeof left == "object" && left != null ) { - // { a: { b: 1 } } in { a: { b: 1 }, c: 2 } := true + // Example: { a: { b: 1 } } in { a: { b: 1 }, c: 2 } => true const rightV = (right ?? {}) as Record; for (const [k, leftVal] of Object.entries(left)) { const rightVal = rightV[k]; - if (leftVal != rightVal) { + if (!testCompare(leftVal, rightVal)) { return false; } } diff --git a/src/typegraph/core/src/runtimes/substantial/type_utils.rs b/src/typegraph/core/src/runtimes/substantial/type_utils.rs index 339c6045c..fe3f74d92 100644 --- a/src/typegraph/core/src/runtimes/substantial/type_utils.rs +++ b/src/typegraph/core/src/runtimes/substantial/type_utils.rs @@ -84,7 +84,11 @@ fn save( /// * special: "started_at", "status", .. fn filter_term_variants() -> Result> { // FIXME: a generic json would have been helpful here vs json string - // let any = t::eitherx!(t::integer(), t::string(), t::boolean(), ...).build_named("AnyValue")?; + // let any_scalar = save("any_scalar", |n| { + // t::eitherx!(t::integer(), t::string(), t::boolean(), t::struct_()).build_named(n) + // })?; + // let value_to_comp_against = t::eitherx!(any_scalar, t::listx(any_scalar)).build()?; + let value_to_comp_against = t::string().format("json").build()?; let ops = ["eq", "lt", "lte", "gt", "gte", "in", "contains"] .into_iter() @@ -95,7 +99,7 @@ fn filter_term_variants() -> Result> { }) .collect::>>()?; - let op_value = save("Op", |n| t::either(ops.clone().into_iter()).build_named(n))?; + let op_value = save("op", |n| t::either(ops.clone().into_iter()).build_named(n))?; let special = ["started_at", "ended_at", "status"] .into_iter() diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index 9dca65020..410880f98 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -5,6 +5,7 @@ import { assertEquals, assertExists } from "@std/assert"; import { connect, parseURL } from "redis"; import { gql, Meta, sleep } from "../../utils/mod.ts"; import { MetaTestCleanupFn } from "test-utils/test.ts"; +import { Expr } from "@metatype/typegate/runtimes/substantial/filter_utils.ts"; export type BackendName = "fs" | "memory" | "redis"; @@ -563,7 +564,7 @@ export function childWorkflowTestTemplate( await sleep(delays.awaitCompleteSec * 1000); - t.should(`complete parent and all child workflows`, async () => { + await t.should(`complete parent and all child workflows`, async () => { await gql` query { children: results_raw(name: "bumpPackage") { @@ -622,6 +623,40 @@ export function childWorkflowTestTemplate( }) .on(e); }); + + + await t.should(`filter the runs given a nested expr (${backendName})`, async () => { + await gql` + query { + search(name: "bumpPackage", filter: $filter) { + # started_at + # ended_at + status + value + } + } + ` + .withVars({ + filter: { + or: [ + { + and: [ + { status: { contains: JSON.stringify("COMPL") }}, + { contains: JSON.stringify("substantial") } + ] + }, + { not: { not: { eq: JSON.stringify("Bump typegraph v3 => v4") } } } + ] + } satisfies Expr + }) + .expectData({ + search: [ + { status: "COMPLETED", value: '"Bump typegraph v3 => v4"' }, + { status: "COMPLETED", value: '"Bump substantial v2 => v3"' } + ] + }) + .on(e); + }); }, ); } @@ -660,7 +695,7 @@ export function inputMutationTemplate( let currentRunId: string | null = null; await t.should( - `start accidentialInputMutation workflow and return its run id (${backendName})`, + `start accidentalInputMutation workflow and return its run id (${backendName})`, async () => { await gql` mutation { @@ -688,7 +723,7 @@ export function inputMutationTemplate( async () => { await gql` query { - results_raw(name: "accidentialInputMutation") { + results_raw(name: "accidentalInputMutation") { ongoing { count runs { diff --git a/tests/runtimes/substantial/filter_utils_test.ts b/tests/runtimes/substantial/filter_utils_test.ts index 72612c12b..150efa218 100644 --- a/tests/runtimes/substantial/filter_utils_test.ts +++ b/tests/runtimes/substantial/filter_utils_test.ts @@ -80,17 +80,17 @@ Meta.test("base filter logic", async (t) => { }; // ------------------ - await testShould("be able discriminate truthy values and 1)", { + await testShould("be able to discriminate truthy values and 1", { filter: { eq: val(1) }, expected: [ - { + { ended_at: "2024-01-06T00:00:00.000Z", run_id: "fakeUUID#7", started_at: "2024-01-05T00:00:00.000Z", status: "COMPLETED", value: "1", - }, - ] + }, + ], }); await testShould('work with null and special values (e.g. "status")', { @@ -115,13 +115,13 @@ Meta.test("base filter logic", async (t) => { status: "ONGOING", value: undefined, }, - { + { ended_at: "2024-01-05T00:00:00.000Z", run_id: "fakeUUID#6", started_at: "2024-01-04T00:00:00.000Z", status: "COMPLETED", value: "null", - } + }, ], }); @@ -134,7 +134,7 @@ Meta.test("base filter logic", async (t) => { { contains: val(["three"]) }, ], }, - { contains: val({ nested: { object: 1234 } })}, + { contains: val({ nested: { object: 1234 } }) }, { in: val("Fatal: error+ some other string") }, ], }, @@ -153,6 +153,13 @@ Meta.test("base filter logic", async (t) => { status: "COMPLETED", value: '[1,2,["three"]]', }, + { + ended_at: "2024-01-05T00:00:00.000Z", + run_id: "fakeUUID#5", + started_at: "2024-01-04T00:00:00.000Z", + status: "COMPLETED_WITH_ERROR", + value: '{"nested":{"object":1234},"b":4}', + }, ], }); @@ -196,3 +203,10 @@ Meta.test("base filter logic", async (t) => { }, ); }); + +// TODO: bench? +// Each case should be relatively close as we traverse from start to end without any back and forth +// 1. benchmark(listTrasersal, bigNoopExpr, items=100000) +// 2. benchmark(arrayWithNegFilter, bigNoopExpr, items=100000) + +// then avg unit overhead := [time(bigNoopExpr) - time(listTrasersal)] / 100000 diff --git a/tests/runtimes/substantial/substantial.py b/tests/runtimes/substantial/substantial.py index 65f75b9f4..3eb55ae68 100644 --- a/tests/runtimes/substantial/substantial.py +++ b/tests/runtimes/substantial/substantial.py @@ -21,14 +21,16 @@ def substantial(g: Graph): backend = Backend.redis("SUB_REDIS") file = ( - WorkflowFile.deno(file="workflow.ts", deps=["imports/common_types.ts"]) + WorkflowFile.deno( + file="workflows/workflow.ts", deps=["imports/common_types.ts"] + ) .import_( [ "saveAndSleepExample", "eventsAndExceptionExample", "retryExample", "secretsExample", - "accidentialInputMutation", + "accidentalInputMutation", ] ) .build() @@ -80,6 +82,6 @@ def substantial(g: Graph): ) } ) - ).reduce({"name": "accidentialInputMutation"}), + ).reduce({"name": "accidentalInputMutation"}), **sub.internals(), ) diff --git a/tests/runtimes/substantial/substantial_child_workflow.py b/tests/runtimes/substantial/substantial_child_workflow.py index e9a545aad..bff454029 100644 --- a/tests/runtimes/substantial/substantial_child_workflow.py +++ b/tests/runtimes/substantial/substantial_child_workflow.py @@ -20,7 +20,9 @@ def substantial_child_workflow(g: Graph): backend = Backend.redis("SUB_REDIS") file = ( - WorkflowFile.deno(file="child_workflow.ts", deps=["imports/common_types.ts"]) + WorkflowFile.deno( + file="workflows/child_workflow.ts", deps=["imports/common_types.ts"] + ) .import_(["bumpPackage", "bumpAll"]) .build() ) @@ -38,5 +40,6 @@ def substantial_child_workflow(g: Graph): start=sub.start(t.struct({"packages": t.list(package)})).reduce( {"name": "bumpAll"} ), + search=sub.advanced_filters(), **sub.internals(), ) diff --git a/tests/runtimes/substantial/child_workflow.ts b/tests/runtimes/substantial/workflows/child_workflow.ts similarity index 97% rename from tests/runtimes/substantial/child_workflow.ts rename to tests/runtimes/substantial/workflows/child_workflow.ts index 161dc40e0..4a16d3685 100644 --- a/tests/runtimes/substantial/child_workflow.ts +++ b/tests/runtimes/substantial/workflows/child_workflow.ts @@ -1,7 +1,7 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 -import { Context } from "./imports/common_types.ts"; +import { Context } from "../imports/common_types.ts"; function apply(pkg: string, oldVersion: string, newVersion: string) { console.info( diff --git a/tests/runtimes/substantial/workflow.ts b/tests/runtimes/substantial/workflows/workflow.ts similarity index 97% rename from tests/runtimes/substantial/workflow.ts rename to tests/runtimes/substantial/workflows/workflow.ts index a39ee93a9..b625ec99d 100644 --- a/tests/runtimes/substantial/workflow.ts +++ b/tests/runtimes/substantial/workflows/workflow.ts @@ -7,7 +7,7 @@ import { sendSubscriptionEmail, sleep, Workflow, -} from "./imports/common_types.ts"; +} from "../imports/common_types.ts"; export const eventsAndExceptionExample: Workflow = async ( ctx: Context @@ -112,7 +112,7 @@ export const secretsExample: Workflow = (_, { secrets }) => { return Promise.resolve(); }; -export async function accidentialInputMutation(ctx: Context) { +export async function accidentalInputMutation(ctx: Context) { const { items } = ctx.kwargs; const copy = []; From 06499d846872eec5007ce1d61786221835300f2c Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Wed, 4 Dec 2024 14:09:47 +0300 Subject: [PATCH 8/8] fix: sort unpredictable output --- tests/runtimes/substantial/common.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index 410880f98..ffd012a41 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -649,12 +649,15 @@ export function childWorkflowTestTemplate( ] } satisfies Expr }) - .expectData({ - search: [ - { status: "COMPLETED", value: '"Bump typegraph v3 => v4"' }, - { status: "COMPLETED", value: '"Bump substantial v2 => v3"' } + .expectBody((body) => { + const sorted = body.data.search.sort((a: any, b: any) => a.value.localeCompare(b.value)); + assertEquals(sorted, + [ + { status: "COMPLETED", value: '"Bump substantial v2 => v3"' }, + { status: "COMPLETED", value: '"Bump typegraph v3 => v4"' } ] - }) + ); + }) .on(e); }); },