Skip to content

Commit

Permalink
fix(gate): pushing a typegraph must not timeout (#511)
Browse files Browse the repository at this point in the history
<!--
Pull requests are squash merged using:
- their title as the commit message
- their description as the commit body

Having a good title and description is important for the users to get
readable changelog and understand when they need to update his code and
how.
-->

### Describe your change

Retry policy eval one more time if too long

### Motivation and context


[MET-296](https://metatype.atlassian.net/jira/software/c/projects/MET/boards/2?selectedIssue=MET-296)
  • Loading branch information
michael-0acf4 authored Dec 7, 2023
1 parent 6d8aac5 commit 67f04c4
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 34 deletions.
2 changes: 2 additions & 0 deletions typegate/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const schema = {
}),
timer_max_timeout_ms: z.coerce.number().positive().max(60000),
timer_destroy_ressources: z.boolean(),
timer_policy_eval_retries: z.number().nonnegative().max(5),
tg_admin_password: z.string(),
tmp_dir: z.string(),
jwt_max_duration_sec: z.coerce.number().positive(),
Expand Down Expand Up @@ -84,6 +85,7 @@ const config = await configOrExit([
tg_port: "7890",
timer_max_timeout_ms: 3000,
timer_destroy_ressources: true,
timer_policy_eval_retries: 1,
},
mapKeys(Deno.env.toObject(), (k: string) => k.toLowerCase()),
parse(Deno.args) as Record<string, unknown>,
Expand Down
6 changes: 5 additions & 1 deletion typegate/src/engine/planner/policies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { getLogger } from "../../log.ts";
import { Type } from "../../typegraph/type_node.ts";
import { ArgPolicies } from "./args.ts";
import { BadContext } from "../../errors.ts";
import config from "../../config.ts";

export interface FunctionSubtreeData {
typeIdx: TypeIdx;
Expand Down Expand Up @@ -91,7 +92,10 @@ export class OperationPolicies {
"Policies must run on a Deno Runtime",
);
if (!this.resolvers.has(polIdx)) {
this.resolvers.set(polIdx, runtime.delegate(mat, false));
this.resolvers.set(
polIdx,
runtime.delegate(mat, false, config.timer_policy_eval_retries),
);
}
}
}
Expand Down
74 changes: 44 additions & 30 deletions typegate/src/runtimes/deno/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ export class DenoRuntime extends Runtime {
})];
}

delegate(mat: TypeMaterializer, verbose: boolean): Resolver {
delegate(
mat: TypeMaterializer,
verbose: boolean,
pulseCount?: number,
): Resolver {
if (mat.name === "predefined_function") {
const func = predefinedFuncs[mat.data.name as string];
if (!func) {
Expand All @@ -218,23 +222,28 @@ export class DenoRuntime extends Runtime {
) => {
const token = await InternalAuth.emit();

return this.w.execute(op, {
type: "import_func",
args,
internals: {
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
return this.w.execute(
op,
{
type: "import_func",
args,
internals: {
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
headers,
},
headers,
name: mat.data.name as string,
verbose,
},
name: mat.data.name as string,
verbose,
});
[],
pulseCount,
);
};
}

Expand All @@ -245,22 +254,27 @@ export class DenoRuntime extends Runtime {
) => {
const token = await InternalAuth.emit();

return this.w.execute(op, {
type: "func",
args,
internals: {
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
return this.w.execute(
op,
{
type: "func",
args,
internals: {
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
headers,
},
headers,
verbose,
},
verbose,
});
[],
pulseCount,
);
};
}

Expand Down
12 changes: 11 additions & 1 deletion typegate/src/runtimes/patterns/messenger/async_messenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ export class AsyncMessenger<Broker, M, A> {
let shouldStop = false;
for (const item of currentQueue) {
if (this.#tasks.has(item.id)) {
if (
item.remainingPulseCount !== undefined &&
item.remainingPulseCount > 0
) {
// check again next time if unterminated
item.remainingPulseCount -= 1;
continue;
}
// default behavior or 0 pulse left
const data = JSON.stringify(item, null, 2);
this.receive({
id: item.id,
Expand All @@ -89,12 +98,13 @@ export class AsyncMessenger<Broker, M, A> {
op: string | number | null,
data: M,
hooks: Array<() => Promise<void>> = [],
pulseCount = 0,
): Promise<unknown> {
const id = this.nextId();
const promise = deferred<unknown>();
this.#tasks.set(id, { promise, hooks });

const message = { id, op, data };
const message = { id, op, data, remainingPulseCount: pulseCount };
this.#operationQueues[this.#queueIndex].push(message);
void this.#send(this.broker, message);
return promise;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,18 @@ export class LazyAsyncMessenger<Broker, M, A>
this.receive.bind(this),
);
}
const { op } = message;
const { op, remainingPulseCount } = message;
if (op !== null && !this.#loadedOps.has(op)) {
const initOp = this.#ops.get(op);
if (!initOp) {
throw new Error(`unknown op ${op}`);
}
await this.execute(null, initOp);
await this.execute(
null,
initOp,
[],
remainingPulseCount,
);
this.#loadedOps.add(op);
}
await send(this.broker, message);
Expand Down
1 change: 1 addition & 0 deletions typegate/src/runtimes/patterns/messenger/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export interface Message<T> {
id: number;
op: string | number | null;
data: T;
remainingPulseCount?: number;
}

export type Answer<T> =
Expand Down

0 comments on commit 67f04c4

Please sign in to comment.