diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 0573b5fbc..236cc1dec 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -18,6 +18,7 @@ List any considerations/cases/advice for testing/QA here. - [ ] I have performed a self-review of my code - [ ] I have added unit tests +- [ ] If this is a change to the Worker, does the API_VERSION need bumping? - [ ] Changesets have been added (if there are production code changes) ## Release branch checklist diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index 5e5e48f3f..354459683 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,15 @@ # @openfn/integration-tests-worker +## 1.0.55 + +### Patch Changes + +- Updated dependencies [870a836] +- Updated dependencies [eaa3859] + - @openfn/engine-multi@1.2.2 + - @openfn/ws-worker@1.6.0 + - @openfn/lightning-mock@2.0.16 + ## 1.0.54 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 646d2b4c8..b7155b990 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.54", + "version": "1.0.55", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/test/integration.test.ts b/integration-tests/worker/test/integration.test.ts index 4a5e0b2b5..57274819a 100644 --- a/integration-tests/worker/test/integration.test.ts +++ b/integration-tests/worker/test/integration.test.ts @@ -558,6 +558,47 @@ test.serial("Don't send job logs to stdout", (t) => { }); }); +test.serial('Include timestamps on basically everything', (t) => { + return new Promise(async (done) => { + const attempt = { + id: crypto.randomUUID(), + jobs: [ + { + adaptor: '@openfn/language-common@latest', + body: 'fn((s) => s)', + }, + ], + }; + + const timestamps = {}; + + const assertAllTimestamps = () => { + t.is(timestamps['run-start'].length, 16); + t.is(timestamps['run-complete'].length, 16); + t.is(timestamps['step-start'].length, 16); + t.is(timestamps['step-complete'].length, 16); + }; + + lightning.once('run:start', ({ payload }) => { + timestamps['run-start'] = payload.timestamp; + }); + lightning.once('step:start', ({ payload }) => { + timestamps['step-start'] = payload.timestamp; + }); + lightning.once('step:complete', ({ payload }) => { + timestamps['step-complete'] = payload.timestamp; + }); + lightning.once('run:complete', ({ payload }) => { + timestamps['run-complete'] = payload.timestamp; + assertAllTimestamps(); + + done(); + }); + + lightning.enqueueRun(attempt); + }); +}); + test.serial("Don't send adaptor logs to stdout", (t) => { return new Promise(async (done) => { // We have to create a new worker with a different repo for this one diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index 394e3f69f..9e256b0c7 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,14 @@ # engine-multi +## 1.2.2 + +### Patch Changes + +- 870a836: Add high resolution timestamps to key events +- Updated dependencies [44f7f57] + - @openfn/lexicon@1.1.0 + - @openfn/runtime@1.4.1 + ## 1.2.1 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 6755ad6f4..fb6a2e5dd 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.2.1", + "version": "1.2.2", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/engine-multi/src/api/lifecycle.ts b/packages/engine-multi/src/api/lifecycle.ts index d6b52fd6f..d18fd3ece 100644 --- a/packages/engine-multi/src/api/lifecycle.ts +++ b/packages/engine-multi/src/api/lifecycle.ts @@ -2,6 +2,7 @@ import * as externalEvents from '../events'; import * as internalEvents from '../worker/events'; import type ExecutionContext from '../classes/ExecutionContext'; +import { timestamp } from '@openfn/logger'; // Log events from the inner thread will be logged to stdout // EXCEPT the keys listed here @@ -39,6 +40,7 @@ export const workflowStart = ( context.emit(externalEvents.WORKFLOW_START, { threadId, versions: context.versions, + time: timestamp(), }); }; @@ -70,6 +72,7 @@ export const workflowComplete = ( threadId, duration: state.duration, state: result, + time: timestamp(), }); }; @@ -82,6 +85,7 @@ export const jobStart = ( context.emit(externalEvents.JOB_START, { jobId, threadId, + time: timestamp(), }); }; @@ -98,6 +102,7 @@ export const jobComplete = ( jobId, next, mem, + time: timestamp(), }); }; diff --git a/packages/engine-multi/src/events.ts b/packages/engine-multi/src/events.ts index f32c5eaca..0e3e97fc2 100644 --- a/packages/engine-multi/src/events.ts +++ b/packages/engine-multi/src/events.ts @@ -51,11 +51,13 @@ interface ExternalEvent { export interface WorkflowStartPayload extends ExternalEvent { versions: Versions; + time: bigint; } export interface WorkflowCompletePayload extends ExternalEvent { state: any; duration: number; + time: bigint; } export interface WorkflowErrorPayload extends ExternalEvent { @@ -66,6 +68,7 @@ export interface WorkflowErrorPayload extends ExternalEvent { export interface JobStartPayload extends ExternalEvent { jobId: string; + time: bigint; } export interface JobCompletePayload extends ExternalEvent { @@ -73,6 +76,7 @@ export interface JobCompletePayload extends ExternalEvent { duration: number; state: any; // the result state next: string[]; // downstream jobs + time: bigint; mem: { job: number; system: number; diff --git a/packages/engine-multi/test/api/lifecycle.test.ts b/packages/engine-multi/test/api/lifecycle.test.ts index 05896c20a..3f7e90293 100644 --- a/packages/engine-multi/test/api/lifecycle.test.ts +++ b/packages/engine-multi/test/api/lifecycle.test.ts @@ -24,7 +24,7 @@ const createContext = (workflowId: string, state?: any) => options: {}, }); -test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => { +test(`workflowStart: emits ${e.WORKFLOW_START} with key fields`, (t) => { return new Promise((done) => { const workflowId = 'a'; @@ -39,6 +39,8 @@ test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => { t.truthy(evt.versions); t.is(evt.workflowId, workflowId); t.is(evt.threadId, '123'); + t.assert(evt.time > 0); + t.assert(typeof evt.time === 'bigint'); done(); }); @@ -68,7 +70,7 @@ test('onWorkflowStart: updates state', (t) => { test.todo('onWorkflowStart: logs'); test.todo('onWorkflowStart: throws if the workflow is already started'); -test(`workflowComplete: emits ${e.WORKFLOW_COMPLETE}`, (t) => { +test(`workflowComplete: emits ${e.WORKFLOW_COMPLETE} with key fields`, (t) => { return new Promise((done) => { const workflowId = 'a'; const result = { a: 777 }; @@ -89,6 +91,8 @@ test(`workflowComplete: emits ${e.WORKFLOW_COMPLETE}`, (t) => { context.on(e.WORKFLOW_COMPLETE, (evt) => { t.is(evt.workflowId, workflowId); t.deepEqual(evt.state, result); + t.assert(evt.time > 0); + t.assert(typeof evt.time === 'bigint'); t.assert(evt.duration > 0); done(); }); @@ -120,7 +124,7 @@ test('workflowComplete: updates state', (t) => { t.deepEqual(state.result, result); }); -test(`job-start: emits ${e.JOB_START}`, (t) => { +test(`job-start: emits ${e.JOB_START} with key fields`, (t) => { return new Promise((done) => { const workflowId = 'a'; @@ -142,6 +146,8 @@ test(`job-start: emits ${e.JOB_START}`, (t) => { t.is(evt.workflowId, workflowId); t.is(evt.threadId, '1'); t.is(evt.jobId, 'j'); + t.assert(evt.time > 0); + t.assert(typeof evt.time === 'bigint'); done(); }); @@ -149,7 +155,7 @@ test(`job-start: emits ${e.JOB_START}`, (t) => { }); }); -test(`job-complete: emits ${e.JOB_COMPLETE}`, (t) => { +test(`job-complete: emits ${e.JOB_COMPLETE} with key fields`, (t) => { return new Promise((done) => { const workflowId = 'a'; @@ -179,6 +185,8 @@ test(`job-complete: emits ${e.JOB_COMPLETE}`, (t) => { t.is(evt.duration, 200); t.deepEqual(evt.next, []); t.deepEqual(evt.mem, event.mem); + t.assert(evt.time > 0); + t.assert(typeof evt.time === 'bigint'); done(); }); diff --git a/packages/lexicon/CHANGELOG.md b/packages/lexicon/CHANGELOG.md index f3d87246a..3c18bb953 100644 --- a/packages/lexicon/CHANGELOG.md +++ b/packages/lexicon/CHANGELOG.md @@ -1,5 +1,11 @@ # lexicon +## 1.1.0 + +### Minor Changes + +- 44f7f57: Bump API_VERSION to 1.2 (timestamps on events) + ## 1.0.2 ### Patch Changes diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index 744ec732a..a30ba111c 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -5,6 +5,8 @@ export const API_VERSION: number; type StepId = string; +type TimeInMicroSeconds = string; + /** * Type definitions for Lightning and Worker interfaces * @@ -145,17 +147,20 @@ export type GetCredentialReply = {}; export type GetDataclipPayload = { id: string }; export type GetDataClipReply = Uint8Array; // represents a json string Run -export type RunStartPayload = void; // no payload +export type RunStartPayload = { + timestamp: TimeInMicroSeconds; +}; // no payload export type RunStartReply = {}; // no payload export type RunCompletePayload = ExitReason & { + timestamp: TimeInMicroSeconds; final_dataclip_id?: string; // TODO this will be removed soon }; export type RunCompleteReply = undefined; export type RunLogPayload = { message: Array; - timestamp: string; + timestamp: TimeInMicroSeconds; run_id: string; level?: string; source?: string; // namespace @@ -169,6 +174,7 @@ export type StepStartPayload = { step_id: string; run_id?: string; input_dataclip_id?: string; + timestamp: TimeInMicroSeconds; }; export type StepStartReply = void; @@ -185,5 +191,6 @@ export type StepCompletePayload = ExitReason & { system: number; }; duration: number; + timestamp: TimeInMicroSeconds; }; export type StepCompleteReply = void; diff --git a/packages/lexicon/lightning.js b/packages/lexicon/lightning.js index de59e06c6..3743f7653 100644 --- a/packages/lexicon/lightning.js +++ b/packages/lexicon/lightning.js @@ -3,4 +3,4 @@ * Note that the major version represents the API spec version, while the minor version * represents the lexicon implementation of it */ -export const API_VERSION = 1.1; +export const API_VERSION = 1.2; diff --git a/packages/lexicon/package.json b/packages/lexicon/package.json index 4c5902801..ec0c0660a 100644 --- a/packages/lexicon/package.json +++ b/packages/lexicon/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lexicon", - "version": "1.0.2", + "version": "1.1.0", "description": "Central repo of names and type definitions", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index 71d217307..5717c5694 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,15 @@ # @openfn/lightning-mock +## 2.0.16 + +### Patch Changes + +- Updated dependencies [870a836] +- Updated dependencies [44f7f57] + - @openfn/engine-multi@1.2.2 + - @openfn/lexicon@1.1.0 + - @openfn/runtime@1.4.1 + ## 2.0.15 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index ec5564fac..369de25ee 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.0.15", + "version": "2.0.16", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 1efb96c29..945d25ee2 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,19 @@ # ws-worker +## 1.6.0 + +### Minor Changes + +- eaa3859: Include timestamps in key events + +### Patch Changes + +- Updated dependencies [870a836] +- Updated dependencies [44f7f57] + - @openfn/engine-multi@1.2.2 + - @openfn/lexicon@1.1.0 + - @openfn/runtime@1.4.1 + ## 1.5.1 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 8893a398d..c18fa89bb 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.5.1", + "version": "1.6.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index 1dd3a6538..ac3dc0ec1 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -8,6 +8,7 @@ import { createRunState, throttle as createThrottle, stringify, + timeInMicroseconds, } from '../util'; import { RUN_COMPLETE, @@ -213,8 +214,6 @@ export function onJobError(context: Context, event: any) { } export function onJobLog({ channel, state, options }: Context, event: JSONLog) { - const timeInMicroseconds = BigInt(event.time) / BigInt(1e3); - let message = event.message; try { // The message body, the actual thing that is logged, @@ -240,7 +239,7 @@ export function onJobLog({ channel, state, options }: Context, event: JSONLog) { message: message, source: event.name, level: event.level, - timestamp: timeInMicroseconds.toString(), + timestamp: timeInMicroseconds(event.time) as string, }; if (state.activeStep) { diff --git a/packages/ws-worker/src/events/run-complete.ts b/packages/ws-worker/src/events/run-complete.ts index 7b56fdc3a..f58c48dcb 100644 --- a/packages/ws-worker/src/events/run-complete.ts +++ b/packages/ws-worker/src/events/run-complete.ts @@ -5,10 +5,11 @@ import { RUN_COMPLETE } from '../events'; import { calculateRunExitReason } from '../api/reasons'; import { sendEvent, Context } from '../api/execute'; import logFinalReason from '../util/log-final-reason'; +import { timeInMicroseconds } from '../util'; export default async function onWorkflowComplete( context: Context, - _event: WorkflowCompletePayload + event: WorkflowCompletePayload ) { const { state, channel, onFinish, logger } = context; @@ -22,6 +23,7 @@ export default async function onWorkflowComplete( try { await sendEvent(channel, RUN_COMPLETE, { final_dataclip_id: state.lastDataclipId!, + timestamp: timeInMicroseconds(event.time), ...reason, }); } catch (e) { diff --git a/packages/ws-worker/src/events/run-start.ts b/packages/ws-worker/src/events/run-start.ts index e57a1969a..e67c6beb5 100644 --- a/packages/ws-worker/src/events/run-start.ts +++ b/packages/ws-worker/src/events/run-start.ts @@ -7,6 +7,7 @@ import { sendEvent, Context, onJobLog } from '../api/execute'; import calculateVersionString from '../util/versions'; import pkg from '../../package.json' assert { type: 'json' }; +import { timeInMicroseconds } from '../util'; export default async function onRunStart( context: Context, @@ -34,10 +35,15 @@ export default async function onRunStart( ...event.versions, }; - await sendEvent(channel, RUN_START, { versions }); + await sendEvent(channel, RUN_START, { + versions, + /// use the engine time in run start + timestamp: timeInMicroseconds(event.time), + }); if ('payloadLimitMb' in options) { await onJobLog(versionLogContext, { + // use the fake time in the log time, message: [`Payload limit: ${options.payloadLimitMb}mb`], level: 'info', diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index 988e3f3d4..54918628d 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -4,7 +4,7 @@ import type { JobCompletePayload } from '@openfn/engine-multi'; import { timestamp } from '@openfn/logger'; import { STEP_COMPLETE } from '../events'; -import { stringify } from '../util'; +import { stringify, timeInMicroseconds } from '../util'; import { calculateJobExitReason } from '../api/reasons'; import { sendEvent, onJobLog, Context } from '../api/execute'; import ensurePayloadSize from '../util/ensure-payload-size'; @@ -51,6 +51,7 @@ export default async function onStepComplete( mem: event.mem, duration: event.duration, thread_id: event.threadId, + timestamp: timeInMicroseconds(event.time), } as StepCompletePayload; try { diff --git a/packages/ws-worker/src/events/step-start.ts b/packages/ws-worker/src/events/step-start.ts index 3eea012e1..ad5523483 100644 --- a/packages/ws-worker/src/events/step-start.ts +++ b/packages/ws-worker/src/events/step-start.ts @@ -4,6 +4,7 @@ import type { StepStartPayload } from '@openfn/lexicon/lightning'; import { STEP_START } from '../events'; import { sendEvent, Context } from '../api/execute'; +import { timeInMicroseconds } from '../util'; export default async function onStepStart( context: Context, @@ -21,5 +22,6 @@ export default async function onStepStart( step_id: state.activeStep!, job_id: state.activeJob!, input_dataclip_id, + timestamp: timeInMicroseconds(event.time), }); } diff --git a/packages/ws-worker/src/util/index.ts b/packages/ws-worker/src/util/index.ts index 776d274e5..5e36b051c 100644 --- a/packages/ws-worker/src/util/index.ts +++ b/packages/ws-worker/src/util/index.ts @@ -4,6 +4,7 @@ import getWithReply from './get-with-reply'; import stringify from './stringify'; import createRunState from './create-run-state'; import throttle from './throttle'; +export * from './timestamp'; export { throttle, diff --git a/packages/ws-worker/src/util/timestamp.ts b/packages/ws-worker/src/util/timestamp.ts new file mode 100644 index 000000000..bc78e3fb7 --- /dev/null +++ b/packages/ws-worker/src/util/timestamp.ts @@ -0,0 +1,2 @@ +export const timeInMicroseconds = (time?: bigint) => + time && (BigInt(time) / BigInt(1e3)).toString(); diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index 5dc77c72f..16ccdf089 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -396,7 +396,6 @@ test('execute should lazy-load initial state', async (t) => { const channel = mockChannel({ ...mockEventHandlers, [GET_DATACLIP]: (id) => { - console.log('> GET DATACLIP'); t.truthy(id); didLoadState = true; return toArrayBuffer({}); diff --git a/packages/ws-worker/test/events/run-complete.test.ts b/packages/ws-worker/test/events/run-complete.test.ts index bd9aaab69..12acb39ce 100644 --- a/packages/ws-worker/test/events/run-complete.test.ts +++ b/packages/ws-worker/test/events/run-complete.test.ts @@ -1,5 +1,5 @@ import test from 'ava'; -import { createMockLogger } from '@openfn/logger'; +import { createMockLogger, timestamp } from '@openfn/logger'; import handleRunComplete from '../../src/events/run-complete'; @@ -8,7 +8,7 @@ import { RUN_COMPLETE, RUN_LOG } from '../../src/events'; import { createRunState } from '../../src/util'; import { createPlan } from '../util'; -test('should send an run:complete event', async (t) => { +test('should send a run:complete event', async (t) => { const result = { answer: 42 }; const plan = createPlan(); @@ -22,6 +22,7 @@ test('should send an run:complete event', async (t) => { [RUN_LOG]: () => true, [RUN_COMPLETE]: (evt) => { t.deepEqual(evt.final_dataclip_id, 'x'); + t.falsy(evt.time); // if no timestamp in the engine event, no timestamp in the worker one }, }); @@ -31,6 +32,29 @@ test('should send an run:complete event', async (t) => { await handleRunComplete(context, event); }); +test('should include a timestamp', async (t) => { + const plan = createPlan(); + + const state = createRunState(plan); + + const channel = mockChannel({ + [RUN_LOG]: () => true, + [RUN_COMPLETE]: (evt) => { + t.assert(typeof evt.timestamp === 'string'); + t.is(evt.timestamp.length, 16); + }, + }); + + const event: any = { + time: timestamp(), + }; + + t.is(event.time.toString().length, 19); + + const context: any = { channel, state, onFinish: () => {} }; + await handleRunComplete(context, event); +}); + test('should call onFinish with final dataclip', async (t) => { const result = { answer: 42 }; const plan = createPlan(); diff --git a/packages/ws-worker/test/events/run-start.test.ts b/packages/ws-worker/test/events/run-start.test.ts index 4eab3a903..993606398 100644 --- a/packages/ws-worker/test/events/run-start.test.ts +++ b/packages/ws-worker/test/events/run-start.test.ts @@ -7,6 +7,36 @@ import { createRunState } from '../../src/util'; import { RUN_LOG, RUN_START } from '../../src/events'; import pkg from '../../package.json' assert { type: 'json' }; +import { timestamp } from '@openfn/logger'; + +test('should include a timestamp', async (t) => { + const plan = { + id: 'run-1', + workflow: { + steps: [{ id: 'job-1', expression: '.' }], + }, + options: {}, + }; + + const state = createRunState(plan); + + const channel = mockChannel({ + [RUN_LOG]: () => true, + [RUN_START]: (evt) => { + t.assert(typeof evt.timestamp === 'string'); + t.is(evt.timestamp.length, 16); + }, + }); + + const event: any = { + time: timestamp(), + }; + + t.is(event.time.toString().length, 19); + + const context: any = { channel, state, onFinish: () => {} }; + await handleRunStart(context, event); +}); test('run:start event should include versions', async (t) => { const plan = { @@ -73,6 +103,7 @@ test('run:start should log the version number', async (t) => { const event: WorkflowStartPayload = { workflowId: plan.id, versions, + time: BigInt(123), }; const state = createRunState(plan, input); diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index 979eab449..ef4d6f42c 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -7,6 +7,7 @@ import { createRunState } from '../../src/util'; import { RUN_LOG, STEP_COMPLETE } from '../../src/events'; import { createPlan } from '../util'; import { JobCompletePayload } from '@openfn/engine-multi'; +import { timestamp } from '@openfn/logger'; test('clear the step id and active job on state', async (t) => { const plan = createPlan(); @@ -151,6 +152,7 @@ test('send a step:complete event', async (t) => { mem: { job: 1, system: 10 }, duration: 61, thread_id: 'abc', + time: BigInt(123), } as JobCompletePayload; await handleStepComplete({ channel, state } as any, event); }); @@ -183,6 +185,7 @@ test('do not include dataclips in step:complete if output_dataclip is false', as mem: { job: 1, system: 10 }, duration: 61, thread_id: 'abc', + time: BigInt(123), } as JobCompletePayload; await handleStepComplete({ channel, state, options } as any, event); }); @@ -217,6 +220,7 @@ test('do not include dataclips in step:complete if output_dataclip is too big', mem: { job: 1, system: 10 }, duration: 61, thread_id: 'abc', + time: BigInt(123), } as JobCompletePayload; await handleStepComplete({ channel, state, options } as any, event); @@ -250,7 +254,31 @@ test('log when the output_dataclip is too big', async (t) => { mem: { job: 1, system: 10 }, duration: 61, thread_id: 'abc', + time: BigInt(123), } as JobCompletePayload; await handleStepComplete({ channel, state, options } as any, event); }); + +test('should include a timestamp', async (t) => { + const plan = createPlan(); + const state = createRunState(plan); + + const channel = mockChannel({ + [RUN_LOG]: () => true, + [STEP_COMPLETE]: (evt) => { + t.assert(typeof evt.timestamp === 'string'); + t.is(evt.timestamp.length, 16); + }, + }); + + const event: any = { + time: timestamp(), + jobId: 'job-1', + }; + + t.is(event.time.toString().length, 19); + + const context: any = { channel, state, onFinish: () => {} }; + await handleStepComplete(context, event); +}); diff --git a/packages/ws-worker/test/events/step-start.test.ts b/packages/ws-worker/test/events/step-start.test.ts index 343ef5bd0..c75a1034f 100644 --- a/packages/ws-worker/test/events/step-start.test.ts +++ b/packages/ws-worker/test/events/step-start.test.ts @@ -4,6 +4,7 @@ import handleStepStart from '../../src/events/step-start'; import { mockChannel } from '../../src/mock/sockets'; import { createRunState } from '../../src/util'; import { RUN_LOG, STEP_START } from '../../src/events'; +import { timestamp } from '@openfn/logger'; test('set a step id and active job on state', async (t) => { const plan = { @@ -20,7 +21,8 @@ test('set a step id and active job on state', async (t) => { [RUN_LOG]: (x) => x, }); - await handleStepStart({ channel, state } as any, { jobId } as any); + const event = { jobId } as any; + await handleStepStart({ channel, state } as any, event); t.is(state.activeJob, jobId); t.truthy(state.activeStep); @@ -54,5 +56,36 @@ test('send a step:start event', async (t) => { [RUN_LOG]: () => true, }); - await handleStepStart({ channel, state } as any, { jobId } as any); + const event = { jobId } as any; + await handleStepStart({ channel, state } as any, event); +}); + +test('should include a timestamp', async (t) => { + const plan = { + id: 'run-1', + workflow: { + steps: [{ id: 'job-1', expression: '.' }], + }, + options: {}, + }; + + const state = createRunState(plan); + + const channel = mockChannel({ + [RUN_LOG]: () => true, + [STEP_START]: (evt) => { + t.assert(typeof evt.timestamp === 'string'); + t.is(evt.timestamp.length, 16); + }, + }); + + const event: any = { + time: timestamp(), + jobId: 'job-1', + }; + + t.is(event.time.toString().length, 19); + + const context: any = { channel, state, onFinish: () => {} }; + await handleStepStart(context, event); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b6f2b5bc3..af598fe53 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -468,6 +468,8 @@ importers: specifier: ^5.1.6 version: 5.1.6 + packages/engine-multi/tmp/a/b/c: {} + packages/engine-multi/tmp/repo: {} packages/lexicon: