worker: allow a limit on payload size
josephjclark committed Jul 30, 2024
1 parent 5c72cb6 commit 116a554
Showing 10 changed files with 193 additions and 21 deletions.
4 changes: 2 additions & 2 deletions packages/lexicon/lightning.d.ts
@@ -46,8 +46,8 @@ export type LightningPlanOptions = {
start?: StepId;
output_dataclips?: boolean;

// future options
run_memory_limit?: number
run_memory_limit_mb?: number;
payload_memory_limit_mb?: number;
};

/**
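For reference, a minimal sketch of how the renamed fields above might appear in a run's options. The values are illustrative only; the field names come from the type in this diff:

import type { LightningPlanOptions } from '@openfn/lexicon/lightning';

// Sketch only: illustrative values for the options declared above
const opts: LightningPlanOptions = {
  output_dataclips: true,
  run_memory_limit_mb: 500, // cap on memory available to a single run
  payload_memory_limit_mb: 10, // cap on the size of payloads (dataclips, logs) sent back to Lightning
};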
49 changes: 32 additions & 17 deletions packages/ws-worker/src/events/step-complete.ts
@@ -1,18 +1,21 @@
import crypto from 'node:crypto';
import type { StepCompletePayload } from '@openfn/lexicon/lightning';
import type { JobCompletePayload } from '@openfn/engine-multi';
import { timestamp } from '@openfn/logger';

import { STEP_COMPLETE } from '../events';
import { stringify } from '../util';
import { calculateJobExitReason } from '../api/reasons';
import { sendEvent, Context } from '../api/execute';
import { sendEvent, onJobLog, Context } from '../api/execute';
import ensurePayloadSize from '../util/ensure-payload-size';

export default function onStepComplete(
{ channel, state, options }: Context,
export default async function onStepComplete(
context: Context,
event: JobCompletePayload,
// TODO this isn't terribly graceful, but accept an error for crashes
error?: any
) {
const { channel, state, options } = context;
const dataclipId = crypto.randomUUID();

const step_id = state.activeStep as string;
@@ -41,30 +44,42 @@ export default function onStepComplete(
state.inputDataclips[nextJobId] = dataclipId;
});

const { reason, error_message, error_type } = calculateJobExitReason(
job_id,
event.state,
error
);
state.reasons[job_id] = { reason, error_message, error_type };

const evt = {
step_id,
job_id,
output_dataclip_id: dataclipId,

reason,
error_message,
error_type,

mem: event.mem,
duration: event.duration,
thread_id: event.threadId,
} as StepCompletePayload;

if (!options || options.outputDataclips !== false) {
evt.output_dataclip = stringify(outputState);
try {
if (!options || options.outputDataclips !== false) {
const payload = stringify(outputState);
ensurePayloadSize(payload, options?.payloadLimitMb);

// Write the dataclip if it's not too big
evt.output_dataclip = payload;
evt.output_dataclip_id = dataclipId;
}
} catch (e) {
const time = (timestamp() - BigInt(10e6)).toString();
// If the dataclip is too big, return the step without it
// (the workflow will carry on internally)
await onJobLog(context, {
time,
message: [
'Dataclip too large. This dataclip will not be sent back to Lightning.',
],
level: 'info',
name: 'R/T',
});
}

const reason = calculateJobExitReason(job_id, event.state, error);
state.reasons[job_id] = reason;

Object.assign(evt, reason);

return sendEvent<StepCompletePayload>(channel, STEP_COMPLETE, evt);
}
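In short, the handler above now only attaches the output dataclip when it fits within the configured limit; if it is too big, the step:complete event is still sent without the dataclip and an info log is written to the run. A rough, standalone sketch of that decision (types simplified, not the worker's real signatures):

import ensurePayloadSize from '../util/ensure-payload-size';

// Sketch of the attach-or-drop logic in the try/catch above.
// Field names mirror StepCompletePayload; everything else is illustrative.
const attachDataclip = (
  evt: { output_dataclip?: string; output_dataclip_id?: string },
  outputState: unknown,
  dataclipId: string,
  limitMb?: number
) => {
  try {
    const payload = JSON.stringify(outputState);
    ensurePayloadSize(payload, limitMb); // throws PAYLOAD_TOO_LARGE when over the limit
    evt.output_dataclip = payload;
    evt.output_dataclip_id = dataclipId;
  } catch (e) {
    // too large: send the event without the dataclip and log to the run instead
  }
  return evt;
};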
4 changes: 3 additions & 1 deletion packages/ws-worker/src/server.ts
@@ -33,6 +33,8 @@ export type ServerOptions = {
min?: number;
max?: number;
};

payloadLimitMb?: number; // max memory limit for socket payload (ie, step:complete, log)
};

// this is the server/koa API
@@ -164,7 +166,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {

router.get('/', healthcheck);

app.options = options || {};
app.options = options;

// TODO this probably needs to move into ./api/ somewhere
app.execute = async ({ id, token }: ClaimRun) => {
1 change: 1 addition & 0 deletions packages/ws-worker/src/start.ts
@@ -39,6 +39,7 @@ function engineReady(engine: any) {
max: maxBackoff,
},
maxWorkflows: args.capacity,
payloadLimitMb: args.payloadMemory,
};

if (args.lightningPublicKey) {
8 changes: 8 additions & 0 deletions packages/ws-worker/src/util/cli.ts
@@ -15,6 +15,7 @@ type Args = {
backoff: string;
capacity?: number;
runMemory?: number;
payloadMemory?: number;
statePropsToRemove?: string[];
maxRunDurationSeconds: number;
};
@@ -48,6 +49,7 @@ export default function parseArgs(argv: string[]): Args {
WORKER_LIGHTNING_PUBLIC_KEY,
WORKER_LIGHTNING_SERVICE_URL,
WORKER_LOG_LEVEL,
WORKER_MAX_PAYLOAD_MEMORY_MB,
WORKER_MAX_RUN_DURATION_SECONDS,
WORKER_MAX_RUN_MEMORY_MB,
WORKER_PORT,
@@ -114,6 +116,11 @@ export default function parseArgs(argv: string[]): Args {
'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB',
type: 'number',
})
.option('payload-memory', {
description:
'Maximum memory allocated to a single socket payload (ie, step:complete, log), in mb. Env: WORKER_MAX_PAYLOAD_MEMORY_MB',
type: 'number',
})
.option('max-run-duration-seconds', {
alias: 't',
description:
@@ -146,6 +153,7 @@
['configuration', 'response']
),
runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500),
payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MEMORY_MB, 10),
maxRunDurationSeconds: setArg(
args.maxRunDurationSeconds,
WORKER_MAX_RUN_DURATION_SECONDS,
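So the payload limit can be set by the --payload-memory flag or the WORKER_MAX_PAYLOAD_MEMORY_MB environment variable, falling back to 10mb. A hedged sketch of that resolution order (setArg is the worker's own helper; this is an assumption about its precedence, not its actual implementation):

// Assumed precedence: CLI flag, then env var, then the 10mb default from the diff above.
const resolvePayloadLimitMb = (cliValue?: number, env = process.env): number => {
  if (cliValue !== undefined) return cliValue; // --payload-memory
  if (env.WORKER_MAX_PAYLOAD_MEMORY_MB !== undefined) {
    return parseInt(env.WORKER_MAX_PAYLOAD_MEMORY_MB, 10);
  }
  return 10;
};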
7 changes: 7 additions & 0 deletions packages/ws-worker/src/util/convert-lightning-plan.ts
@@ -43,6 +43,7 @@ const mapTriggerEdgeCondition = (edge: Edge) => {
export type WorkerRunOptions = ExecuteOptions & {
// Defaults to true - must be explicitly false to stop dataclips being sent
outputDataclips?: boolean;
payloadLimitMb?: number;
};

export default (
@@ -58,6 +59,12 @@ export default (
if (run.options.run_timeout_ms) {
engineOpts.runTimeoutMs = run.options.run_timeout_ms;
}
if (run.options.payload_memory_limit_mb) {
engineOpts.payloadLimitMb = run.options.payload_memory_limit_mb;
}
if (run.options.run_memory_limit_mb) {
engineOpts.memoryLimitMb = run.options.run_memory_limit_mb;
}
if (run.options.sanitize) {
engineOpts.sanitize = run.options.sanitize;
}
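In effect, this block translates Lightning's snake_case run options into the worker's camelCase engine options. A small self-contained illustration of that mapping (simplified types; the full behaviour is asserted in the test further down):

// Sketch of the option mapping above, with trimmed-down types
type LightningRunOptions = {
  run_timeout_ms?: number;
  run_memory_limit_mb?: number;
  payload_memory_limit_mb?: number;
  sanitize?: string;
};

const mapOptions = (o: LightningRunOptions) => {
  const engineOpts: Record<string, unknown> = {};
  if (o.run_timeout_ms) engineOpts.runTimeoutMs = o.run_timeout_ms;
  if (o.run_memory_limit_mb) engineOpts.memoryLimitMb = o.run_memory_limit_mb;
  if (o.payload_memory_limit_mb) engineOpts.payloadLimitMb = o.payload_memory_limit_mb;
  if (o.sanitize) engineOpts.sanitize = o.sanitize;
  return engineOpts;
};

// mapOptions({ run_timeout_ms: 10, run_memory_limit_mb: 500, payload_memory_limit_mb: 20 })
// => { runTimeoutMs: 10, memoryLimitMb: 500, payloadLimitMb: 20 }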
16 changes: 16 additions & 0 deletions packages/ws-worker/src/util/ensure-payload-size.ts
@@ -0,0 +1,16 @@
export default (payload: string, limit_mb?: number) => {
// @ts-ignore
if (!isNaN(limit_mb)) {
const limit = limit_mb as number;
const size_bytes = Buffer.byteLength(payload, 'utf8');
const size_mb = size_bytes / 1024 / 1024;
if (size_mb > limit) {
const e = new Error();
// @ts-ignore
e.severity = 'kill';
e.name = 'PAYLOAD_TOO_LARGE';
e.message = `The payload exceeded the size limit of ${limit}mb`;
throw e;
}
}
};
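Usage is simple: pass the serialized payload and an optional limit in mb. With no limit the check is a no-op; over the limit it throws a kill-severity PAYLOAD_TOO_LARGE error. For example (a small sketch mirroring the tests below):

import ensurePayloadSize from './ensure-payload-size';

const payload = JSON.stringify({ data: 'some step output' });

ensurePayloadSize(payload); // no limit set: never throws
ensurePayloadSize(payload, 10); // fine: well under 10mb

try {
  ensurePayloadSize(new Array(1024 * 1024 + 1).fill('z').join(''), 1);
} catch (e: any) {
  console.log(e.name); // 'PAYLOAD_TOO_LARGE'
  console.log(e.severity); // 'kill'
}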
69 changes: 68 additions & 1 deletion packages/ws-worker/test/events/step-complete.test.ts
@@ -4,7 +4,7 @@ import type { StepCompletePayload } from '@openfn/lexicon/lightning';
import handleStepComplete from '../../src/events/step-complete';
import { mockChannel } from '../../src/mock/sockets';
import { createRunState } from '../../src/util';
import { STEP_COMPLETE } from '../../src/events';
import { RUN_LOG, STEP_COMPLETE } from '../../src/events';
import { createPlan } from '../util';
import { JobCompletePayload } from '@openfn/engine-multi';

@@ -186,3 +186,70 @@ test('do not include dataclips in step:complete if output_dataclip is false', as
} as JobCompletePayload;
await handleStepComplete({ channel, state, options } as any, event);
});

test('do not include dataclips in step:complete if output_dataclip is too big', async (t) => {
const plan = createPlan();
const jobId = 'job-1';
const result = { data: new Array(1024 * 1024 + 1).fill('z').join('') };

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

const options = {
payloadLimitMb: 1,
};

const channel = mockChannel({
[RUN_LOG]: () => true,
[STEP_COMPLETE]: (evt: StepCompletePayload) => {
t.falsy(evt.output_dataclip_id);
t.falsy(evt.output_dataclip);
},
});

const event = {
jobId,
workflowId: plan.id,
state: result,
next: ['a'],
mem: { job: 1, system: 10 },
duration: 61,
thread_id: 'abc',
} as JobCompletePayload;

await handleStepComplete({ channel, state, options } as any, event);
});

test('log when the output_dataclip is too big', async (t) => {
const plan = createPlan();
const jobId = 'job-1';
const result = { data: new Array(1024 * 1024 + 1).fill('z').join('') };

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

const options = {
payloadLimitMb: 1,
};

const channel = mockChannel({
[RUN_LOG]: (e) => {
t.regex(e.message[0], /dataclip too large/i);
},
[STEP_COMPLETE]: () => true,
});

const event = {
jobId,
workflowId: plan.id,
state: result,
next: ['a'],
mem: { job: 1, system: 10 },
duration: 61,
thread_id: 'abc',
} as JobCompletePayload;

await handleStepComplete({ channel, state, options } as any, event);
});
4 changes: 4 additions & 0 deletions packages/ws-worker/test/util/convert-lightning-plan.test.ts
@@ -89,6 +89,8 @@ test('convert a single job with options', (t) => {
options: {
sanitize: 'obfuscate',
run_timeout_ms: 10,
run_memory_limit_mb: 500,
payload_memory_limit_mb: 20,
},
};
const { plan, options } = convertPlan(run as LightningPlan);
@@ -102,6 +104,8 @@
});
t.deepEqual(options, {
runTimeoutMs: 10,
memoryLimitMb: 500,
payloadLimitMb: 20,
sanitize: 'obfuscate',
});
});
52 changes: 52 additions & 0 deletions packages/ws-worker/test/util/ensure-payload-size.test.ts
@@ -0,0 +1,52 @@
import test from 'ava';
import ensurePayloadSize from '../../src/util/ensure-payload-size';

const mb = (bytes: number) => bytes / 1024 / 1024;

test('throw limit 0, payload 1 byte', (t) => {
t.throws(() => ensurePayloadSize('x', 0), {
name: 'PAYLOAD_TOO_LARGE',
});
});

test('ok for limit 1byte, payload 1 byte', (t) => {
t.notThrows(() => ensurePayloadSize('x', mb(1)));
});

test('throw for limit 1byte, payload 2 bytes', (t) => {
t.throws(() => ensurePayloadSize('xy', mb(1)), {
name: 'PAYLOAD_TOO_LARGE',
});
});

test('ok for short string, limit 1mb', (t) => {
t.notThrows(() => ensurePayloadSize('hello world', 1));
});

test('ok for 1mb string, limit 1mb', (t) => {
const str = new Array(1024 * 1024).fill('z').join('');
t.notThrows(() => ensurePayloadSize(str, 1));
});

test('throw for 1mb string + 1 byte, limit 1mb', (t) => {
const str = new Array(1024 * 1024 + 1).fill('z').join('');
t.throws(() => ensurePayloadSize(str, 1), {
name: 'PAYLOAD_TOO_LARGE',
});
});

test('ok if no limit', (t) => {
const str = new Array(1024 * 1024 + 1).fill('z').join('');
t.notThrows(() => ensurePayloadSize(str));
});

test('error shape', (t) => {
try {
const str = new Array(1024 * 1024 + 1).fill('z').join('');
ensurePayloadSize(str, 1);
} catch (e: any) {
t.is(e.severity, 'kill');
t.is(e.name, 'PAYLOAD_TOO_LARGE');
t.is(e.message, 'The payload exceeded the size limit of 1mb');
}
});
