Skip to content

Commit

Permalink
worker 1.6.6
Browse files Browse the repository at this point in the history
* worker: added simple logging

* worker: trap errors coming out of the websocket (#783)

* worker: trap errors coming out of the websocket

* formatting

* compiler: don't log compiled source (by deafault) (#781)

* compiler: don't log compiled source (by deafault)

* changeset

* Better worker logs (#784)

* worker: added simple logging

* log run times

* changeset

* format

* log capacity

* updates

* format

* versions: [email protected] [email protected]

* worker: log claim duration

* format

* version: [email protected]
  • Loading branch information
josephjclark authored Sep 25, 2024
1 parent f33a865 commit 5417df4
Show file tree
Hide file tree
Showing 19 changed files with 130 additions and 49 deletions.
7 changes: 7 additions & 0 deletions integration-tests/execute/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/integration-tests-execute

## 1.0.5

### Patch Changes

- Updated dependencies [5db5862]
- @openfn/compiler@0.3.3

## 1.0.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/execute/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-execute",
"private": true,
"version": "1.0.4",
"version": "1.0.5",
"description": "Job execution tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
11 changes: 11 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# @openfn/integration-tests-worker

## 1.0.60

### Patch Changes

- Updated dependencies [5db5862]
- Updated dependencies [f581c6b]
- Updated dependencies [3e6eba2]
- @openfn/ws-worker@1.6.5
- @openfn/engine-multi@1.2.5
- @openfn/lightning-mock@2.0.19

## 1.0.59

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.59",
"version": "1.0.60",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
8 changes: 8 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/cli

## 1.8.4

### Patch Changes

- 5db5862: Dont log compiled job code
- Updated dependencies [5db5862]
- @openfn/compiler@0.3.3

## 1.8.3

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.8.3",
"version": "1.8.4",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
Expand Down
6 changes: 6 additions & 0 deletions packages/compiler/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @openfn/compiler

## 0.3.3

### Patch Changes

- 5db5862: Dont log compiled job code

## 0.3.2

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/compiler/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/compiler",
"version": "0.3.2",
"version": "0.3.3",
"description": "Compiler and language tooling for openfn jobs.",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
7 changes: 5 additions & 2 deletions packages/compiler/src/compile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const defaultLogger = createLogger();

export type Options = TransformOptions & {
logger?: Logger;
logCompiledSource?: boolean;
};

export default function compile(pathOrSource: string, options: Options = {}) {
Expand All @@ -31,8 +32,10 @@ export default function compile(pathOrSource: string, options: Options = {}) {
const transformedAst = transform(ast, undefined, options);

const compiledSource = print(transformedAst).code;
logger.debug('Compiled source:');
logger.debug(compiledSource); // TODO indent or something
if (options.logCompiledSource) {
logger.debug('Compiled source:');
logger.debug(compiledSource); // TODO indent or something
}

return compiledSource;
}
7 changes: 7 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# engine-multi

## 1.2.5

### Patch Changes

- Updated dependencies [5db5862]
- @openfn/compiler@0.3.3

## 1.2.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.2.4",
"version": "1.2.5",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
6 changes: 6 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @openfn/lightning-mock

## 2.0.19

### Patch Changes

- @openfn/engine-multi@1.2.5

## 2.0.18

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.0.18",
"version": "2.0.19",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
15 changes: 15 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# ws-worker

## 1.6.6

### Patch Changes

- Log claim event duration

## 1.6.5

### Patch Changes

- 5db5862: Dont log compiled job code
- f581c6b: log duration of runs and server capacity
- 3e6eba2: Trap errors coming out of the websocket
- @openfn/engine-multi@1.2.5

## 1.6.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.6.4",
"version": "1.6.6",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
13 changes: 9 additions & 4 deletions packages/ws-worker/src/api/claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,27 @@ const claim = (

const activeWorkers = Object.keys(app.workflows).length;
if (activeWorkers >= maxWorkers) {
logger.debug('skipping claim attempt: server at capacity');
logger.debug(
`skipping claim attempt: server at capacity (${activeWorkers}/${maxWorkers})`
);
return reject(new Error('Server at capacity'));
}

if (!app.queueChannel) {
logger.debug('skipping claim attempt: websocket unavailable');
return reject(new Error('No websocket available'));
}
logger.debug(`requesting run (capacity ${activeWorkers}/${maxWorkers})`);

logger.debug('requesting run...');
const start = Date.now();
app.queueChannel
.push<ClaimPayload>(CLAIM, { demand: 1 })
.receive('ok', ({ runs }: ClaimReply) => {
const duration = Date.now() - start;
logger.debug(
`claimed ${runs.length} runs: `,
runs.map((r) => r.id).join(',')
`claimed ${runs.length} runs in ${duration}ms (${
runs.length ? runs.map((r) => r.id).join(',') : '-'
})`
);
// TODO what if we get here after we've been cancelled?
// the events have already been claimed...
Expand Down
78 changes: 43 additions & 35 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,42 +180,50 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
// TODO this probably needs to move into ./api/ somewhere
app.execute = async ({ id, token }: ClaimRun) => {
if (app.socket) {
app.workflows[id] = true;

// TODO if we fail to join the run channel, the whole server
// will crash. Really we should just abort the run somehow
// Maybe even soft shutdown the worker
const {
channel: runChannel,
plan,
options = {},
input,
} = await joinRunChannel(app.socket, token, id, logger);

// Default the payload limit if it's not otherwise set on the run options
if (!('payloadLimitMb' in options)) {
options.payloadLimitMb = app.options.payloadLimitMb;
}

// Callback to be triggered when the work is done (including errors)
const onFinish = () => {
logger.debug(`workflow ${id} complete: releasing worker`);
delete app.workflows[id];
runChannel.leave();

app.events.emit(INTERNAL_RUN_COMPLETE);
};
const context = execute(
runChannel,
engine,
logger,
plan,
input,
options,
onFinish
);
try {
const start = Date.now();
app.workflows[id] = true;

const {
channel: runChannel,
plan,
options = {},
input,
} = await joinRunChannel(app.socket, token, id, logger);

// Default the payload limit if it's not otherwise set on the run options
if (!('payloadLimitMb' in options)) {
options.payloadLimitMb = app.options.payloadLimitMb;
}

// Callback to be triggered when the work is done (including errors)
const onFinish = () => {
const duration = (Date.now() - start) / 1000;
logger.debug(
`workflow ${id} complete in ${duration}s: releasing worker`
);
delete app.workflows[id];
runChannel.leave();

app.events.emit(INTERNAL_RUN_COMPLETE);
};
const context = execute(
runChannel,
engine,
logger,
plan,
input,
options,
onFinish
);

app.workflows[id] = context;
app.workflows[id] = context;
} catch (e) {
// Trap errors coming out of the socket
// These are likely to be comms errors with Lightning
logger.error(`Unexpected error executing ${id}`);
logger.error(e);
}
} else {
logger.error('No lightning socket established');
// TODO something else. Throw? Emit?
Expand Down
5 changes: 4 additions & 1 deletion packages/ws-worker/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const args = cli(process.argv);

const logger = createLogger('SRV', { level: args.log });

logger.info('Starting worker server...');

if (args.lightning === 'mock') {
args.lightning = 'ws://localhost:8888/worker';
if (!args.secret) {
Expand All @@ -25,7 +27,7 @@ const [minBackoff, maxBackoff] = args.backoff
.map((n: string) => parseInt(n, 10) * 1000);

function engineReady(engine: any) {
logger.debug('Creating worker server...');
logger.debug('Creating worker instance');

const workerOptions: ServerOptions = {
port: args.port,
Expand Down Expand Up @@ -61,6 +63,7 @@ function engineReady(engine: any) {
logger.debug('Worker options:', humanOptions);

createWorker(engine, workerOptions);
logger.success('Worker started OK');
}

if (args.mock) {
Expand Down
2 changes: 2 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5417df4

Please sign in to comment.