Skip to content

Commit

Permalink
feat(subs,deno): advanced filters + context util + replay aware logger (
Browse files Browse the repository at this point in the history
#930)

Solves
[MET-720](https://linear.app/metatypedev/issue/MET-720/subs-advanced-filters-for-workflows),
[MET-749](https://linear.app/metatypedev/issue/MET-749/subs-durable-logger)
and
[MET-760](https://linear.app/metatypedev/issue/MET-760/gate-allow-user-to-fetchdebug-the-context-easily).

#### Basic overview
Given an expression tree, a field can represent either an operator
(e.g., and, or, lte, etc.) or a 'special' field (started_at, ended_at,
status).

We can now answer queries such as: 'List all failed runs that started
between x and y but did not end at z, where the value is not null, or
alternatively completed but returned null'

Example:
```js
{
  or: [
    {
      and: [
        { status: { contains: '"ERROR"' } },
        { started_at: { gte: <x> } },
        { started_at: { lt: <y> } },
        { ended_at: { not: { eq: <z> } } }
      ]
    },
    {
      and: [
        { status: { eq: '"COMPLETED"' } },
        {  eq: 'null' }
      ]
    }
  ]
}
```

#### Migration notes

None

- [x] The change comes with new or modified tests
- [x] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change
  • Loading branch information
michael-0acf4 authored Dec 9, 2024
1 parent 4037040 commit 2ea7a48
Show file tree
Hide file tree
Showing 20 changed files with 909 additions and 81 deletions.
19 changes: 15 additions & 4 deletions src/typegate/src/runtimes/substantial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
} from "./substantial/agent.ts";
import { closestWord } from "../utils.ts";
import { InternalAuth } from "../services/auth/protocols/internal.ts";
import { applyFilter, type Expr, type ExecutionStatus } from "./substantial/filter_utils.ts";

const logger = getLogger(import.meta);

Expand All @@ -28,7 +29,7 @@ interface QueryCompletedWorkflowResult {
started_at: string;
ended_at: string;
result: {
status: "COMPLETED" | "COMPLETED_WITH_ERROR" | "UNKNOWN";
status: ExecutionStatus;
value: unknown; // hinted by the user
};
}
Expand Down Expand Up @@ -218,9 +219,11 @@ export class SubstantialRuntime extends Runtime {
return JSON.parse(JSON.stringify(res));
};
case "results":
return this.#resultsResover(false);
return this.#resultsResolver(false);
case "results_raw":
return this.#resultsResover(true);
return this.#resultsResolver(true);
case "advanced_filters":
return this.#advancedFiltersResolver();
case "internal_link_parent_child":
return this.#linkerResolver();
default:
Expand Down Expand Up @@ -285,7 +288,7 @@ export class SubstantialRuntime extends Runtime {
};
}

#resultsResover(enableGenerics: boolean): Resolver {
#resultsResolver(enableGenerics: boolean): Resolver {
return async ({ name: workflowName }) => {
this.#checkWorkflowExistOrThrow(workflowName);

Expand Down Expand Up @@ -407,6 +410,14 @@ export class SubstantialRuntime extends Runtime {
};
}

#advancedFiltersResolver(): Resolver {
return async ({ name: workflowName, filter }) => {
this.#checkWorkflowExistOrThrow(workflowName);
// console.log("workflow", workflowName, "Filter", filter);
return await applyFilter(workflowName, this.agent, filter as Expr);
};
}

#linkerResolver(): Resolver {
return async ({ parent_run_id, child_run_id }) => {
await Meta.substantial.metadataWriteParentChildLink({
Expand Down
52 changes: 52 additions & 0 deletions src/typegate/src/runtimes/substantial/deno_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ export class Context {
private id = 0;
public kwargs = {};
gql: ReturnType<typeof createGQLClient>;
logger: SubLogger;

constructor(private run: Run, private internal: TaskContext) {
this.gql = createGQLClient(internal);
this.kwargs = getKwargsCopy(run);
this.logger = new SubLogger(this);
}

#nextId() {
Expand Down Expand Up @@ -414,6 +416,56 @@ class RetryStrategy {
}
}


class SubLogger {
constructor(private ctx: Context) {}

async #log(kind: "warn" | "error" | "info", ...args: unknown[]) {
await this.ctx.save(() => {
const prefix = `[${kind.toUpperCase()}: ${this.ctx.getRun().run_id}]`;
switch(kind) {
case "warn": {
console.warn(prefix, ...args);
break;
}
case "error": {
console.error(prefix,...args);
break;
}
default: {
console.info(prefix, ...args);
break;
}
}

const message = args.map((arg) => {
try {
const json = JSON.stringify(arg);
// Functions are omitted,
// For example, JSON.stringify(() => 1234) => undefined (no throw)
return json === undefined ? String(arg) : json;
} catch(_) {
return String(arg);
}
}).join(" ");

return `${prefix}: ${message}`;
});
}

async warn(...payload: unknown[]) {
await this.#log("warn", ...payload);
}

async info(...payload: unknown[]) {
await this.#log("info", ...payload);
}

async error(...payload: unknown[]) {
await this.#log("error", ...payload);
}
}

function createGQLClient(internal: TaskContext) {
const tgLocal = new URL(internal.meta.url);
if (testBaseUrl) {
Expand Down
Loading

0 comments on commit 2ea7a48

Please sign in to comment.