From 984ea3f67a0e8da7beb8f50a2320ef3ef4f0dd69 Mon Sep 17 00:00:00 2001 From: Harald Schilly Date: Mon, 15 Jul 2024 15:52:31 +0200 Subject: [PATCH] api/async exec: debugging how data moves around, documentation, etc --- src/packages/backend/execute-code.test.ts | 15 +- src/packages/backend/execute-code.ts | 42 +++-- src/packages/next/lib/api/schema/exec.ts | 184 +++++++++++++--------- src/packages/next/pages/api/v2/exec.ts | 4 +- src/packages/project/exec_shell_code.ts | 5 + src/packages/util/message.js | 3 + src/packages/util/types/execute-code.ts | 7 +- 7 files changed, 170 insertions(+), 90 deletions(-) diff --git a/src/packages/backend/execute-code.test.ts b/src/packages/backend/execute-code.test.ts index ebb2fed309..91adf577c6 100644 --- a/src/packages/backend/execute-code.test.ts +++ b/src/packages/backend/execute-code.test.ts @@ -91,22 +91,25 @@ describe("test timeout", () => { }); describe("async", () => { - it("use ID for async execution", async () => { - const retID = await executeCode({ + it("use ID to get async execution result", async () => { + const c = await executeCode({ command: "sh", - args: ["-c", "sleep .1; echo foo;"], + args: ["-c", "sleep .5; echo foo;"], bash: false, timeout: 10, - async_exec: true, + async_mode: true, }); - const id = retID["async_id"]; + expect(c.async_status).toEqual("running"); + expect(c.async_start_ts).toBeGreaterThan(1); + const id = c.async_id; expect(typeof id).toEqual("string"); if (typeof id === "string") { await new Promise((done) => setTimeout(done, 1000)); const status = await executeCode({ async_get: id }); - console.log("status", status); + expect(status.async_status).toEqual("completed"); expect(status.stdout).toEqual("foo\n"); expect(status.elapsed_s).toBeGreaterThan(0.1); + expect(status.async_start_ts).toBeGreaterThan(1); } }); }); diff --git a/src/packages/backend/execute-code.ts b/src/packages/backend/execute-code.ts index 428165d1e6..c7bdea1a3e 100644 --- a/src/packages/backend/execute-code.ts +++ b/src/packages/backend/execute-code.ts @@ -73,7 +73,7 @@ async function executeCodeNoAggregate( if (cached != null) { return cached; } else { - throw new Error(`Status or result of '${opts.async_get}' not found.`); + throw new Error(`Async operation '${opts.async_get}' not found.`); } } @@ -141,23 +141,29 @@ async function executeCodeNoAggregate( await chmod(tempPath, 0o700); } - if (opts.async_exec) { + if (opts.async_mode) { // we return an ID, the caller can then use it to query the status - const async_limit = 1024 * 1024; // we limit how much we keep in memory, to avoid problems - opts.max_output = Math.min(async_limit, opts.max_output ?? async_limit); + opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems; + opts.timeout ??= 10 * 60; const id = uuid(); const start = new Date(); - const started = { + const started: ExecuteCodeOutput = { stdout: `Process started running at ${start.toISOString()}`, stderr: "", - exit_code: start.getTime(), + exit_code: 0, + async_start_ts: start.getTime(), async_id: id, + async_status: "running", }; asyncCache.set(id, started); - doSpawn({ ...opts, origCommand }, (err, result) => { + doSpawn({ ...opts, origCommand, async_id: id }, (err, result) => { const started = asyncCache.get(id)?.exit_code ?? 0; - const info = { elapsed_s: (Date.now() - started) / 1000 }; + const info: Partial = { + elapsed_s: (Date.now() - started) / 1000, + async_start_ts: start.getTime(), + async_status: "error", + }; if (err) { asyncCache.set(id, { stdout: "", @@ -166,7 +172,11 @@ async function executeCodeNoAggregate( ...info, }); } else if (result != null) { - asyncCache.set(id, { ...result, ...info }); + asyncCache.set(id, { + ...result, + ...info, + ...{ async_status: "completed" }, + }); } else { asyncCache.set(id, { stdout: "", @@ -190,6 +200,18 @@ async function executeCodeNoAggregate( } } +function update_async( + async_id: string | undefined, + stream: "stdout" | "stderr", + data: string, +) { + if (!async_id) return; + const obj = asyncCache.get(async_id); + if (obj != null) { + obj[stream] = data; + } +} + function doSpawn( opts, cb: (err: string | undefined, result?: ExecuteCodeOutput) => void, @@ -256,6 +278,7 @@ function doSpawn( } else { stdout += data; } + update_async(opts.async_id, "stdout", stdout); }); r.stderr.on("data", (data) => { @@ -267,6 +290,7 @@ function doSpawn( } else { stderr += data; } + update_async(opts.async_id, "stderr", stderr); }); let stderr_is_done = false; diff --git a/src/packages/next/lib/api/schema/exec.ts b/src/packages/next/lib/api/schema/exec.ts index 7eb4319f77..fd5d67d089 100644 --- a/src/packages/next/lib/api/schema/exec.ts +++ b/src/packages/next/lib/api/schema/exec.ts @@ -1,94 +1,134 @@ import { z } from "../framework"; import { FailedAPIOperationSchema } from "./common"; -import { ProjectIdSchema } from "./projects/common"; import { ComputeServerIdSchema } from "./compute/common"; +import { ProjectIdSchema } from "./projects/common"; // OpenAPI spec // export const ExecInputSchema = z - .object({ - project_id: ProjectIdSchema, - compute_server_id: ComputeServerIdSchema.describe( - `If provided, the desired shell command will be run on the compute server whose id + .union([ + z.object({ + project_id: ProjectIdSchema, + compute_server_id: ComputeServerIdSchema.describe( + `If provided, the desired shell command will be run on the compute server whose id is specified in this field (if available).`, - ).optional(), - filesystem: z - .boolean() - .optional() - .describe( - `If \`true\`, this shell command runs in the fileserver container on the compute + ).optional(), + filesystem: z + .boolean() + .optional() + .describe( + `If \`true\`, this shell command runs in the fileserver container on the compute server; otherwise, it runs on the main compute container.`, - ), - path: z - .string() - .optional() - .describe( - "Path to working directory in which the shell command should be executed.", - ), - command: z.string().describe("The shell command to execute."), - args: z - .array(z.string()) - .optional() - .describe("An array of arguments to pass to the shell command."), - timeout: z - .number() - .min(0) - .default(60) - .optional() - .describe("Number of seconds before this shell command times out."), - max_output: z - .number() - .min(0) - .optional() - .describe("Maximum number of bytes to return from shell command output."), - bash: z - .boolean() - .optional() - .describe( - `If \`true\`, this command runs in a \`bash\` shell. To do so, the provided shell + ), + path: z + .string() + .optional() + .describe( + "Path to working directory in which the shell command should be executed.", + ), + command: z.string().describe("The shell command to execute."), + args: z + .array(z.string()) + .optional() + .describe("An array of arguments to pass to the shell command."), + timeout: z + .number() + .min(0) + .default(60) + .optional() + .describe("Number of seconds before this shell command times out."), + max_output: z + .number() + .min(0) + .optional() + .describe( + "Maximum number of bytes to return from shell command output.", + ), + bash: z + .boolean() + .optional() + .describe( + `If \`true\`, this command runs in a \`bash\` shell. To do so, the provided shell command is written to a file and then executed via the \`bash\` command.`, - ), - aggregate: z - .union([ - z.number(), - z.string(), - z.object({ value: z.union([z.string(), z.number()]) }), - ]) - .optional() - .describe( - `If provided, this shell command is aggregated as in + ), + aggregate: z + .union([ + z.number(), + z.string(), + z.object({ value: z.union([z.string(), z.number()]) }), + ]) + .optional() + .describe( + `If provided, this shell command is aggregated as in \`src/packages/backend/aggregate.js\`. This parameter allows one to specify multiple callbacks to be executed against the output of the same command (given identical arguments) within a 60-second window.`, - ), - err_on_exit: z - .boolean() - .optional() - .describe( - `When \`true\`, this call will throw an error whenever the provided shell command + ), + err_on_exit: z + .boolean() + .optional() + .describe( + `When \`true\`, this call will throw an error whenever the provided shell command exits with a non-zero exit code.`, - ), - env: z - .record(z.string(), z.string()) - .optional() - .describe( - "Environment variables to be passed to the shell command upon execution.", - ), - async_exec: z.boolean().optional() - .describe(`If \`true\`, the execution happens asynchroneously. - This means it the API call does not block and returns an ID (\`async_id\`). - Later, use that ID in a call to \`async_get\` to eventually get the result`), - async_get: z.string().optional() - .describe(`For a given \`async_id\` returned by \`async\`, - retun the status, or the result as if it is called synchroneously. - Results are only cached temporarily!`), - }) + ), + env: z + .record(z.string(), z.string()) + .optional() + .describe( + "Environment variables to be passed to the shell command upon execution.", + ), + async_mode: z.boolean().optional() + .describe(`If \`true\`, the execution happens asynchroneously. +This means this API call does not block and returns an ID (\`async_id\`). +Later, use that ID in a call to \`async_get\` to eventually get the result. + +Additionally and if not specified: \`max_output\` is set to 1MB and and \`timeout\` to 10 minutes.`), + }), + + z.object({ + project_id: ProjectIdSchema, + async_get: z.string().optional() + .describe(`For a given \`async_id\` returned by \`async\`, + retun the status, or the result as if it is called synchroneously. + Results are only cached temporarily!`), + }), + ]) .describe("Perform arbitrary shell commands in a compute server or project."); export const ExecOutputSchema = z.union([ + z + .object({ + stdout: z.string().describe("Output to stdout"), + stderr: z.string().describe("Output to stderr"), + exit_code: z + .number() + .describe( + "The numeric exit code. 0 usually means it ran without any issues.", + ), + async_id: z + .string() + .optional() + .describe("The ID identifying the async operation (async only)"), + async_start_ts: z + .number() + .optional() + .describe("UNIX timestamp when execution started (async only)"), + elapsed_s: z + .string() + .optional() + .describe("How long the execution took (async only)"), + async_status: z // AsyncStatus + .union([ + z.literal("running"), + z.literal("completed"), + z.literal("error"), + ]) + .optional() + .describe("Status of async operation."), + }) + .describe("Output of executed command."), FailedAPIOperationSchema, - z.any().describe("Output of executed command."), ]); export type ExecInput = z.infer; diff --git a/src/packages/next/pages/api/v2/exec.ts b/src/packages/next/pages/api/v2/exec.ts index 8f56209b36..793a1e0915 100644 --- a/src/packages/next/pages/api/v2/exec.ts +++ b/src/packages/next/pages/api/v2/exec.ts @@ -38,7 +38,7 @@ async function get(req) { aggregate, err_on_exit, env, - async_exec, + async_mode, async_get, } = getParams(req); @@ -63,7 +63,7 @@ async function get(req) { aggregate, err_on_exit, env, - async_exec, + async_mode, async_get, }, }); diff --git a/src/packages/project/exec_shell_code.ts b/src/packages/project/exec_shell_code.ts index 6b1151ab8b..52e98b0574 100644 --- a/src/packages/project/exec_shell_code.ts +++ b/src/packages/project/exec_shell_code.ts @@ -33,6 +33,7 @@ export async function exec_shell_code(socket: CoCalcSocket, mesg) { : abspath(mesg.path != null ? mesg.path : ""), ...mesg, }); + D(`execCode returned:`, out); socket.write_mesg( "json", message.project_exec_output({ @@ -40,6 +41,10 @@ export async function exec_shell_code(socket: CoCalcSocket, mesg) { stdout: out?.stdout, stderr: out?.stderr, exit_code: out?.exit_code, + async_id: out?.async_id, + async_start_ts: out?.async_start_ts, + async_status: out?.async_status, + elapsed_s: out?.elapsed_s, }), ); } catch (err) { diff --git a/src/packages/util/message.js b/src/packages/util/message.js index a85b207b59..6da901a950 100644 --- a/src/packages/util/message.js +++ b/src/packages/util/message.js @@ -824,6 +824,9 @@ message({ stdout: required, stderr: required, exit_code: required, + async_id: undefined, + async_start: undefined, + elapsed_s: undefined, }); //##################################################################### diff --git a/src/packages/util/types/execute-code.ts b/src/packages/util/types/execute-code.ts index 70a75afcb5..e0a2440a95 100644 --- a/src/packages/util/types/execute-code.ts +++ b/src/packages/util/types/execute-code.ts @@ -1,8 +1,12 @@ +export type AsyncStatus = "running" | "completed" | "error"; + export interface ExecuteCodeOutput { stdout: string; stderr: string; exit_code: number; + async_start_ts?: number; async_id?: string; + async_status?: AsyncStatus; elapsed_s?: number; // how long it took, async execution } @@ -22,7 +26,8 @@ export interface ExecuteCodeOptions { env?: object; // if given, added to exec environment aggregate?: string | number; // if given, aggregates multiple calls with same sequence number into one -- see @cocalc/util/aggregate; typically make this a timestamp for compiling code (e.g., latex). verbose?: boolean; // default true -- impacts amount of logging - async_exec?: boolean; // default false -- if true, return an ID and execute it asynchroneously + async_mode?: boolean; // default false -- if true, return an ID and execute it asynchroneously + async_get?: string; // if given, retrieve status or result of that async operation } export interface ExecuteCodeOptionsAsyncGet {