Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker: timestamp events #752

Merged
merged 8 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ List any considerations/cases/advice for testing/QA here.

- [ ] I have performed a self-review of my code
- [ ] I have added unit tests
- [ ] If this is a change to the Worker, does the API_VERSION need bumping?
- [ ] Changesets have been added (if there are production code changes)

## Release branch checklist
Expand Down
10 changes: 10 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/integration-tests-worker

## 1.0.55

### Patch Changes

- Updated dependencies [870a836]
- Updated dependencies [eaa3859]
- @openfn/[email protected]
- @openfn/[email protected]
- @openfn/[email protected]

## 1.0.54

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.54",
"version": "1.0.55",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
41 changes: 41 additions & 0 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,47 @@ test.serial("Don't send job logs to stdout", (t) => {
});
});

test.serial('Include timestamps on basically everything', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest',
body: 'fn((s) => s)',
},
],
};

const timestamps = {};

const assertAllTimestamps = () => {
t.is(timestamps['run-start'].length, 16);
t.is(timestamps['run-complete'].length, 16);
t.is(timestamps['step-start'].length, 16);
t.is(timestamps['step-complete'].length, 16);
};

lightning.once('run:start', ({ payload }) => {
timestamps['run-start'] = payload.timestamp;
});
lightning.once('step:start', ({ payload }) => {
timestamps['step-start'] = payload.timestamp;
});
lightning.once('step:complete', ({ payload }) => {
timestamps['step-complete'] = payload.timestamp;
});
lightning.once('run:complete', ({ payload }) => {
timestamps['run-complete'] = payload.timestamp;
assertAllTimestamps();

done();
});

lightning.enqueueRun(attempt);
});
});

test.serial("Don't send adaptor logs to stdout", (t) => {
return new Promise(async (done) => {
// We have to create a new worker with a different repo for this one
Expand Down
9 changes: 9 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# engine-multi

## 1.2.2

### Patch Changes

- 870a836: Add high resolution timestamps to key events
- Updated dependencies [44f7f57]
- @openfn/[email protected]
- @openfn/[email protected]

## 1.2.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.2.1",
"version": "1.2.2",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
5 changes: 5 additions & 0 deletions packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import * as externalEvents from '../events';
import * as internalEvents from '../worker/events';
import type ExecutionContext from '../classes/ExecutionContext';
import { timestamp } from '@openfn/logger';

// Log events from the inner thread will be logged to stdout
// EXCEPT the keys listed here
Expand Down Expand Up @@ -39,6 +40,7 @@ export const workflowStart = (
context.emit(externalEvents.WORKFLOW_START, {
threadId,
versions: context.versions,
time: timestamp(),
});
};

Expand Down Expand Up @@ -70,6 +72,7 @@ export const workflowComplete = (
threadId,
duration: state.duration,
state: result,
time: timestamp(),
});
};

Expand All @@ -82,6 +85,7 @@ export const jobStart = (
context.emit(externalEvents.JOB_START, {
jobId,
threadId,
time: timestamp(),
});
};

Expand All @@ -98,6 +102,7 @@ export const jobComplete = (
jobId,
next,
mem,
time: timestamp(),
});
};

Expand Down
4 changes: 4 additions & 0 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ interface ExternalEvent {

export interface WorkflowStartPayload extends ExternalEvent {
versions: Versions;
time: bigint;
}

export interface WorkflowCompletePayload extends ExternalEvent {
state: any;
duration: number;
time: bigint;
}

export interface WorkflowErrorPayload extends ExternalEvent {
Expand All @@ -66,13 +68,15 @@ export interface WorkflowErrorPayload extends ExternalEvent {

export interface JobStartPayload extends ExternalEvent {
jobId: string;
time: bigint;
}

export interface JobCompletePayload extends ExternalEvent {
jobId: string;
duration: number;
state: any; // the result state
next: string[]; // downstream jobs
time: bigint;
mem: {
job: number;
system: number;
Expand Down
16 changes: 12 additions & 4 deletions packages/engine-multi/test/api/lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const createContext = (workflowId: string, state?: any) =>
options: {},
});

test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => {
test(`workflowStart: emits ${e.WORKFLOW_START} with key fields`, (t) => {
return new Promise((done) => {
const workflowId = 'a';

Expand All @@ -39,6 +39,8 @@ test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => {
t.truthy(evt.versions);
t.is(evt.workflowId, workflowId);
t.is(evt.threadId, '123');
t.assert(evt.time > 0);
t.assert(typeof evt.time === 'bigint');
done();
});

Expand Down Expand Up @@ -68,7 +70,7 @@ test('onWorkflowStart: updates state', (t) => {
test.todo('onWorkflowStart: logs');
test.todo('onWorkflowStart: throws if the workflow is already started');

test(`workflowComplete: emits ${e.WORKFLOW_COMPLETE}`, (t) => {
test(`workflowComplete: emits ${e.WORKFLOW_COMPLETE} with key fields`, (t) => {
return new Promise((done) => {
const workflowId = 'a';
const result = { a: 777 };
Expand All @@ -89,6 +91,8 @@ test(`workflowComplete: emits ${e.WORKFLOW_COMPLETE}`, (t) => {
context.on(e.WORKFLOW_COMPLETE, (evt) => {
t.is(evt.workflowId, workflowId);
t.deepEqual(evt.state, result);
t.assert(evt.time > 0);
t.assert(typeof evt.time === 'bigint');
t.assert(evt.duration > 0);
done();
});
Expand Down Expand Up @@ -120,7 +124,7 @@ test('workflowComplete: updates state', (t) => {
t.deepEqual(state.result, result);
});

test(`job-start: emits ${e.JOB_START}`, (t) => {
test(`job-start: emits ${e.JOB_START} with key fields`, (t) => {
return new Promise((done) => {
const workflowId = 'a';

Expand All @@ -142,14 +146,16 @@ test(`job-start: emits ${e.JOB_START}`, (t) => {
t.is(evt.workflowId, workflowId);
t.is(evt.threadId, '1');
t.is(evt.jobId, 'j');
t.assert(evt.time > 0);
t.assert(typeof evt.time === 'bigint');
done();
});

jobStart(context, event);
});
});

test(`job-complete: emits ${e.JOB_COMPLETE}`, (t) => {
test(`job-complete: emits ${e.JOB_COMPLETE} with key fields`, (t) => {
return new Promise((done) => {
const workflowId = 'a';

Expand Down Expand Up @@ -179,6 +185,8 @@ test(`job-complete: emits ${e.JOB_COMPLETE}`, (t) => {
t.is(evt.duration, 200);
t.deepEqual(evt.next, []);
t.deepEqual(evt.mem, event.mem);
t.assert(evt.time > 0);
t.assert(typeof evt.time === 'bigint');
done();
});

Expand Down
6 changes: 6 additions & 0 deletions packages/lexicon/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# lexicon

## 1.1.0

### Minor Changes

- 44f7f57: Bump API_VERSION to 1.2 (timestamps on events)

## 1.0.2

### Patch Changes
Expand Down
11 changes: 9 additions & 2 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ export const API_VERSION: number;

type StepId = string;

type TimeInMicroSeconds = string;

/**
* Type definitions for Lightning and Worker interfaces
*
Expand Down Expand Up @@ -145,17 +147,20 @@ export type GetCredentialReply = {};
export type GetDataclipPayload = { id: string };
export type GetDataClipReply = Uint8Array; // represents a json string Run

export type RunStartPayload = void; // no payload
export type RunStartPayload = {
timestamp: TimeInMicroSeconds;
}; // no payload
export type RunStartReply = {}; // no payload

export type RunCompletePayload = ExitReason & {
timestamp: TimeInMicroSeconds;
final_dataclip_id?: string; // TODO this will be removed soon
};
export type RunCompleteReply = undefined;

export type RunLogPayload = {
message: Array<string | object>;
timestamp: string;
timestamp: TimeInMicroSeconds;
run_id: string;
level?: string;
source?: string; // namespace
Expand All @@ -169,6 +174,7 @@ export type StepStartPayload = {
step_id: string;
run_id?: string;
input_dataclip_id?: string;
timestamp: TimeInMicroSeconds;
};
export type StepStartReply = void;

Expand All @@ -185,5 +191,6 @@ export type StepCompletePayload = ExitReason & {
system: number;
};
duration: number;
timestamp: TimeInMicroSeconds;
};
export type StepCompleteReply = void;
2 changes: 1 addition & 1 deletion packages/lexicon/lightning.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
* Note that the major version represents the API spec version, while the minor version
* represents the lexicon implementation of it
*/
export const API_VERSION = 1.1;
export const API_VERSION = 1.2;
2 changes: 1 addition & 1 deletion packages/lexicon/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lexicon",
"version": "1.0.2",
"version": "1.1.0",
"description": "Central repo of names and type definitions",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
10 changes: 10 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/lightning-mock

## 2.0.16

### Patch Changes

- Updated dependencies [870a836]
- Updated dependencies [44f7f57]
- @openfn/[email protected]
- @openfn/[email protected]
- @openfn/[email protected]

## 2.0.15

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.0.15",
"version": "2.0.16",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
14 changes: 14 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# ws-worker

## 1.6.0

### Minor Changes

- eaa3859: Include timestamps in key events

### Patch Changes

- Updated dependencies [870a836]
- Updated dependencies [44f7f57]
- @openfn/[email protected]
- @openfn/[email protected]
- @openfn/[email protected]

## 1.5.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.5.1",
"version": "1.6.0",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
5 changes: 2 additions & 3 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
createRunState,
throttle as createThrottle,
stringify,
timeInMicroseconds,
} from '../util';
import {
RUN_COMPLETE,
Expand Down Expand Up @@ -213,8 +214,6 @@ export function onJobError(context: Context, event: any) {
}

export function onJobLog({ channel, state, options }: Context, event: JSONLog) {
const timeInMicroseconds = BigInt(event.time) / BigInt(1e3);

let message = event.message;
try {
// The message body, the actual thing that is logged,
Expand All @@ -240,7 +239,7 @@ export function onJobLog({ channel, state, options }: Context, event: JSONLog) {
message: message,
source: event.name,
level: event.level,
timestamp: timeInMicroseconds.toString(),
timestamp: timeInMicroseconds(event.time) as string,
};

if (state.activeStep) {
Expand Down
Loading