Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subs,deno): advanced filters + context util + replay aware logger #930

Merged
merged 10 commits into from
Dec 9, 2024
19 changes: 15 additions & 4 deletions src/typegate/src/runtimes/substantial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
} from "./substantial/agent.ts";
import { closestWord } from "../utils.ts";
import { InternalAuth } from "../services/auth/protocols/internal.ts";
import { applyFilter, type Expr, type ExecutionStatus } from "./substantial/filter_utils.ts";

const logger = getLogger(import.meta);

Expand All @@ -28,7 +29,7 @@ interface QueryCompletedWorkflowResult {
started_at: string;
ended_at: string;
result: {
status: "COMPLETED" | "COMPLETED_WITH_ERROR" | "UNKNOWN";
status: ExecutionStatus;
value: unknown; // hinted by the user
};
}
Expand Down Expand Up @@ -218,9 +219,11 @@ export class SubstantialRuntime extends Runtime {
return JSON.parse(JSON.stringify(res));
};
case "results":
return this.#resultsResover(false);
return this.#resultsResolver(false);
case "results_raw":
return this.#resultsResover(true);
return this.#resultsResolver(true);
case "advanced_filters":
return this.#advancedFiltersResolver();
case "internal_link_parent_child":
return this.#linkerResolver();
default:
Expand Down Expand Up @@ -285,7 +288,7 @@ export class SubstantialRuntime extends Runtime {
};
}

#resultsResover(enableGenerics: boolean): Resolver {
#resultsResolver(enableGenerics: boolean): Resolver {
return async ({ name: workflowName }) => {
this.#checkWorkflowExistOrThrow(workflowName);

Expand Down Expand Up @@ -407,6 +410,14 @@ export class SubstantialRuntime extends Runtime {
};
}

#advancedFiltersResolver(): Resolver {
return async ({ name: workflowName, filter }) => {
this.#checkWorkflowExistOrThrow(workflowName);
// console.log("workflow", workflowName, "Filter", filter);
return await applyFilter(workflowName, this.agent, filter as Expr);
};
}

#linkerResolver(): Resolver {
return async ({ parent_run_id, child_run_id }) => {
await Meta.substantial.metadataWriteParentChildLink({
Expand Down
52 changes: 52 additions & 0 deletions src/typegate/src/runtimes/substantial/deno_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ export class Context {
private id = 0;
public kwargs = {};
gql: ReturnType<typeof createGQLClient>;
logger: SubLogger;

constructor(private run: Run, private internal: TaskContext) {
this.gql = createGQLClient(internal);
this.kwargs = getKwargsCopy(run);
this.logger = new SubLogger(this);
}

#nextId() {
Expand Down Expand Up @@ -414,6 +416,56 @@ class RetryStrategy {
}
}


class SubLogger {
constructor(private ctx: Context) {}

async #log(kind: "warn" | "error" | "info", ...args: unknown[]) {
await this.ctx.save(() => {
const prefix = `[${kind.toUpperCase()}: ${this.ctx.getRun().run_id}]`;
switch(kind) {
case "warn": {
console.warn(prefix, ...args);
break;
}
case "error": {
console.error(prefix,...args);
break;
}
default: {
console.info(prefix, ...args);
break;
}
}

const message = args.map((arg) => {
try {
const json = JSON.stringify(arg);
// Functions are omitted,
// For example, JSON.stringify(() => 1234) => undefined (no throw)
return json === undefined ? String(arg) : json;
} catch(_) {
return String(arg);
michael-0acf4 marked this conversation as resolved.
Show resolved Hide resolved
}
}).join(" ");

return `${prefix}: ${message}`;
});
}

async warn(...payload: unknown[]) {
await this.#log("warn", ...payload);
}

async info(...payload: unknown[]) {
await this.#log("info", ...payload);
}

async error(...payload: unknown[]) {
await this.#log("error", ...payload);
}
}

function createGQLClient(internal: TaskContext) {
const tgLocal = new URL(internal.meta.url);
if (testBaseUrl) {
Expand Down
Loading
Loading