diff --git a/src/packages/backend/execute-code.test.ts b/src/packages/backend/execute-code.test.ts index 601ca961b88..20088351e64 100644 --- a/src/packages/backend/execute-code.test.ts +++ b/src/packages/backend/execute-code.test.ts @@ -90,27 +90,113 @@ describe("test timeout", () => { }); }); +describe("test env", () => { + it("allows to specify environment variables", async () => { + const { stdout, stderr } = await executeCode({ + command: "sh", + args: ["-c", "echo $FOO;"], + err_on_exit: false, + bash: false, + env: { FOO: "bar" }, + }); + expect(stdout).toBe("bar\n"); + expect(stderr).toBe(""); + }); +}); + describe("async", () => { - it("use ID to get async execution result", async () => { + it("use ID to get async result", async () => { const c = await executeCode({ command: "sh", - args: ["-c", "sleep .5; echo foo;"], + args: ["-c", "echo foo; sleep .5; echo bar; sleep .5; echo baz;"], bash: false, timeout: 10, - async_mode: true, + async_call: true, }); - expect(c.async_status).toEqual("running"); - expect(c.async_start).toBeGreaterThan(1); - const id = c.async_id; - expect(typeof id).toEqual("string"); - if (typeof id === "string") { - await new Promise((done) => setTimeout(done, 1000)); - const status = await executeCode({ async_get: id }); - expect(status.async_status).toEqual("completed"); - expect(status.stdout).toEqual("foo\n"); - expect(status.elapsed_s).toBeGreaterThan(0.1); - expect(status.elapsed_s).toBeLessThan(3); - expect(status.async_start).toBeGreaterThan(1); + expect(c.type).toEqual("async"); + if (c.type !== "async") return; + const { status, start, job_id } = c; + expect(status).toEqual("running"); + expect(start).toBeGreaterThan(1); + expect(typeof job_id).toEqual("string"); + if (typeof job_id !== "string") return; + await new Promise((done) => setTimeout(done, 250)); + { + const s = await executeCode({ async_get: job_id }); + expect(s.type).toEqual("async"); + if (s.type !== "async") return; + expect(s.status).toEqual("running"); + // partial stdout result + expect(s.stdout).toEqual("foo\n"); + expect(s.elapsed_s).toBeUndefined(); + expect(s.start).toBeGreaterThan(1); + expect(s.exit_code).toEqual(0); + } + + await new Promise((done) => setTimeout(done, 900)); + { + const s = await executeCode({ async_get: job_id }); + expect(s.type).toEqual("async"); + if (s.type !== "async") return; + expect(s.status).toEqual("completed"); + expect(s.stdout).toEqual("foo\nbar\nbaz\n"); + expect(s.elapsed_s).toBeGreaterThan(0.1); + expect(s.elapsed_s).toBeLessThan(3); + expect(s.start).toBeGreaterThan(1); + expect(s.stderr).toEqual(""); + expect(s.exit_code).toEqual(0); } }); + + it("with an error", async () => { + const c = await executeCode({ + command: ">&2 echo baz; exit 3", + bash: true, + async_call: true, + }); + expect(c.type).toEqual("async"); + if (c.type !== "async") return; + const { job_id } = c; + expect(typeof job_id).toEqual("string"); + if (typeof job_id !== "string") return; + await new Promise((done) => setTimeout(done, 250)); + const s = await executeCode({ async_get: job_id }); + expect(s.type).toEqual("async"); + if (s.type !== "async") return; + expect(s.status).toEqual("error"); + expect(s.stdout).toEqual(""); + expect(s.stderr).toEqual("baz\n"); + // any error is code 1 it seems? + expect(s.exit_code).toEqual(1); + }); + + it("trigger a timeout", async () => { + const c = await executeCode({ + command: "sh", + args: ["-c", "echo foo; sleep 1; echo bar;"], + bash: false, + timeout: 0.1, + async_call: true, + }); + expect(c.type).toEqual("async"); + if (c.type !== "async") return; + const { status, start, job_id } = c; + expect(status).toEqual("running"); + expect(start).toBeGreaterThan(1); + expect(typeof job_id).toEqual("string"); + if (typeof job_id !== "string") return; + await new Promise((done) => setTimeout(done, 250)); + const s = await executeCode({ async_get: job_id }); + expect(s.type).toEqual("async"); + if (s.type !== "async") return; + expect(s.status).toEqual("error"); + expect(s.stdout).toEqual(""); + expect(s.elapsed_s).toBeGreaterThan(0.01); + expect(s.elapsed_s).toBeLessThan(3); + expect(s.start).toBeGreaterThan(1); + expect(s.stderr).toEqual( + "killed command 'sh -c echo foo; sleep 1; echo bar;'", + ); + expect(s.exit_code).toEqual(1); + }); }); diff --git a/src/packages/backend/execute-code.ts b/src/packages/backend/execute-code.ts index adcb839ecf1..7607a4c436b 100644 --- a/src/packages/backend/execute-code.ts +++ b/src/packages/backend/execute-code.ts @@ -24,6 +24,8 @@ import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc"; import { envForSpawn } from "./misc"; import { + ExecuteCodeOutputAsync, + ExecuteCodeOutputBlocking, isExecuteCodeOptionsAsyncGet, type ExecuteCodeFunctionWithCallback, type ExecuteCodeOptions, @@ -34,7 +36,7 @@ import { const log = getLogger("execute-code"); -const asyncCache = new LRU({ +const asyncCache = new LRU({ max: 100, ttl: 1000 * 60 * 60, ttlAutopurge: true, @@ -64,6 +66,12 @@ export const execute_code: ExecuteCodeFunctionWithCallback = aggregate( }, ); +async function clean_up_tmp(tempDir: string | undefined) { + if (tempDir) { + await rm(tempDir, { force: true, recursive: true }); + } +} + // actual implementation, without the aggregate wrapper async function executeCodeNoAggregate( opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet, @@ -77,16 +85,15 @@ async function executeCodeNoAggregate( } } - if (opts.args == null) opts.args = []; - if (opts.timeout == null) opts.timeout = 10; - if (opts.ulimit_timeout == null) opts.ulimit_timeout = true; - if (opts.err_on_exit == null) opts.err_on_exit = true; - if (opts.verbose == null) opts.verbose = true; + opts.args ??= []; + opts.timeout ??= 10; + opts.ulimit_timeout ??= true; + opts.err_on_exit ??= true; + opts.verbose ??= true; if (opts.verbose) { log.debug(`input: ${opts.command} ${opts.args?.join(" ")}`); } - const s = opts.command.split(/\s+/g); // split on whitespace if (opts.args?.length === 0 && s.length > 1) { opts.bash = true; @@ -141,49 +148,59 @@ async function executeCodeNoAggregate( await chmod(tempPath, 0o700); } - if (opts.async_mode) { + if (opts.async_call) { // we return an ID, the caller can then use it to query the status opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems; opts.timeout ??= 10 * 60; - const id = uuid(); + const job_id = uuid(); const start = new Date(); - const started: ExecuteCodeOutput = { + const started: ExecuteCodeOutputAsync = { + type: "async", stdout: `Process started running at ${start.toISOString()}`, stderr: "", exit_code: 0, - async_start: start.getTime(), - async_id: id, - async_status: "running", + start: start.getTime(), + job_id, + status: "running", }; - asyncCache.set(id, started); - - doSpawn({ ...opts, origCommand, async_id: id }, (err, result) => { - const started = asyncCache.get(id)?.async_start ?? 0; - const info: Partial = { - elapsed_s: (Date.now() - started) / 1000, - async_start: start.getTime(), - async_status: "error", - }; - if (err) { - asyncCache.set(id, { - stdout: "", - stderr: `${err}`, - exit_code: 1, - ...info, - }); - } else if (result != null) { - asyncCache.set(id, { - ...result, - ...info, - ...{ async_status: "completed" }, - }); - } else { - asyncCache.set(id, { - stdout: "", - stderr: `No result`, - exit_code: 1, - ...info, - }); + asyncCache.set(job_id, started); + + doSpawn({ ...opts, origCommand, job_id }, async (err, result) => { + try { + const started = asyncCache.get(job_id)?.start ?? 0; + const info: Omit< + ExecuteCodeOutputAsync, + "stdout" | "stderr" | "exit_code" + > = { + job_id, + type: "async", + elapsed_s: (Date.now() - started) / 1000, + start: start.getTime(), + status: "error", + }; + if (err) { + asyncCache.set(job_id, { + stdout: "", + stderr: `${err}`, + exit_code: 1, + ...info, + }); + } else if (result != null) { + asyncCache.set(job_id, { + ...result, + ...info, + ...{ status: "completed" }, + }); + } else { + asyncCache.set(job_id, { + stdout: "", + stderr: `No result`, + exit_code: 1, + ...info, + }); + } + } finally { + await clean_up_tmp(tempDir); } }); @@ -193,20 +210,18 @@ async function executeCodeNoAggregate( return await callback(doSpawn, { ...opts, origCommand }); } } finally { - // clean up - if (tempDir) { - await rm(tempDir, { force: true, recursive: true }); - } + // do not delete the tempDir in async mode! + if (!opts.async_call) await clean_up_tmp(tempDir); } } function update_async( - async_id: string | undefined, + job_id: string | undefined, stream: "stdout" | "stderr", data: string, ) { - if (!async_id) return; - const obj = asyncCache.get(async_id); + if (!job_id) return; + const obj = asyncCache.get(job_id); if (obj != null) { obj[stream] = data; } @@ -214,7 +229,7 @@ function update_async( function doSpawn( opts, - cb: (err: string | undefined, result?: ExecuteCodeOutput) => void, + cb: (err: string | undefined, result?: ExecuteCodeOutputBlocking) => void, ) { const start_time = walltime(); @@ -278,7 +293,7 @@ function doSpawn( } else { stdout += data; } - update_async(opts.async_id, "stdout", stdout); + update_async(opts.job_id, "stdout", stdout); }); r.stderr.on("data", (data) => { @@ -290,7 +305,7 @@ function doSpawn( } else { stderr += data; } - update_async(opts.async_id, "stderr", stderr); + update_async(opts.job_id, "stderr", stderr); }); let stderr_is_done = false; @@ -363,12 +378,17 @@ function doSpawn( const x = opts.origCommand ? opts.origCommand : `'${opts.command}' (args=${opts.args?.join(" ")})`; - cb( - `command '${x}' exited with nonzero code ${exit_code} -- stderr='${trunc( - stderr, - 1024, - )}'`, - ); + if (opts.job_id) { + cb(stderr); + } else { + // sync behavor, like it was before + cb( + `command '${x}' exited with nonzero code ${exit_code} -- stderr='${trunc( + stderr, + 1024, + )}'`, + ); + } } else if (!ran_code) { // regardless of opts.err_on_exit ! const x = opts.origCommand @@ -390,7 +410,7 @@ function doSpawn( // if exit-code not set, may have been SIGKILL so we set it to 1 exit_code = 1; } - cb(undefined, { stdout, stderr, exit_code }); + cb(undefined, { type: "blocking", stdout, stderr, exit_code }); } }; diff --git a/src/packages/next/lib/api/schema/exec.ts b/src/packages/next/lib/api/schema/exec.ts index 36ac192d448..0b42cb8cfda 100644 --- a/src/packages/next/lib/api/schema/exec.ts +++ b/src/packages/next/lib/api/schema/exec.ts @@ -12,14 +12,14 @@ export const ExecInputSchema = z project_id: ProjectIdSchema, compute_server_id: ComputeServerIdSchema.describe( `If provided, the desired shell command will be run on the compute server whose id - is specified in this field (if available).`, + is specified in this field (if available).`, ).optional(), filesystem: z .boolean() .optional() .describe( `If \`true\`, this shell command runs in the fileserver container on the compute - server; otherwise, it runs on the main compute container.`, + server; otherwise, it runs on the main compute container.`, ), path: z .string() @@ -50,8 +50,22 @@ export const ExecInputSchema = z .optional() .describe( `If \`true\`, this command runs in a \`bash\` shell. To do so, the provided shell - command is written to a file and then executed via the \`bash\` command.`, + command is written to a file and then executed via the \`bash\` command.`, ), + home: z + .string() + .optional() + .describe( + `Specify \`$HOME\`. If not set, it is inferred from the environment's \`$HOME\``, + ), + uid: z + .number() + .optional() + .describe("Set the `UID` identity of the spawned process."), + gid: z + .number() + .optional() + .describe("Set the `GID` identity of the spawned process."), aggregate: z .union([ z.number(), @@ -69,8 +83,9 @@ export const ExecInputSchema = z .boolean() .optional() .describe( - `When \`true\`, this call will throw an error whenever the provided shell command - exits with a non-zero exit code.`, + `When \`true\` (the default), + this call will throw an error whenever the provided shell command + exits with a non-zero exit code.`, ), env: z .record(z.string(), z.string()) @@ -78,55 +93,53 @@ export const ExecInputSchema = z .describe( "Environment variables to be passed to the shell command upon execution.", ), - async_mode: z.boolean().optional() - .describe(`If \`true\`, the execution happens asynchroneously. -This means this API call does not block and returns an ID (\`async_id\`). -Later, use that ID in a call to \`async_get\` to eventually get the result. + async_call: z.boolean().optional() + .describe(`If \`true\`, the execution happens asynchronously. +The API call does not block and returns an ID (\`job_id\`). +Later, use that ID in a call to \`async_get\` to get status updates, partial output, and eventually the final result. + +This does not support executing code on compute servers – only inside the project itself. -Additionally and if not specified: \`max_output\` is set to 1MB and and \`timeout\` to 10 minutes.`), +Additionally and if not specified, \`max_output\` is set to 1MB and and \`timeout\` to 10 minutes.`), }), z.object({ project_id: ProjectIdSchema, async_get: z.string().optional() - .describe(`For a given \`async_id\` returned by \`async\`, - retun the status, or the result as if it is called synchroneously. - Results are only cached temporarily!`), + .describe(`For a given \`job_id\`, returned when setting \`async_call=true\`, +retrieve the corresponding status or the result. +Results are cached temporarily in the project.`), }), ]) .describe("Perform arbitrary shell commands in a compute server or project."); +const ExecOutputBlocking = z.object({ + type: z.literal("blocking"), + stdout: z.string().describe("Output to stdout"), + stderr: z.string().describe("Output to stderr"), + exit_code: z + .number() + .describe( + "The numeric exit code. 0 usually means it ran without any issues.", + ), +}); + +const ExecOutputAsync = ExecOutputBlocking.extend({ + type: z.literal("async"), + job_id: z.string().describe("The ID identifying the async operation"), + start: z + .number() + .optional() + .describe("UNIX timestamp, when the execution started"), + elapsed_s: z.string().optional().describe("How long the execution took"), + status: z // AsyncStatus + .union([z.literal("running"), z.literal("completed"), z.literal("error")]) + .describe("Status of the async operation"), +}); + export const ExecOutputSchema = z.union([ z - .object({ - stdout: z.string().describe("Output to stdout"), - stderr: z.string().describe("Output to stderr"), - exit_code: z - .number() - .describe( - "The numeric exit code. 0 usually means it ran without any issues.", - ), - async_id: z - .string() - .optional() - .describe("The ID identifying the async operation (async only)"), - async_start: z - .number() - .optional() - .describe("UNIX timestamp when execution started (async only)"), - elapsed_s: z - .string() - .optional() - .describe("How long the execution took (async only)"), - async_status: z // AsyncStatus - .union([ - z.literal("running"), - z.literal("completed"), - z.literal("error"), - ]) - .optional() - .describe("Status of async operation."), - }) + .discriminatedUnion("type", [ExecOutputBlocking, ExecOutputAsync]) .describe("Output of executed command."), FailedAPIOperationSchema, ]); diff --git a/src/packages/next/pages/api/v2/exec.ts b/src/packages/next/pages/api/v2/exec.ts index 793a1e0915a..841a9953c2d 100644 --- a/src/packages/next/pages/api/v2/exec.ts +++ b/src/packages/next/pages/api/v2/exec.ts @@ -38,7 +38,7 @@ async function get(req) { aggregate, err_on_exit, env, - async_mode, + async_call, async_get, } = getParams(req); @@ -63,7 +63,7 @@ async function get(req) { aggregate, err_on_exit, env, - async_mode, + async_call, async_get, }, }); diff --git a/src/packages/project/exec_shell_code.test.ts b/src/packages/project/exec_shell_code.test.ts index 4020edc80f4..3fc70fb1f3c 100644 --- a/src/packages/project/exec_shell_code.test.ts +++ b/src/packages/project/exec_shell_code.test.ts @@ -21,6 +21,7 @@ test("exec_shell_code", (done) => { stdout: expect.any(String), stderr: "", exit_code: 0, + type: "blocking", }); expect(mesg.stdout).toBe("42\n"); done(); diff --git a/src/packages/project/exec_shell_code.ts b/src/packages/project/exec_shell_code.ts index 79beb7e8abe..5e90f80a8f9 100644 --- a/src/packages/project/exec_shell_code.ts +++ b/src/packages/project/exec_shell_code.ts @@ -11,6 +11,7 @@ import { CoCalcSocket } from "@cocalc/backend/tcp/enable-messaging-protocol"; import * as message from "@cocalc/util/message"; import { getLogger } from "./logger"; import execCode from "@cocalc/project/browser-websocket/exec-code"; +import { ExecuteCodeOutput } from "@cocalc/util/types/execute-code"; const { debug: D } = getLogger("exec_shell_code"); @@ -33,19 +34,25 @@ export async function exec_shell_code(socket: CoCalcSocket, mesg) { : abspath(mesg.path != null ? mesg.path : ""), ...mesg, }); - socket.write_mesg( - "json", - message.project_exec_output({ - id: mesg.id, - stdout: out?.stdout, - stderr: out?.stderr, - exit_code: out?.exit_code, - async_id: out?.async_id, - async_start: out?.async_start, - async_status: out?.async_status, + let ret: ExecuteCodeOutput & { id: string } = { + id: mesg.id, + type: "blocking", + stdout: out?.stdout, + stderr: out?.stderr, + exit_code: out?.exit_code, + }; + if (out?.type === "async") { + // extra fields for ExecuteCodeOutputAsync + ret = { + ...ret, + type: "async", + job_id: out?.job_id, + start: out?.start, + status: out?.status, elapsed_s: out?.elapsed_s, - }), - ); + }; + } + socket.write_mesg("json", message.project_exec_output(ret)); } catch (err) { let error = `Error executing command '${mesg.command}' with args '${mesg.args}' -- ${err}`; if (error.indexOf("Connection refused") !== -1) { diff --git a/src/packages/util/message.js b/src/packages/util/message.js index 34adec79049..a25affb275d 100644 --- a/src/packages/util/message.js +++ b/src/packages/util/message.js @@ -824,9 +824,10 @@ message({ stdout: required, stderr: required, exit_code: required, - async_id: undefined, - async_start: undefined, - async_status: undefined, + type: undefined, + job_id: undefined, + start: undefined, + status: undefined, elapsed_s: undefined, }); diff --git a/src/packages/util/types/execute-code.ts b/src/packages/util/types/execute-code.ts index 1c6c7657c6d..bdf78943e74 100644 --- a/src/packages/util/types/execute-code.ts +++ b/src/packages/util/types/execute-code.ts @@ -1,15 +1,29 @@ -export type AsyncStatus = "running" | "completed" | "error"; +export const ASYNC_STATES = ["running", "completed", "error"] as const; -export interface ExecuteCodeOutput { +export type AsyncStatus = (typeof ASYNC_STATES)[number]; + +interface ExecuteCodeBase { stdout: string; stderr: string; exit_code: number; - async_start?: number; - async_id?: string; - async_status?: AsyncStatus; +} + +export interface ExecuteCodeOutputBlocking extends ExecuteCodeBase { + type: "blocking"; +} + +export interface ExecuteCodeOutputAsync extends ExecuteCodeBase { + type: "async"; + start?: number; + job_id: string; + status: AsyncStatus; elapsed_s?: number; // how long it took, async execution } +export type ExecuteCodeOutput = + | ExecuteCodeOutputBlocking + | ExecuteCodeOutputAsync; + export interface ExecuteCodeOptions { command: string; args?: string[]; @@ -26,7 +40,7 @@ export interface ExecuteCodeOptions { env?: object; // if given, added to exec environment aggregate?: string | number; // if given, aggregates multiple calls with same sequence number into one -- see @cocalc/util/aggregate; typically make this a timestamp for compiling code (e.g., latex). verbose?: boolean; // default true -- impacts amount of logging - async_mode?: boolean; // default false -- if true, return an ID and execute it asynchroneously + async_call?: boolean; // default false -- if true, return an ID and execute it asynchroneously async_get?: string; // if given, retrieve status or result of that async operation }