Skip to content

Commit

Permalink
api/async exec: cleanup api, more testing
Browse files Browse the repository at this point in the history
  • Loading branch information
haraldschilly committed Jul 16, 2024
1 parent 0475484 commit 5c482b9
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 139 deletions.
116 changes: 101 additions & 15 deletions src/packages/backend/execute-code.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,27 +90,113 @@ describe("test timeout", () => {
});
});

describe("test env", () => {
it("allows to specify environment variables", async () => {
const { stdout, stderr } = await executeCode({
command: "sh",
args: ["-c", "echo $FOO;"],
err_on_exit: false,
bash: false,
env: { FOO: "bar" },
});
expect(stdout).toBe("bar\n");
expect(stderr).toBe("");
});
});

describe("async", () => {
it("use ID to get async execution result", async () => {
it("use ID to get async result", async () => {
const c = await executeCode({
command: "sh",
args: ["-c", "sleep .5; echo foo;"],
args: ["-c", "echo foo; sleep .5; echo bar; sleep .5; echo baz;"],
bash: false,
timeout: 10,
async_mode: true,
async_call: true,
});
expect(c.async_status).toEqual("running");
expect(c.async_start).toBeGreaterThan(1);
const id = c.async_id;
expect(typeof id).toEqual("string");
if (typeof id === "string") {
await new Promise((done) => setTimeout(done, 1000));
const status = await executeCode({ async_get: id });
expect(status.async_status).toEqual("completed");
expect(status.stdout).toEqual("foo\n");
expect(status.elapsed_s).toBeGreaterThan(0.1);
expect(status.elapsed_s).toBeLessThan(3);
expect(status.async_start).toBeGreaterThan(1);
expect(c.type).toEqual("async");
if (c.type !== "async") return;
const { status, start, job_id } = c;
expect(status).toEqual("running");
expect(start).toBeGreaterThan(1);
expect(typeof job_id).toEqual("string");
if (typeof job_id !== "string") return;
await new Promise((done) => setTimeout(done, 250));
{
const s = await executeCode({ async_get: job_id });
expect(s.type).toEqual("async");
if (s.type !== "async") return;
expect(s.status).toEqual("running");
// partial stdout result
expect(s.stdout).toEqual("foo\n");
expect(s.elapsed_s).toBeUndefined();
expect(s.start).toBeGreaterThan(1);
expect(s.exit_code).toEqual(0);
}

await new Promise((done) => setTimeout(done, 900));
{
const s = await executeCode({ async_get: job_id });
expect(s.type).toEqual("async");
if (s.type !== "async") return;
expect(s.status).toEqual("completed");
expect(s.stdout).toEqual("foo\nbar\nbaz\n");
expect(s.elapsed_s).toBeGreaterThan(0.1);
expect(s.elapsed_s).toBeLessThan(3);
expect(s.start).toBeGreaterThan(1);
expect(s.stderr).toEqual("");
expect(s.exit_code).toEqual(0);
}
});

it("with an error", async () => {
const c = await executeCode({
command: ">&2 echo baz; exit 3",
bash: true,
async_call: true,
});
expect(c.type).toEqual("async");
if (c.type !== "async") return;
const { job_id } = c;
expect(typeof job_id).toEqual("string");
if (typeof job_id !== "string") return;
await new Promise((done) => setTimeout(done, 250));
const s = await executeCode({ async_get: job_id });
expect(s.type).toEqual("async");
if (s.type !== "async") return;
expect(s.status).toEqual("error");
expect(s.stdout).toEqual("");
expect(s.stderr).toEqual("baz\n");
// any error is code 1 it seems?
expect(s.exit_code).toEqual(1);
});

it("trigger a timeout", async () => {
const c = await executeCode({
command: "sh",
args: ["-c", "echo foo; sleep 1; echo bar;"],
bash: false,
timeout: 0.1,
async_call: true,
});
expect(c.type).toEqual("async");
if (c.type !== "async") return;
const { status, start, job_id } = c;
expect(status).toEqual("running");
expect(start).toBeGreaterThan(1);
expect(typeof job_id).toEqual("string");
if (typeof job_id !== "string") return;
await new Promise((done) => setTimeout(done, 250));
const s = await executeCode({ async_get: job_id });
expect(s.type).toEqual("async");
if (s.type !== "async") return;
expect(s.status).toEqual("error");
expect(s.stdout).toEqual("");
expect(s.elapsed_s).toBeGreaterThan(0.01);
expect(s.elapsed_s).toBeLessThan(3);
expect(s.start).toBeGreaterThan(1);
expect(s.stderr).toEqual(
"killed command 'sh -c echo foo; sleep 1; echo bar;'",
);
expect(s.exit_code).toEqual(1);
});
});
138 changes: 79 additions & 59 deletions src/packages/backend/execute-code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
import { envForSpawn } from "./misc";

import {
ExecuteCodeOutputAsync,
ExecuteCodeOutputBlocking,
isExecuteCodeOptionsAsyncGet,
type ExecuteCodeFunctionWithCallback,
type ExecuteCodeOptions,
Expand All @@ -34,7 +36,7 @@ import {

const log = getLogger("execute-code");

const asyncCache = new LRU<string, ExecuteCodeOutput>({
const asyncCache = new LRU<string, ExecuteCodeOutputAsync>({
max: 100,
ttl: 1000 * 60 * 60,
ttlAutopurge: true,
Expand Down Expand Up @@ -64,6 +66,12 @@ export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
},
);

async function clean_up_tmp(tempDir: string | undefined) {
if (tempDir) {
await rm(tempDir, { force: true, recursive: true });
}
}

// actual implementation, without the aggregate wrapper
async function executeCodeNoAggregate(
opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
Expand All @@ -77,16 +85,15 @@ async function executeCodeNoAggregate(
}
}

if (opts.args == null) opts.args = [];
if (opts.timeout == null) opts.timeout = 10;
if (opts.ulimit_timeout == null) opts.ulimit_timeout = true;
if (opts.err_on_exit == null) opts.err_on_exit = true;
if (opts.verbose == null) opts.verbose = true;
opts.args ??= [];
opts.timeout ??= 10;
opts.ulimit_timeout ??= true;
opts.err_on_exit ??= true;
opts.verbose ??= true;

if (opts.verbose) {
log.debug(`input: ${opts.command} ${opts.args?.join(" ")}`);
}

const s = opts.command.split(/\s+/g); // split on whitespace
if (opts.args?.length === 0 && s.length > 1) {
opts.bash = true;
Expand Down Expand Up @@ -141,49 +148,59 @@ async function executeCodeNoAggregate(
await chmod(tempPath, 0o700);
}

if (opts.async_mode) {
if (opts.async_call) {
// we return an ID, the caller can then use it to query the status
opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems;
opts.timeout ??= 10 * 60;
const id = uuid();
const job_id = uuid();
const start = new Date();
const started: ExecuteCodeOutput = {
const started: ExecuteCodeOutputAsync = {
type: "async",
stdout: `Process started running at ${start.toISOString()}`,
stderr: "",
exit_code: 0,
async_start: start.getTime(),
async_id: id,
async_status: "running",
start: start.getTime(),
job_id,
status: "running",
};
asyncCache.set(id, started);

doSpawn({ ...opts, origCommand, async_id: id }, (err, result) => {
const started = asyncCache.get(id)?.async_start ?? 0;
const info: Partial<ExecuteCodeOutput> = {
elapsed_s: (Date.now() - started) / 1000,
async_start: start.getTime(),
async_status: "error",
};
if (err) {
asyncCache.set(id, {
stdout: "",
stderr: `${err}`,
exit_code: 1,
...info,
});
} else if (result != null) {
asyncCache.set(id, {
...result,
...info,
...{ async_status: "completed" },
});
} else {
asyncCache.set(id, {
stdout: "",
stderr: `No result`,
exit_code: 1,
...info,
});
asyncCache.set(job_id, started);

doSpawn({ ...opts, origCommand, job_id }, async (err, result) => {
try {
const started = asyncCache.get(job_id)?.start ?? 0;
const info: Omit<
ExecuteCodeOutputAsync,
"stdout" | "stderr" | "exit_code"
> = {
job_id,
type: "async",
elapsed_s: (Date.now() - started) / 1000,
start: start.getTime(),
status: "error",
};
if (err) {
asyncCache.set(job_id, {
stdout: "",
stderr: `${err}`,
exit_code: 1,
...info,
});
} else if (result != null) {
asyncCache.set(job_id, {
...result,
...info,
...{ status: "completed" },
});
} else {
asyncCache.set(job_id, {
stdout: "",
stderr: `No result`,
exit_code: 1,
...info,
});
}
} finally {
await clean_up_tmp(tempDir);
}
});

Expand All @@ -193,28 +210,26 @@ async function executeCodeNoAggregate(
return await callback(doSpawn, { ...opts, origCommand });
}
} finally {
// clean up
if (tempDir) {
await rm(tempDir, { force: true, recursive: true });
}
// do not delete the tempDir in async mode!
if (!opts.async_call) await clean_up_tmp(tempDir);
}
}

function update_async(
async_id: string | undefined,
job_id: string | undefined,
stream: "stdout" | "stderr",
data: string,
) {
if (!async_id) return;
const obj = asyncCache.get(async_id);
if (!job_id) return;
const obj = asyncCache.get(job_id);
if (obj != null) {
obj[stream] = data;
}
}

function doSpawn(
opts,
cb: (err: string | undefined, result?: ExecuteCodeOutput) => void,
cb: (err: string | undefined, result?: ExecuteCodeOutputBlocking) => void,
) {
const start_time = walltime();

Expand Down Expand Up @@ -278,7 +293,7 @@ function doSpawn(
} else {
stdout += data;
}
update_async(opts.async_id, "stdout", stdout);
update_async(opts.job_id, "stdout", stdout);
});

r.stderr.on("data", (data) => {
Expand All @@ -290,7 +305,7 @@ function doSpawn(
} else {
stderr += data;
}
update_async(opts.async_id, "stderr", stderr);
update_async(opts.job_id, "stderr", stderr);
});

let stderr_is_done = false;
Expand Down Expand Up @@ -363,12 +378,17 @@ function doSpawn(
const x = opts.origCommand
? opts.origCommand
: `'${opts.command}' (args=${opts.args?.join(" ")})`;
cb(
`command '${x}' exited with nonzero code ${exit_code} -- stderr='${trunc(
stderr,
1024,
)}'`,
);
if (opts.job_id) {
cb(stderr);
} else {
// sync behavor, like it was before
cb(
`command '${x}' exited with nonzero code ${exit_code} -- stderr='${trunc(
stderr,
1024,
)}'`,
);
}
} else if (!ran_code) {
// regardless of opts.err_on_exit !
const x = opts.origCommand
Expand All @@ -390,7 +410,7 @@ function doSpawn(
// if exit-code not set, may have been SIGKILL so we set it to 1
exit_code = 1;
}
cb(undefined, { stdout, stderr, exit_code });
cb(undefined, { type: "blocking", stdout, stderr, exit_code });
}
};

Expand Down
Loading

0 comments on commit 5c482b9

Please sign in to comment.