diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index ef7e3d44e..654af37c9 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/integration-tests-worker +## 1.0.53 + +### Patch Changes + +- Updated dependencies [f363254] + - @openfn/ws-worker@1.5.0 + ## 1.0.52 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index e0ab1c019..d895d6be5 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.52", + "version": "1.0.53", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/test/integration.test.ts b/integration-tests/worker/test/integration.test.ts index 18b487845..4a5e0b2b5 100644 --- a/integration-tests/worker/test/integration.test.ts +++ b/integration-tests/worker/test/integration.test.ts @@ -787,5 +787,127 @@ test.serial('set a timeout on a run', (t) => { }); }); +test.serial('set a default payload limit on the worker', (t) => { + return new Promise(async (done) => { + if (!worker.destroyed) { + await worker.destroy(); + } + + ({ worker } = await initWorker( + lightningPort, + { + maxWorkers: 1, + // use the dummy repo to remove autoinstall + repoDir: path.resolve('./dummy-repo'), + }, + { + payloadLimitMb: 0, + } + )); + + const run = { + id: crypto.randomUUID(), + jobs: [ + { + adaptor: '@openfn/test-adaptor@1.0.0', + body: `fn((s) => ({ data: 'aaaa' }))`, + }, + ], + }; + + lightning.once('step:complete', (evt) => { + const { reason, output_dataclip_id, output_dataclip } = evt.payload; + t.is(reason, 'success'); + t.falsy(output_dataclip_id); + t.falsy(output_dataclip); + + done(); + }); + + lightning.enqueueRun(run); + }); +}); + +test.serial('override the worker payload through run options', (t) => { + return new Promise(async (done) => { + if (!worker.destroyed) { + await worker.destroy(); + } + + ({ worker } = await initWorker( + lightningPort, + { + maxWorkers: 1, + // use the dummy repo to remove autoinstall + repoDir: path.resolve('./dummy-repo'), + }, + { payloadLimitMb: 0 } + )); + + const run = { + id: crypto.randomUUID(), + jobs: [ + { + adaptor: '@openfn/test-adaptor@1.0.0', + body: `fn((s) => ({ data: 'aaaa' }))`, + }, + ], + options: { + payload_limit_mb: 100, + }, + }; + + lightning.once('step:complete', (evt) => { + const { reason, output_dataclip_id, output_dataclip } = evt.payload; + t.is(reason, 'success'); + t.truthy(output_dataclip_id); + t.deepEqual(output_dataclip, JSON.stringify({ data: 'aaaa' })); + + done(); + }); + + lightning.enqueueRun(run); + }); +}); + +test.serial('Redact logs which exceed the payload limit', (t) => { + return new Promise(async (done) => { + if (!worker.destroyed) { + await worker.destroy(); + } + + ({ worker } = await initWorker(lightningPort, { + maxWorkers: 1, + // use the dummy repo to remove autoinstall + repoDir: path.resolve('./dummy-repo'), + })); + + const run = { + id: crypto.randomUUID(), + jobs: [ + { + adaptor: '@openfn/test-adaptor@1.0.0', + body: `fn((s) => { console.log('a'); return s;})`, + }, + ], + options: { + payload_limit_mb: 0, + }, + }; + + lightning.on('run:log', (evt) => { + if (evt.payload.source === 'JOB') { + t.regex(evt.payload.message[0], /redacted/i); + } + }); + + lightning.enqueueRun(run); + + lightning.once('run:complete', () => { + done(); + }); + }); +}); + // REMEMBER the default worker was destroyed at this point! // If you want to use a worker, you'll have to create your own diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index c6e3aa13a..744ec732a 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -46,8 +46,8 @@ export type LightningPlanOptions = { start?: StepId; output_dataclips?: boolean; - // future options - run_memory_limit?: number + run_memory_limit_mb?: number; + payload_limit_mb?: number; }; /** @@ -178,6 +178,7 @@ export type StepCompletePayload = ExitReason & { step_id: string; output_dataclip?: string; output_dataclip_id?: string; + output_dataclip_error?: 'DATACLIP_TOO_LARGE'; thread_id?: string; mem: { job: number; diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 117e0a1ad..9366c3ff0 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,11 @@ # ws-worker +## 1.5.0 + +### Minor Changes + +- f363254: Allow a payload limit to be set for large dataclips and logs (payload_limit_mb) + ## 1.4.1 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 45bcd17f3..06f78dcd5 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.4.1", + "version": "1.5.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index 6cac55c12..1dd3a6538 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -7,6 +7,7 @@ import { getWithReply, createRunState, throttle as createThrottle, + stringify, } from '../util'; import { RUN_COMPLETE, @@ -25,6 +26,7 @@ import handleRunError from '../events/run-error'; import type { Channel, RunState, JSONLog } from '../types'; import { WorkerRunOptions } from '../util/convert-lightning-plan'; +import ensurePayloadSize from '../util/ensure-payload-size'; const enc = new TextDecoder('utf-8'); @@ -210,20 +212,32 @@ export function onJobError(context: Context, event: any) { } } -export function onJobLog({ channel, state }: Context, event: JSONLog) { +export function onJobLog({ channel, state, options }: Context, event: JSONLog) { const timeInMicroseconds = BigInt(event.time) / BigInt(1e3); - // lightning-friendly log object - const log: RunLogPayload = { - run_id: state.plan.id!, + let message = event.message; + try { // The message body, the actual thing that is logged, - // may be always encoded into a string + // may be encoded into a string // Parse it here before sending on to lightning // TODO this needs optimising! - message: - typeof event.message === 'string' - ? JSON.parse(event.message) - : event.message, + if (typeof event.message === 'string') { + ensurePayloadSize(event.message, options?.payloadLimitMb); + message = JSON.parse(message); + } else if (event.message) { + const payload = stringify(event.message); + ensurePayloadSize(payload, options?.payloadLimitMb); + } + } catch (e) { + message = [ + `(Log message redacted: exceeds ${options.payloadLimitMb}mb memory limit)`, + ]; + } + + // lightning-friendly log object + const log: RunLogPayload = { + run_id: state.plan.id!, + message: message, source: event.name, level: event.level, timestamp: timeInMicroseconds.toString(), diff --git a/packages/ws-worker/src/events/run-start.ts b/packages/ws-worker/src/events/run-start.ts index 8845f5446..e57a1969a 100644 --- a/packages/ws-worker/src/events/run-start.ts +++ b/packages/ws-worker/src/events/run-start.ts @@ -12,7 +12,7 @@ export default async function onRunStart( context: Context, event: WorkflowStartPayload ) { - const { channel, state } = context; + const { channel, state, options = {} } = context; // Cheat on the timestamp time to make sure this is the first thing in the log const time = (timestamp() - BigInt(10e6)).toString(); @@ -36,6 +36,15 @@ export default async function onRunStart( await sendEvent(channel, RUN_START, { versions }); + if ('payloadLimitMb' in options) { + await onJobLog(versionLogContext, { + time, + message: [`Payload limit: ${options.payloadLimitMb}mb`], + level: 'info', + name: 'RTE', + }); + } + const versionMessage = calculateVersionString(versions); await onJobLog(versionLogContext, { diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index a542a5944..988e3f3d4 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -1,18 +1,21 @@ import crypto from 'node:crypto'; import type { StepCompletePayload } from '@openfn/lexicon/lightning'; import type { JobCompletePayload } from '@openfn/engine-multi'; +import { timestamp } from '@openfn/logger'; import { STEP_COMPLETE } from '../events'; import { stringify } from '../util'; import { calculateJobExitReason } from '../api/reasons'; -import { sendEvent, Context } from '../api/execute'; +import { sendEvent, onJobLog, Context } from '../api/execute'; +import ensurePayloadSize from '../util/ensure-payload-size'; -export default function onStepComplete( - { channel, state, options }: Context, +export default async function onStepComplete( + context: Context, event: JobCompletePayload, // TODO this isn't terribly graceful, but accept an error for crashes error?: any ) { + const { channel, state, options } = context; const dataclipId = crypto.randomUUID(); const step_id = state.activeStep as string; @@ -41,30 +44,44 @@ export default function onStepComplete( state.inputDataclips[nextJobId] = dataclipId; }); - const { reason, error_message, error_type } = calculateJobExitReason( - job_id, - event.state, - error - ); - state.reasons[job_id] = { reason, error_message, error_type }; - const evt = { step_id, job_id, - output_dataclip_id: dataclipId, - - reason, - error_message, - error_type, mem: event.mem, duration: event.duration, thread_id: event.threadId, } as StepCompletePayload; - if (!options || options.outputDataclips !== false) { - evt.output_dataclip = stringify(outputState); + try { + if (!options || options.outputDataclips !== false) { + const payload = stringify(outputState); + ensurePayloadSize(payload, options?.payloadLimitMb); + + // Write the dataclip if it's not too big + evt.output_dataclip = payload; + } + evt.output_dataclip_id = dataclipId; + } catch (e) { + evt.output_dataclip_error = 'DATACLIP_TOO_LARGE'; + + const time = (timestamp() - BigInt(10e6)).toString(); + // If the dataclip is too big, return the step without it + // (the workflow will carry on internally) + await onJobLog(context, { + time, + message: [ + 'Dataclip too large. This dataclip will not be sent back to lighting.', + ], + level: 'info', + name: 'R/T', + }); } + const reason = calculateJobExitReason(job_id, event.state, error); + state.reasons[job_id] = reason; + + Object.assign(evt, reason); + return sendEvent(channel, STEP_COMPLETE, evt); } diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index c4c0381e8..da879a779 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -33,6 +33,8 @@ export type ServerOptions = { min?: number; max?: number; }; + + payloadLimitMb?: number; // max memory limit for socket payload (ie, step:complete, log) }; // this is the server/koa API @@ -164,7 +166,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { router.get('/', healthcheck); - app.options = options || {}; + app.options = options; // TODO this probably needs to move into ./api/ somewhere app.execute = async ({ id, token }: ClaimRun) => { @@ -174,10 +176,15 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { const { channel: runChannel, plan, - options, + options = {}, input, } = await joinRunChannel(app.socket, token, id, logger); + // Default the payload limit if it's not otherwise set on the run options + if (!('payloadLimitMb' in options)) { + options.payloadLimitMb = app.options.payloadLimitMb; + } + // Callback to be triggered when the work is done (including errors) const onFinish = () => { logger.debug(`workflow ${id} complete: releasing worker`); diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index 723fd0828..c1c3f987b 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -39,6 +39,7 @@ function engineReady(engine: any) { max: maxBackoff, }, maxWorkflows: args.capacity, + payloadLimitMb: args.payloadMemory, }; if (args.lightningPublicKey) { diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index 2143334ff..1a634d961 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -15,6 +15,7 @@ type Args = { backoff: string; capacity?: number; runMemory?: number; + payloadMemory?: number; statePropsToRemove?: string[]; maxRunDurationSeconds: number; }; @@ -48,6 +49,7 @@ export default function parseArgs(argv: string[]): Args { WORKER_LIGHTNING_PUBLIC_KEY, WORKER_LIGHTNING_SERVICE_URL, WORKER_LOG_LEVEL, + WORKER_MAX_PAYLOAD_MB, WORKER_MAX_RUN_DURATION_SECONDS, WORKER_MAX_RUN_MEMORY_MB, WORKER_PORT, @@ -114,6 +116,11 @@ export default function parseArgs(argv: string[]): Args { 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB', type: 'number', }) + .option('payload-memory', { + description: + 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_PAYLOAD_MB', + type: 'number', + }) .option('max-run-duration-seconds', { alias: 't', description: @@ -146,6 +153,7 @@ export default function parseArgs(argv: string[]): Args { ['configuration', 'response'] ), runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500), + payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MB, 10), maxRunDurationSeconds: setArg( args.maxRunDurationSeconds, WORKER_MAX_RUN_DURATION_SECONDS, diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index 4f194eee6..df8d4b96c 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -43,6 +43,7 @@ const mapTriggerEdgeCondition = (edge: Edge) => { export type WorkerRunOptions = ExecuteOptions & { // Defaults to true - must be explicity false to stop dataclips being sent outputDataclips?: boolean; + payloadLimitMb?: number; }; export default ( @@ -53,15 +54,20 @@ export default ( // But some need to get passed down into the engine's options const engineOpts: WorkerRunOptions = {}; - if (run.options) { - if (run.options.run_timeout_ms) { + if ('run_timeout_ms' in run.options) { engineOpts.runTimeoutMs = run.options.run_timeout_ms; } - if (run.options.sanitize) { + if ('payload_limit_mb' in run.options) { + engineOpts.payloadLimitMb = run.options.payload_limit_mb; + } + if ('run_memory_limit_mb' in run.options) { + engineOpts.memoryLimitMb = run.options.run_memory_limit_mb; + } + if ('sanitize' in run.options) { engineOpts.sanitize = run.options.sanitize; } - if (run.options.hasOwnProperty('output_dataclips')) { + if ('output_dataclips' in run.options) { engineOpts.outputDataclips = run.options.output_dataclips; } } diff --git a/packages/ws-worker/src/util/ensure-payload-size.ts b/packages/ws-worker/src/util/ensure-payload-size.ts new file mode 100644 index 000000000..b5cf6d194 --- /dev/null +++ b/packages/ws-worker/src/util/ensure-payload-size.ts @@ -0,0 +1,16 @@ +export default (payload: string, limit_mb?: number) => { + // @ts-ignore + if (!isNaN(limit_mb)) { + const limit = limit_mb as number; + const size_bytes = Buffer.byteLength(payload, 'utf8'); + const size_mb = size_bytes / 1024 / 1024; + if (size_mb > limit) { + const e = new Error(); + // @ts-ignore + e.severity = 'kill'; + e.name = 'PAYLOAD_TOO_LARGE'; + e.message = `The payload exceeded the size limit of ${limit}mb`; + throw e; + } + } +}; diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index 818b2c08f..5dc77c72f 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -63,7 +63,7 @@ test('send event should throw if an event errors', async (t) => { }); }); -test('jobLog should should send a log event outside a run', async (t) => { +test('jobLog should send a log event outside a run', async (t) => { const plan = { id: 'run-1' }; const log: JSONLog = { @@ -100,7 +100,37 @@ test('jobLog should should send a log event outside a run', async (t) => { await onJobLog({ channel, state } as any, log); }); -test('jobLog should should send a log event inside a run', async (t) => { +test('jobLog should redact log messages which are too large', async (t) => { + const plan = { id: 'run-1' }; + const jobId = 'job-1'; + + const log: JSONLog = { + name: 'R/T', + level: 'info', + time: getBigIntTimestamp(), + message: JSON.stringify(new Array(1024 * 1024 + 1).fill('z').join('')), + }; + + const state = { + plan, + activeJob: jobId, + activeStep: 'b', + } as RunState; + + const channel = mockChannel({ + [RUN_LOG]: (evt) => { + t.regex(evt.message[0], /redacted/i); + }, + }); + + const options = { + payloadLimitMb: 1, + }; + + await onJobLog({ channel, state, options } as any, log); +}); + +test('jobLog should send a log event inside a run', async (t) => { const plan = { id: 'run-1' }; const jobId = 'job-1'; diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index 4972eb0f7..979eab449 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -4,7 +4,7 @@ import type { StepCompletePayload } from '@openfn/lexicon/lightning'; import handleStepComplete from '../../src/events/step-complete'; import { mockChannel } from '../../src/mock/sockets'; import { createRunState } from '../../src/util'; -import { STEP_COMPLETE } from '../../src/events'; +import { RUN_LOG, STEP_COMPLETE } from '../../src/events'; import { createPlan } from '../util'; import { JobCompletePayload } from '@openfn/engine-multi'; @@ -186,3 +186,71 @@ test('do not include dataclips in step:complete if output_dataclip is false', as } as JobCompletePayload; await handleStepComplete({ channel, state, options } as any, event); }); + +test('do not include dataclips in step:complete if output_dataclip is too big', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + const result = { data: new Array(1024 * 1024 + 1).fill('z').join('') }; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + const options = { + payloadLimitMb: 1, + }; + + const channel = mockChannel({ + [RUN_LOG]: () => true, + [STEP_COMPLETE]: (evt: StepCompletePayload) => { + t.falsy(evt.output_dataclip_id); + t.falsy(evt.output_dataclip); + t.is(evt.output_dataclip_error, 'DATACLIP_TOO_LARGE'); + }, + }); + + const event = { + jobId, + workflowId: plan.id, + state: result, + next: ['a'], + mem: { job: 1, system: 10 }, + duration: 61, + thread_id: 'abc', + } as JobCompletePayload; + + await handleStepComplete({ channel, state, options } as any, event); +}); + +test('log when the output_dataclip is too big', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + const result = { data: new Array(1024 * 1024 + 1).fill('z').join('') }; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + const options = { + payloadLimitMb: 1, + }; + + const channel = mockChannel({ + [RUN_LOG]: (e) => { + t.regex(e.message[0], /dataclip too large/i); + }, + [STEP_COMPLETE]: () => true, + }); + + const event = { + jobId, + workflowId: plan.id, + state: result, + next: ['a'], + mem: { job: 1, system: 10 }, + duration: 61, + thread_id: 'abc', + } as JobCompletePayload; + + await handleStepComplete({ channel, state, options } as any, event); +}); diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index 6624e1bf1..0d3ef653a 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -89,6 +89,8 @@ test('convert a single job with options', (t) => { options: { sanitize: 'obfuscate', run_timeout_ms: 10, + run_memory_limit_mb: 500, + payload_limit_mb: 20, }, }; const { plan, options } = convertPlan(run as LightningPlan); @@ -102,6 +104,8 @@ test('convert a single job with options', (t) => { }); t.deepEqual(options, { runTimeoutMs: 10, + memoryLimitMb: 500, + payloadLimitMb: 20, sanitize: 'obfuscate', }); }); diff --git a/packages/ws-worker/test/util/ensure-payload-size.test.ts b/packages/ws-worker/test/util/ensure-payload-size.test.ts new file mode 100644 index 000000000..20ac97418 --- /dev/null +++ b/packages/ws-worker/test/util/ensure-payload-size.test.ts @@ -0,0 +1,52 @@ +import test from 'ava'; +import ensurePayloadSize from '../../src/util/ensure-payload-size'; + +const mb = (bytes: number) => bytes / 1024 / 1024; + +test('throw limit 0, payload 1 byte', (t) => { + t.throws(() => ensurePayloadSize('x', 0), { + name: 'PAYLOAD_TOO_LARGE', + }); +}); + +test('ok for limit 1byte, payload 1 byte', (t) => { + t.notThrows(() => ensurePayloadSize('x', mb(1))); +}); + +test('throw for limit 1byte, payload 2 bytes', (t) => { + t.throws(() => ensurePayloadSize('xy', mb(1)), { + name: 'PAYLOAD_TOO_LARGE', + }); +}); + +test('ok for short string, limit 1mb', (t) => { + t.notThrows(() => ensurePayloadSize('hello world', 1)); +}); + +test('ok for 1mb string, limit 1mb', (t) => { + const str = new Array(1024 * 1024).fill('z').join(''); + t.notThrows(() => ensurePayloadSize(str, 1)); +}); + +test('throw for 1mb string + 1 byte, limit 1mb', (t) => { + const str = new Array(1024 * 1024 + 1).fill('z').join(''); + t.throws(() => ensurePayloadSize(str, 1), { + name: 'PAYLOAD_TOO_LARGE', + }); +}); + +test('ok if no limit', (t) => { + const str = new Array(1024 * 1024 + 1).fill('z').join(''); + t.notThrows(() => ensurePayloadSize(str)); +}); + +test('error shape', (t) => { + try { + const str = new Array(1024 * 1024 + 1).fill('z').join(''); + ensurePayloadSize(str, 1); + } catch (e: any) { + t.is(e.severity, 'kill'); + t.is(e.name, 'PAYLOAD_TOO_LARGE'); + t.is(e.message, 'The payload exceeded the size limit of 1mb'); + } +});