diff --git a/typegate/src/config.ts b/typegate/src/config.ts index 31c31a6802..4df62a1698 100644 --- a/typegate/src/config.ts +++ b/typegate/src/config.ts @@ -41,6 +41,7 @@ const schema = { }), timer_max_timeout_ms: z.coerce.number().positive().max(60000), timer_destroy_ressources: z.boolean(), + timer_policy_eval_retries: z.number().nonnegative().max(5), tg_admin_password: z.string(), tmp_dir: z.string(), jwt_max_duration_sec: z.coerce.number().positive(), @@ -84,6 +85,7 @@ const config = await configOrExit([ tg_port: "7890", timer_max_timeout_ms: 3000, timer_destroy_ressources: true, + timer_policy_eval_retries: 1, }, mapKeys(Deno.env.toObject(), (k: string) => k.toLowerCase()), parse(Deno.args) as Record, diff --git a/typegate/src/engine/planner/policies.ts b/typegate/src/engine/planner/policies.ts index 91c3af0ec9..ae186e8b43 100644 --- a/typegate/src/engine/planner/policies.ts +++ b/typegate/src/engine/planner/policies.ts @@ -18,6 +18,7 @@ import { getLogger } from "../../log.ts"; import { Type } from "../../typegraph/type_node.ts"; import { ArgPolicies } from "./args.ts"; import { BadContext } from "../../errors.ts"; +import config from "../../config.ts"; export interface FunctionSubtreeData { typeIdx: TypeIdx; @@ -91,7 +92,10 @@ export class OperationPolicies { "Policies must run on a Deno Runtime", ); if (!this.resolvers.has(polIdx)) { - this.resolvers.set(polIdx, runtime.delegate(mat, false)); + this.resolvers.set( + polIdx, + runtime.delegate(mat, false, config.timer_policy_eval_retries), + ); } } } diff --git a/typegate/src/runtimes/deno/deno.ts b/typegate/src/runtimes/deno/deno.ts index 3c750392ff..e886744faf 100644 --- a/typegate/src/runtimes/deno/deno.ts +++ b/typegate/src/runtimes/deno/deno.ts @@ -192,7 +192,11 @@ export class DenoRuntime extends Runtime { })]; } - delegate(mat: TypeMaterializer, verbose: boolean): Resolver { + delegate( + mat: TypeMaterializer, + verbose: boolean, + pulseCount?: number, + ): Resolver { if (mat.name === "predefined_function") { const func = predefinedFuncs[mat.data.name as string]; if (!func) { @@ -218,23 +222,28 @@ export class DenoRuntime extends Runtime { ) => { const token = await InternalAuth.emit(); - return this.w.execute(op, { - type: "import_func", - args, - internals: { - parent, - context, - secrets, - effect: mat.effect.effect ?? null, - meta: { - url: `${url.protocol}//${url.host}/${this.typegraphName}`, - token, + return this.w.execute( + op, + { + type: "import_func", + args, + internals: { + parent, + context, + secrets, + effect: mat.effect.effect ?? null, + meta: { + url: `${url.protocol}//${url.host}/${this.typegraphName}`, + token, + }, + headers, }, - headers, + name: mat.data.name as string, + verbose, }, - name: mat.data.name as string, - verbose, - }); + [], + pulseCount, + ); }; } @@ -245,22 +254,27 @@ export class DenoRuntime extends Runtime { ) => { const token = await InternalAuth.emit(); - return this.w.execute(op, { - type: "func", - args, - internals: { - parent, - context, - secrets, - effect: mat.effect.effect ?? null, - meta: { - url: `${url.protocol}//${url.host}/${this.typegraphName}`, - token, + return this.w.execute( + op, + { + type: "func", + args, + internals: { + parent, + context, + secrets, + effect: mat.effect.effect ?? null, + meta: { + url: `${url.protocol}//${url.host}/${this.typegraphName}`, + token, + }, + headers, }, - headers, + verbose, }, - verbose, - }); + [], + pulseCount, + ); }; } diff --git a/typegate/src/runtimes/patterns/messenger/async_messenger.ts b/typegate/src/runtimes/patterns/messenger/async_messenger.ts index 2c315d279c..5b204071a3 100644 --- a/typegate/src/runtimes/patterns/messenger/async_messenger.ts +++ b/typegate/src/runtimes/patterns/messenger/async_messenger.ts @@ -67,6 +67,15 @@ export class AsyncMessenger { let shouldStop = false; for (const item of currentQueue) { if (this.#tasks.has(item.id)) { + if ( + item.remainingPulseCount !== undefined && + item.remainingPulseCount > 0 + ) { + // check again next time if unterminated + item.remainingPulseCount -= 1; + continue; + } + // default behavior or 0 pulse left const data = JSON.stringify(item, null, 2); this.receive({ id: item.id, @@ -89,12 +98,13 @@ export class AsyncMessenger { op: string | number | null, data: M, hooks: Array<() => Promise> = [], + pulseCount = 0, ): Promise { const id = this.nextId(); const promise = deferred(); this.#tasks.set(id, { promise, hooks }); - const message = { id, op, data }; + const message = { id, op, data, remainingPulseCount: pulseCount }; this.#operationQueues[this.#queueIndex].push(message); void this.#send(this.broker, message); return promise; diff --git a/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts b/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts index 4434faf7ea..444ca638a2 100644 --- a/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts +++ b/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts @@ -39,13 +39,18 @@ export class LazyAsyncMessenger this.receive.bind(this), ); } - const { op } = message; + const { op, remainingPulseCount } = message; if (op !== null && !this.#loadedOps.has(op)) { const initOp = this.#ops.get(op); if (!initOp) { throw new Error(`unknown op ${op}`); } - await this.execute(null, initOp); + await this.execute( + null, + initOp, + [], + remainingPulseCount, + ); this.#loadedOps.add(op); } await send(this.broker, message); diff --git a/typegate/src/runtimes/patterns/messenger/types.ts b/typegate/src/runtimes/patterns/messenger/types.ts index b4cd970b15..2e12807021 100644 --- a/typegate/src/runtimes/patterns/messenger/types.ts +++ b/typegate/src/runtimes/patterns/messenger/types.ts @@ -7,6 +7,7 @@ export interface Message { id: number; op: string | number | null; data: T; + remainingPulseCount?: number; } export type Answer =