Worker: limit payload size #740

Merged · 15 commits · Jul 31, 2024
Changes from 10 commits
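
In summary: the worker now accepts a payload limit, set worker-wide via the `--payload-memory` CLI flag or the `WORKER_MAX_PAYLOAD_MEMORY_MB` env var (default 10mb) and overridable per run with the `payload_memory_limit_mb` run option. Dataclips over the limit are withheld from `step:complete` and flagged `DATACLIP_TOO_LARGE`; log messages over the limit are redacted.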
5 changes: 5 additions & 0 deletions .changeset/sweet-bats-know.md
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': minor
---

Allow a payload limit to be set for large dataclips and logs
122 changes: 122 additions & 0 deletions integration-tests/worker/test/integration.test.ts
@@ -787,5 +787,127 @@ test.serial('set a timeout on a run', (t) => {
});
});

test.serial('set a default payload limit on the worker', (t) => {
return new Promise(async (done) => {
if (!worker.destroyed) {
await worker.destroy();
}

({ worker } = await initWorker(
lightningPort,
{
maxWorkers: 1,
// use the dummy repo to remove autoinstall
repoDir: path.resolve('./dummy-repo'),
},
{
payloadLimitMb: 0,
}
));

const run = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]',
body: `fn((s) => ({ data: 'aaaa' }))`,
},
],
};

lightning.once('step:complete', (evt) => {
const { reason, output_dataclip_id, output_dataclip } = evt.payload;
t.is(reason, 'success');
t.falsy(output_dataclip_id);
t.falsy(output_dataclip);

done();
});

lightning.enqueueRun(run);
});
});

test.serial('override the worker payload limit through run options', (t) => {
return new Promise(async (done) => {
if (!worker.destroyed) {
await worker.destroy();
}

({ worker } = await initWorker(
lightningPort,
{
maxWorkers: 1,
// use the dummy repo to remove autoinstall
repoDir: path.resolve('./dummy-repo'),
},
{ payloadLimitMb: 0 }
));

const run = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]',
body: `fn((s) => ({ data: 'aaaa' }))`,
},
],
options: {
payload_memory_limit_mb: 100,
},
};

lightning.once('step:complete', (evt) => {
const { reason, output_dataclip_id, output_dataclip } = evt.payload;
t.is(reason, 'success');
t.truthy(output_dataclip_id);
t.deepEqual(output_dataclip, JSON.stringify({ data: 'aaaa' }));

done();
});

lightning.enqueueRun(run);
});
});

test.serial('Redact logs which exceed the payload limit', (t) => {
return new Promise(async (done) => {
if (!worker.destroyed) {
await worker.destroy();
}

({ worker } = await initWorker(lightningPort, {
maxWorkers: 1,
// use the dummy repo to remove autoinstall
repoDir: path.resolve('./dummy-repo'),
}));

const run = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]',
body: `fn((s) => { console.log('a'); return s;})`,
},
],
options: {
payload_memory_limit_mb: 0,
},
};

lightning.on('run:log', (evt) => {
if (evt.payload.source === 'JOB') {
t.regex(evt.payload.message[0], /redacted/i);
}
});

lightning.enqueueRun(run);

lightning.once('run:complete', () => {
done();
});
});
});

// REMEMBER the default worker was destroyed at this point!
// If you want to use a worker, you'll have to create your own
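
Note that the tests above pin the limit to 0mb, so any non-empty payload exceeds it: that is what lets them assert a missing dataclip or a redacted log message without having to generate megabytes of output.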
5 changes: 3 additions & 2 deletions packages/lexicon/lightning.d.ts
@@ -46,8 +46,8 @@ export type LightningPlanOptions = {
start?: StepId;
output_dataclips?: boolean;

-  // future options
-  run_memory_limit?: number
+  run_memory_limit_mb?: number;
+  payload_memory_limit_mb?: number;
};

/**
@@ -178,6 +178,7 @@ export type StepCompletePayload = ExitReason & {
step_id: string;
output_dataclip?: string;
output_dataclip_id?: string;
output_dataclip_error?: 'DATACLIP_TOO_LARGE';
thread_id?: string;
mem: {
job: number;
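
For illustration, a plan options fragment using the renamed and new fields might look like this (values are hypothetical; 500 and 10 happen to be the worker-side defaults set in cli.ts below):

import type { LightningPlanOptions } from '@openfn/lexicon/lightning';

// Illustrative only: cap run memory at 500mb and emitted payloads at 10mb
const options: Partial<LightningPlanOptions> = {
  run_memory_limit_mb: 500,
  payload_memory_limit_mb: 10,
};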
32 changes: 23 additions & 9 deletions packages/ws-worker/src/api/execute.ts
@@ -7,6 +7,7 @@ import {
getWithReply,
createRunState,
throttle as createThrottle,
stringify,
} from '../util';
import {
RUN_COMPLETE,
@@ -25,6 +26,7 @@ import handleRunError from '../events/run-error';

import type { Channel, RunState, JSONLog } from '../types';
import { WorkerRunOptions } from '../util/convert-lightning-plan';
import ensurePayloadSize from '../util/ensure-payload-size';

const enc = new TextDecoder('utf-8');

@@ -210,20 +212,32 @@ export function onJobError(context: Context, event: any) {
}
}

-export function onJobLog({ channel, state }: Context, event: JSONLog) {
+export function onJobLog({ channel, state, options }: Context, event: JSONLog) {
  const timeInMicroseconds = BigInt(event.time) / BigInt(1e3);

-  // lightning-friendly log object
-  const log: RunLogPayload = {
-    run_id: state.plan.id!,
+  let message = event.message;
+  try {
    // The message body, the actual thing that is logged,
-    // may be always encoded into a string
+    // may be encoded into a string
    // Parse it here before sending on to lightning
    // TODO this needs optimising!
-    message:
-      typeof event.message === 'string'
-        ? JSON.parse(event.message)
-        : event.message,
+    if (typeof event.message === 'string') {
+      ensurePayloadSize(event.message, options?.payloadLimitMb);
+      message = JSON.parse(message);
+    } else if (event.message) {
+      const payload = stringify(event.message);
+      ensurePayloadSize(payload, options?.payloadLimitMb);
+    }
+  } catch (e) {
+    message = [
+      `(Log message redacted: exceeds ${options.payloadLimitMb}mb memory limit)`,
+    ];
+  }
+
+  // lightning-friendly log object
+  const log: RunLogPayload = {
+    run_id: state.plan.id!,
+    message: message,
    source: event.name,
    level: event.level,
    timestamp: timeInMicroseconds.toString(),
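
Both onJobLog above and onStepComplete below lean on the new ensurePayloadSize helper, whose implementation is not part of this diff. A minimal sketch of the contract the callers assume — throw if the serialized payload exceeds the limit, otherwise do nothing (so a limit of 0 rejects every non-empty payload, as the integration tests exploit):

// Hypothetical sketch of ../util/ensure-payload-size (not shown in this PR)
export default (payload: string, limitMb?: number) => {
  if (typeof limitMb === 'number') {
    const limitBytes = limitMb * 1024 * 1024;
    if (Buffer.byteLength(payload, 'utf8') > limitBytes) {
      // Callers catch this and redact the log line or withhold the dataclip
      throw new Error(`payload exceeds ${limitMb}mb limit`);
    }
  }
};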
11 changes: 10 additions & 1 deletion packages/ws-worker/src/events/run-start.ts
@@ -12,7 +12,7 @@ export default async function onRunStart(
context: Context,
event: WorkflowStartPayload
) {
-  const { channel, state } = context;
+  const { channel, state, options = {} } = context;
// Cheat on the timestamp time to make sure this is the first thing in the log
const time = (timestamp() - BigInt(10e6)).toString();

@@ -36,6 +36,15 @@

await sendEvent<RunStartPayload>(channel, RUN_START, { versions });

if ('payloadLimitMb' in options) {
await onJobLog(versionLogContext, {
time,
message: [`Payload limit: ${options.payloadLimitMb}mb`],
level: 'info',
name: 'RTE',
});
}

const versionMessage = calculateVersionString(versions);

await onJobLog(versionLogContext, {
51 changes: 34 additions & 17 deletions packages/ws-worker/src/events/step-complete.ts
@@ -1,18 +1,21 @@
import crypto from 'node:crypto';
import type { StepCompletePayload } from '@openfn/lexicon/lightning';
import type { JobCompletePayload } from '@openfn/engine-multi';
import { timestamp } from '@openfn/logger';

import { STEP_COMPLETE } from '../events';
import { stringify } from '../util';
import { calculateJobExitReason } from '../api/reasons';
-import { sendEvent, Context } from '../api/execute';
+import { sendEvent, onJobLog, Context } from '../api/execute';
+import ensurePayloadSize from '../util/ensure-payload-size';

-export default function onStepComplete(
-  { channel, state, options }: Context,
+export default async function onStepComplete(
+  context: Context,
  event: JobCompletePayload,
  // TODO this isn't terribly graceful, but accept an error for crashes
  error?: any
) {
+  const { channel, state, options } = context;
const dataclipId = crypto.randomUUID();

const step_id = state.activeStep as string;
@@ -41,30 +44,44 @@ export default async function onStepComplete(
state.inputDataclips[nextJobId] = dataclipId;
});

-  const { reason, error_message, error_type } = calculateJobExitReason(
-    job_id,
-    event.state,
-    error
-  );
-  state.reasons[job_id] = { reason, error_message, error_type };
-
  const evt = {
    step_id,
    job_id,
-    output_dataclip_id: dataclipId,
-
-    reason,
-    error_message,
-    error_type,
-
    mem: event.mem,
    duration: event.duration,
    thread_id: event.threadId,
  } as StepCompletePayload;

-  if (!options || options.outputDataclips !== false) {
-    evt.output_dataclip = stringify(outputState);
-  }
+  try {
+    if (!options || options.outputDataclips !== false) {
+      const payload = stringify(outputState);
+      ensurePayloadSize(payload, options?.payloadLimitMb);
+
+      // Write the dataclip if it's not too big
+      evt.output_dataclip = payload;
+    }
+    evt.output_dataclip_id = dataclipId;
+  } catch (e) {
+    evt.output_dataclip_error = 'DATACLIP_TOO_LARGE';
+
+    const time = (timestamp() - BigInt(10e6)).toString();
+    // If the dataclip is too big, return the step without it
+    // (the workflow will carry on internally)
+    await onJobLog(context, {
+      time,
+      message: [
+        'Dataclip too large. This dataclip will not be sent back to Lightning.',
+      ],
+      level: 'info',
+      name: 'R/T',
+    });
+  }
+
+  const reason = calculateJobExitReason(job_id, event.state, error);
+  state.reasons[job_id] = reason;
+
+  Object.assign(evt, reason);

  return sendEvent<StepCompletePayload>(channel, STEP_COMPLETE, evt);
}
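
To make the two outcomes concrete, here is roughly what the step:complete payload looks like in each case (field values are illustrative, not taken from this diff):

import type { StepCompletePayload } from '@openfn/lexicon/lightning';

// Within the limit: the dataclip ships alongside its id
const withinLimit: Partial<StepCompletePayload> = {
  step_id: 'step-1',
  output_dataclip_id: 'abc123',
  output_dataclip: '{"data":{}}',
};

// Over the limit: no dataclip and no dataclip id, just the error marker;
// downstream steps still receive the state internally
const overLimit: Partial<StepCompletePayload> = {
  step_id: 'step-1',
  output_dataclip_error: 'DATACLIP_TOO_LARGE',
};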
11 changes: 9 additions & 2 deletions packages/ws-worker/src/server.ts
Expand Up @@ -33,6 +33,8 @@ export type ServerOptions = {
min?: number;
max?: number;
};

payloadLimitMb?: number; // max size of a socket payload (i.e. step:complete, log), in mb
};

// this is the server/koa API
@@ -164,7 +166,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {

router.get('/', healthcheck);

-  app.options = options || {};
+  app.options = options;

// TODO this probably needs to move into ./api/ somewhere
app.execute = async ({ id, token }: ClaimRun) => {
@@ -174,10 +176,15 @@
const {
channel: runChannel,
plan,
-    options,
+    options = {},
input,
} = await joinRunChannel(app.socket, token, id, logger);

// Default the payload limit if it's not otherwise set on the run options
if (!('payloadLimitMb' in options)) {
options.payloadLimitMb = app.options.payloadLimitMb;
}

// Callback to be triggered when the work is done (including errors)
const onFinish = () => {
logger.debug(`workflow ${id} complete: releasing worker`);
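
Note the precedence established here: a payloadLimitMb already present on the run's options always wins (presumably mapped from the run's payload_memory_limit_mb in convert-lightning-plan, outside this diff), and the worker-wide value from ServerOptions only fills the gap when the run does not specify one.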
1 change: 1 addition & 0 deletions packages/ws-worker/src/start.ts
@@ -39,6 +39,7 @@ function engineReady(engine: any) {
max: maxBackoff,
},
maxWorkflows: args.capacity,
payloadLimitMb: args.payloadMemory,
};

if (args.lightningPublicKey) {
8 changes: 8 additions & 0 deletions packages/ws-worker/src/util/cli.ts
@@ -15,6 +15,7 @@ type Args = {
backoff: string;
capacity?: number;
runMemory?: number;
payloadMemory?: number;
statePropsToRemove?: string[];
maxRunDurationSeconds: number;
};
@@ -48,6 +49,7 @@ export default function parseArgs(argv: string[]): Args {
WORKER_LIGHTNING_PUBLIC_KEY,
WORKER_LIGHTNING_SERVICE_URL,
WORKER_LOG_LEVEL,
WORKER_MAX_PAYLOAD_MEMORY_MB,
WORKER_MAX_RUN_DURATION_SECONDS,
WORKER_MAX_RUN_MEMORY_MB,
WORKER_PORT,
@@ -114,6 +116,11 @@ export default function parseArgs(argv: string[]): Args {
'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB',
type: 'number',
})
.option('payload-memory', {
description:
'Maximum size of a single payload (dataclip or log message), in mb. Env: WORKER_MAX_PAYLOAD_MEMORY_MB',
type: 'number',
})
.option('max-run-duration-seconds', {
alias: 't',
description:
@@ -146,6 +153,7 @@ export default function parseArgs(argv: string[]): Args {
['configuration', 'response']
),
runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500),
payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MEMORY_MB, 10),
maxRunDurationSeconds: setArg(
args.maxRunDurationSeconds,
WORKER_MAX_RUN_DURATION_SECONDS,
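
A quick sketch of how the new flag resolves (hypothetical invocation, assuming the entrypoint passes process.argv through to parseArgs):

import parseArgs from './util/cli';

// e.g. node start.js --payload-memory 20
const args = parseArgs(process.argv);

// setArg resolution order: the CLI flag first, then the
// WORKER_MAX_PAYLOAD_MEMORY_MB env var, then the 10mb default
console.log(args.payloadMemory);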