Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subs,deno): advanced filters + context util + replay aware logger #930

Merged
merged 10 commits into from
Dec 9, 2024
49 changes: 49 additions & 0 deletions src/typegate/src/runtimes/substantial/deno_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ export class Context {
private id = 0;
public kwargs = {};
gql: ReturnType<typeof createGQLClient>;
logger: SubLogger;

constructor(private run: Run, private internal: TaskContext) {
this.gql = createGQLClient(internal);
this.kwargs = getKwargsCopy(run);
this.logger = new SubLogger(this);
}

#nextId() {
Expand Down Expand Up @@ -414,6 +416,53 @@ class RetryStrategy {
}
}


class SubLogger {
constructor(private ctx: Context) {}

async #log(kind: "warn" | "error" | "info", ...args: unknown[]) {
await this.ctx.save(() => {
const prefix = `[${kind.toUpperCase()}: ${this.ctx.getRun().run_id}]`;
switch(kind) {
case "warn": {
console.warn(prefix, ...args);
break;
}
case "error": {
console.error(prefix,...args);
break;
}
default: {
console.info(prefix, ...args);
break;
}
}

const message = args.map((arg) => {
try {
return JSON.stringify(arg);
} catch(_) {
return String(arg);
michael-0acf4 marked this conversation as resolved.
Show resolved Hide resolved
}
}).join(" ");

return `${prefix}: ${message}`;
});
}

async warn(...payload: unknown[]) {
await this.#log("warn", ...payload);
}

async info(...payload: unknown[]) {
await this.#log("info", ...payload);
}

async error(...payload: unknown[]) {
await this.#log("error", ...payload);
}
}

function createGQLClient(internal: TaskContext) {
const tgLocal = new URL(internal.meta.url);
if (testBaseUrl) {
Expand Down
10 changes: 10 additions & 0 deletions src/typegraph/deno/src/runtimes/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@
return t.func(t.struct({}), out, mat);
}

/** Utility for fetching the current request context */
fetchContext<C extends t.Typedef>(outputShape?: C): t.Func {
const returnValue = outputShape ? `context` : "JSON.stringify(context)";
return this.func(
t.struct({}),
outputShape ?? t.json(),
{ code: `(_, { context }) => ${returnValue}` }

Check warning on line 139 in src/typegraph/deno/src/runtimes/deno.ts

View check run for this annotation

Codecov / codecov/patch

src/typegraph/deno/src/runtimes/deno.ts#L133-L139

Added lines #L133 - L139 were not covered by tests
);
}

Check warning on line 141 in src/typegraph/deno/src/runtimes/deno.ts

View check run for this annotation

Codecov / codecov/patch

src/typegraph/deno/src/runtimes/deno.ts#L141

Added line #L141 was not covered by tests

policy(name: string, _code: string): Policy;
policy(name: string, data: Omit<DenoFunc, "effect">): Policy;
policy(name: string, data: string | Omit<DenoFunc, "effect">): Policy {
Expand Down
17 changes: 16 additions & 1 deletion src/typegraph/python/typegraph/runtimes/deno.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, List, Optional
from typing import TYPE_CHECKING, Any, List, Optional, Union

from typegraph.gen.exports.runtimes import (
Effect,
Expand Down Expand Up @@ -129,6 +129,21 @@ def identity(self, inp: "t.struct") -> "t.func":
PredefinedFunMat(id=res.value, name="identity", effect=EffectRead()),
)

def fetch_context(self, output_shape: Union["t.struct", None] = None):
"""
Utility for fetching the current request context
"""
return_value = (
"context" if output_shape is not None else "JSON.stringify(context)"
)
from typegraph import t

return self.func(
inp=t.struct(),
out=output_shape or t.json(),
code=f"(_, {{ context }}) => {return_value}",
)

def policy(
self, name: str, code: str, secrets: Optional[List[str]] = None
) -> Policy:
Expand Down
17 changes: 17 additions & 0 deletions tests/auth/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,21 @@ def auth(g: Graph):
),
auth_token_field="token",
).with_policy(public),
# context_raw=deno.fetch_context().with_policy(public) # no args if shape is unknown
context=deno.fetch_context(
t.struct(
{
"provider": t.string(),
"accessToken": t.string(),
"refreshAt": t.integer(),
"profile": t.struct(
{
"id": t.integer(),
}
),
"exp": t.integer(),
"iat": t.integer(),
}
)
).with_policy(public),
)
12 changes: 12 additions & 0 deletions tests/auth/auth_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,12 @@ Meta.test("Auth", async (t) => {
token(x: 1) {
x
}
context {
provider
profile {
id
}
}
}
`
.withHeaders({ authorization: `bearer ${jwt}` })
Expand All @@ -347,6 +353,12 @@ Meta.test("Auth", async (t) => {
token: {
x: 1,
},
context: {
provider: "github",
profile: {
id: 123
}
}
})
.on(e);
});
Expand Down
8 changes: 8 additions & 0 deletions tests/runtimes/substantial/imports/common_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ export interface Context {
createWorkflowHandle(
handleDef: SerializableWorkflowHandle,
): ChildWorkflowHandle;

logger: SubLogger
}

interface SubLogger {
warn: (...args: unknown[]) => Promise<void>;
info: (...args: unknown[]) => Promise<void>;
error: (...args: unknown[]) => Promise<void>;
}

export type TaskCtx = {
Expand Down
3 changes: 3 additions & 0 deletions tests/runtimes/substantial/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ export const eventsAndExceptionExample: Workflow<string> = async (
) => {
const { to } = ctx.kwargs;
const messageDialog = await ctx.save(() => sendSubscriptionEmail(to));
await ctx.logger.info("Will send to", to);
await ctx.logger.warn("Will now wait on an event");

// This will wait until a `confirmation` event is sent to THIS workflow
const confirmation = ctx.receive<boolean>("confirmation");

if (!confirmation) {
await ctx.logger.error("Denial", to);
throw new Error(`${to} has denied the subscription`);
}

Expand Down
Loading