From 4a061669bc3b55bd17c230fae83bcc39d4153f65 Mon Sep 17 00:00:00 2001 From: j03-dev <24nomeniavo@gmail.com> Date: Thu, 19 Dec 2024 11:20:57 +0300 Subject: [PATCH] feat: add compensation feature to deno context and utils --- src/substantial/protocol/events.proto | 7 +++ src/typegate/engine/runtime.d.ts | 7 ++- .../src/runtimes/substantial/deno_context.ts | 61 +++++++++++++++++-- 3 files changed, 69 insertions(+), 6 deletions(-) diff --git a/src/substantial/protocol/events.proto b/src/substantial/protocol/events.proto index 397ab00f08..31dab09701 100644 --- a/src/substantial/protocol/events.proto +++ b/src/substantial/protocol/events.proto @@ -31,6 +31,12 @@ message Save { } } +message Compensation { + uint32 save_id = 1; + string error = 2; + bytes compensation_result = 3; +} + message Sleep { uint32 id = 1; google.protobuf.Timestamp start = 2; @@ -58,6 +64,7 @@ message Event { Send send = 13; Stop stop = 14; } + Compensation compensation = 15; }; message Records { diff --git a/src/typegate/engine/runtime.d.ts b/src/typegate/engine/runtime.d.ts index 9b482ccd1b..cecfdd77aa 100644 --- a/src/typegate/engine/runtime.d.ts +++ b/src/typegate/engine/runtime.d.ts @@ -318,7 +318,12 @@ export type OperationEvent = | { type: "Send"; event_name: string; value: unknown } | { type: "Stop"; result: unknown } | { type: "Start"; kwargs: Record } - | { type: "Compensate" }; + | { + type: "Compensate"; + save_id: number; + error: string; + compensation_result: any; + }; export type Operation = { at: string; event: OperationEvent }; diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index 73090e1568..eaa84d6390 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -4,6 +4,7 @@ import { make_internal } from "../../worker_utils.ts"; import { TaskContext } from "../deno/shared_types.ts"; import { appendIfOngoing, Interrupt, OperationEvent, Run } from "./types.ts"; +import { randomUUID } from "../../crypto.ts"; // const isTest = Deno.env.get("DENO_TESTING") === "true"; const testBaseUrl = Deno.env.get("TEST_OVERRIDE_GQL_ORIGIN"); @@ -15,11 +16,14 @@ export class Context { public kwargs = {}; gql: ReturnType; logger: SubLogger; + utils: Utils; + compensationStack: (() => any | Promise)[] = []; constructor(private run: Run, private internal: TaskContext) { this.gql = createGQLClient(internal); this.kwargs = getKwargsCopy(run); this.logger = new SubLogger(this); + this.utils = new Utils(this); } #nextId() { @@ -33,7 +37,14 @@ export class Context { appendIfOngoing(this.run, { at: new Date().toJSON(), event: op }); } - async save(fn: () => T | Promise, option?: SaveOption) { + async save( + fn: () => T | Promise, + option?: SaveOption, + compensateWith?: () => T | Promise, + ) { + if (compensateWith) { + this.compensationStack.push(compensateWith!); + } const id = this.#nextId(); let currRetryCount = 1; @@ -76,6 +87,10 @@ export class Context { return clonedResult; } catch (err: any) { + if (option?.retry?.compensationOnfristFail) { + await this.#triggerCompensation(id, err); + throw err; + } if ( option?.retry?.maxRetries && currRetryCount < option.retry.maxRetries @@ -103,6 +118,7 @@ export class Context { throw Interrupt.Variant("SAVE_RETRY"); } else { + await this.#triggerCompensation(id, err); this.#appendOp({ type: "Save", id, @@ -120,6 +136,23 @@ export class Context { } } + async #triggerCompensation(save_id: number, error: string) { + const compensationStack = this.compensationStack; + if (compensationStack && compensationStack.length) { + compensationStack.reverse(); + for (const compensationFn of compensationStack) { + const result = await Promise.resolve(compensationFn()); + const clonedResult = deepClone(result ?? null); + this.#appendOp({ + type: "Compensate", + save_id, + error, + compensation_result: clonedResult, + }); + } + } + } + sleep(durationMs: number) { const id = this.#nextId(); for (const { event } of this.run.operations) { @@ -350,6 +383,7 @@ interface SaveOption { minBackoffMs: number; maxBackoffMs: number; maxRetries: number; + compensationOnfristFail: boolean; }; } @@ -416,20 +450,19 @@ class RetryStrategy { } } - class SubLogger { constructor(private ctx: Context) {} async #log(kind: "warn" | "error" | "info", ...args: unknown[]) { await this.ctx.save(() => { const prefix = `[${kind.toUpperCase()}: ${this.ctx.getRun().run_id}]`; - switch(kind) { + switch (kind) { case "warn": { console.warn(prefix, ...args); break; } case "error": { - console.error(prefix,...args); + console.error(prefix, ...args); break; } default: { @@ -444,7 +477,7 @@ class SubLogger { // Functions are omitted, // For example, JSON.stringify(() => 1234) => undefined (no throw) return json === undefined ? String(arg) : json; - } catch(_) { + } catch (_) { return String(arg); } }).join(" "); @@ -466,6 +499,24 @@ class SubLogger { } } +class Utils { + constructor(private ctx: Context) {} + + async now() { + return await this.ctx.save(() => Date.now()); + } + + async random(a: number, b: number) { + return await this.ctx.save(() => + Math.floor(Math.random() * (b - a + 1)) + a + ); + } + + async uuid4() { + return await this.ctx.save(() => randomUUID()); + } +} + function createGQLClient(internal: TaskContext) { const tgLocal = new URL(internal.meta.url); if (testBaseUrl) {