Skip to content

Commit

Permalink
api/async exec: debugging how data moves around, documentation, etc
Browse files Browse the repository at this point in the history
  • Loading branch information
haraldschilly committed Jul 15, 2024
1 parent d6b8787 commit 984ea3f
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 90 deletions.
15 changes: 9 additions & 6 deletions src/packages/backend/execute-code.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,25 @@ describe("test timeout", () => {
});

describe("async", () => {
it("use ID for async execution", async () => {
const retID = await executeCode({
it("use ID to get async execution result", async () => {
const c = await executeCode({
command: "sh",
args: ["-c", "sleep .1; echo foo;"],
args: ["-c", "sleep .5; echo foo;"],
bash: false,
timeout: 10,
async_exec: true,
async_mode: true,
});
const id = retID["async_id"];
expect(c.async_status).toEqual("running");
expect(c.async_start_ts).toBeGreaterThan(1);
const id = c.async_id;
expect(typeof id).toEqual("string");
if (typeof id === "string") {
await new Promise((done) => setTimeout(done, 1000));
const status = await executeCode({ async_get: id });
console.log("status", status);
expect(status.async_status).toEqual("completed");
expect(status.stdout).toEqual("foo\n");
expect(status.elapsed_s).toBeGreaterThan(0.1);
expect(status.async_start_ts).toBeGreaterThan(1);
}
});
});
42 changes: 33 additions & 9 deletions src/packages/backend/execute-code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async function executeCodeNoAggregate(
if (cached != null) {
return cached;
} else {
throw new Error(`Status or result of '${opts.async_get}' not found.`);
throw new Error(`Async operation '${opts.async_get}' not found.`);
}
}

Expand Down Expand Up @@ -141,23 +141,29 @@ async function executeCodeNoAggregate(
await chmod(tempPath, 0o700);
}

if (opts.async_exec) {
if (opts.async_mode) {
// we return an ID, the caller can then use it to query the status
const async_limit = 1024 * 1024; // we limit how much we keep in memory, to avoid problems
opts.max_output = Math.min(async_limit, opts.max_output ?? async_limit);
opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems;
opts.timeout ??= 10 * 60;
const id = uuid();
const start = new Date();
const started = {
const started: ExecuteCodeOutput = {
stdout: `Process started running at ${start.toISOString()}`,
stderr: "",
exit_code: start.getTime(),
exit_code: 0,
async_start_ts: start.getTime(),
async_id: id,
async_status: "running",
};
asyncCache.set(id, started);

doSpawn({ ...opts, origCommand }, (err, result) => {
doSpawn({ ...opts, origCommand, async_id: id }, (err, result) => {
const started = asyncCache.get(id)?.exit_code ?? 0;
const info = { elapsed_s: (Date.now() - started) / 1000 };
const info: Partial<ExecuteCodeOutput> = {
elapsed_s: (Date.now() - started) / 1000,
async_start_ts: start.getTime(),
async_status: "error",
};
if (err) {
asyncCache.set(id, {
stdout: "",
Expand All @@ -166,7 +172,11 @@ async function executeCodeNoAggregate(
...info,
});
} else if (result != null) {
asyncCache.set(id, { ...result, ...info });
asyncCache.set(id, {
...result,
...info,
...{ async_status: "completed" },
});
} else {
asyncCache.set(id, {
stdout: "",
Expand All @@ -190,6 +200,18 @@ async function executeCodeNoAggregate(
}
}

function update_async(
async_id: string | undefined,
stream: "stdout" | "stderr",
data: string,
) {
if (!async_id) return;
const obj = asyncCache.get(async_id);
if (obj != null) {
obj[stream] = data;
}
}

function doSpawn(
opts,
cb: (err: string | undefined, result?: ExecuteCodeOutput) => void,
Expand Down Expand Up @@ -256,6 +278,7 @@ function doSpawn(
} else {
stdout += data;
}
update_async(opts.async_id, "stdout", stdout);
});

r.stderr.on("data", (data) => {
Expand All @@ -267,6 +290,7 @@ function doSpawn(
} else {
stderr += data;
}
update_async(opts.async_id, "stderr", stderr);
});

let stderr_is_done = false;
Expand Down
184 changes: 112 additions & 72 deletions src/packages/next/lib/api/schema/exec.ts
Original file line number Diff line number Diff line change
@@ -1,94 +1,134 @@
import { z } from "../framework";

import { FailedAPIOperationSchema } from "./common";
import { ProjectIdSchema } from "./projects/common";
import { ComputeServerIdSchema } from "./compute/common";
import { ProjectIdSchema } from "./projects/common";

// OpenAPI spec
//
export const ExecInputSchema = z
.object({
project_id: ProjectIdSchema,
compute_server_id: ComputeServerIdSchema.describe(
`If provided, the desired shell command will be run on the compute server whose id
.union([
z.object({
project_id: ProjectIdSchema,
compute_server_id: ComputeServerIdSchema.describe(
`If provided, the desired shell command will be run on the compute server whose id
is specified in this field (if available).`,
).optional(),
filesystem: z
.boolean()
.optional()
.describe(
`If \`true\`, this shell command runs in the fileserver container on the compute
).optional(),
filesystem: z
.boolean()
.optional()
.describe(
`If \`true\`, this shell command runs in the fileserver container on the compute
server; otherwise, it runs on the main compute container.`,
),
path: z
.string()
.optional()
.describe(
"Path to working directory in which the shell command should be executed.",
),
command: z.string().describe("The shell command to execute."),
args: z
.array(z.string())
.optional()
.describe("An array of arguments to pass to the shell command."),
timeout: z
.number()
.min(0)
.default(60)
.optional()
.describe("Number of seconds before this shell command times out."),
max_output: z
.number()
.min(0)
.optional()
.describe("Maximum number of bytes to return from shell command output."),
bash: z
.boolean()
.optional()
.describe(
`If \`true\`, this command runs in a \`bash\` shell. To do so, the provided shell
),
path: z
.string()
.optional()
.describe(
"Path to working directory in which the shell command should be executed.",
),
command: z.string().describe("The shell command to execute."),
args: z
.array(z.string())
.optional()
.describe("An array of arguments to pass to the shell command."),
timeout: z
.number()
.min(0)
.default(60)
.optional()
.describe("Number of seconds before this shell command times out."),
max_output: z
.number()
.min(0)
.optional()
.describe(
"Maximum number of bytes to return from shell command output.",
),
bash: z
.boolean()
.optional()
.describe(
`If \`true\`, this command runs in a \`bash\` shell. To do so, the provided shell
command is written to a file and then executed via the \`bash\` command.`,
),
aggregate: z
.union([
z.number(),
z.string(),
z.object({ value: z.union([z.string(), z.number()]) }),
])
.optional()
.describe(
`If provided, this shell command is aggregated as in
),
aggregate: z
.union([
z.number(),
z.string(),
z.object({ value: z.union([z.string(), z.number()]) }),
])
.optional()
.describe(
`If provided, this shell command is aggregated as in
\`src/packages/backend/aggregate.js\`. This parameter allows one to specify
multiple callbacks to be executed against the output of the same command
(given identical arguments) within a 60-second window.`,
),
err_on_exit: z
.boolean()
.optional()
.describe(
`When \`true\`, this call will throw an error whenever the provided shell command
),
err_on_exit: z
.boolean()
.optional()
.describe(
`When \`true\`, this call will throw an error whenever the provided shell command
exits with a non-zero exit code.`,
),
env: z
.record(z.string(), z.string())
.optional()
.describe(
"Environment variables to be passed to the shell command upon execution.",
),
async_exec: z.boolean().optional()
.describe(`If \`true\`, the execution happens asynchroneously.
This means it the API call does not block and returns an ID (\`async_id\`).
Later, use that ID in a call to \`async_get\` to eventually get the result`),
async_get: z.string().optional()
.describe(`For a given \`async_id\` returned by \`async\`,
retun the status, or the result as if it is called synchroneously.
Results are only cached temporarily!`),
})
),
env: z
.record(z.string(), z.string())
.optional()
.describe(
"Environment variables to be passed to the shell command upon execution.",
),
async_mode: z.boolean().optional()
.describe(`If \`true\`, the execution happens asynchroneously.
This means this API call does not block and returns an ID (\`async_id\`).
Later, use that ID in a call to \`async_get\` to eventually get the result.
Additionally and if not specified: \`max_output\` is set to 1MB and and \`timeout\` to 10 minutes.`),
}),

z.object({
project_id: ProjectIdSchema,
async_get: z.string().optional()
.describe(`For a given \`async_id\` returned by \`async\`,
retun the status, or the result as if it is called synchroneously.
Results are only cached temporarily!`),
}),
])
.describe("Perform arbitrary shell commands in a compute server or project.");

export const ExecOutputSchema = z.union([
z
.object({
stdout: z.string().describe("Output to stdout"),
stderr: z.string().describe("Output to stderr"),
exit_code: z
.number()
.describe(
"The numeric exit code. 0 usually means it ran without any issues.",
),
async_id: z
.string()
.optional()
.describe("The ID identifying the async operation (async only)"),
async_start_ts: z
.number()
.optional()
.describe("UNIX timestamp when execution started (async only)"),
elapsed_s: z
.string()
.optional()
.describe("How long the execution took (async only)"),
async_status: z // AsyncStatus
.union([
z.literal("running"),
z.literal("completed"),
z.literal("error"),
])
.optional()
.describe("Status of async operation."),
})
.describe("Output of executed command."),
FailedAPIOperationSchema,
z.any().describe("Output of executed command."),
]);

export type ExecInput = z.infer<typeof ExecInputSchema>;
Expand Down
4 changes: 2 additions & 2 deletions src/packages/next/pages/api/v2/exec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async function get(req) {
aggregate,
err_on_exit,
env,
async_exec,
async_mode,
async_get,
} = getParams(req);

Expand All @@ -63,7 +63,7 @@ async function get(req) {
aggregate,
err_on_exit,
env,
async_exec,
async_mode,
async_get,
},
});
Expand Down
5 changes: 5 additions & 0 deletions src/packages/project/exec_shell_code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,18 @@ export async function exec_shell_code(socket: CoCalcSocket, mesg) {
: abspath(mesg.path != null ? mesg.path : ""),
...mesg,
});
D(`execCode returned:`, out);
socket.write_mesg(
"json",
message.project_exec_output({
id: mesg.id,
stdout: out?.stdout,
stderr: out?.stderr,
exit_code: out?.exit_code,
async_id: out?.async_id,
async_start_ts: out?.async_start_ts,
async_status: out?.async_status,
elapsed_s: out?.elapsed_s,
}),
);
} catch (err) {
Expand Down
3 changes: 3 additions & 0 deletions src/packages/util/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,9 @@ message({
stdout: required,
stderr: required,
exit_code: required,
async_id: undefined,
async_start: undefined,
elapsed_s: undefined,
});

//#####################################################################
Expand Down
Loading

0 comments on commit 984ea3f

Please sign in to comment.