Skip to content

Commit

Permalink
Merge pull request #791 from OpenFn/fix-backoff
Browse files Browse the repository at this point in the history
Worker: better handling of backoff when at capacity
  • Loading branch information
taylordowns2000 authored Sep 27, 2024
2 parents dcba5c3 + 24612d5 commit 9e845b0
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 46 deletions.
7 changes: 7 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/integration-tests-worker

## 1.0.61

### Patch Changes

- Updated dependencies [42883f8]
- @openfn/ws-worker@1.6.7

## 1.0.60

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.60",
"version": "1.0.61",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
1 change: 0 additions & 1 deletion packages/lightning-mock/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ const createLightningServer = (options: LightningOptions = {}) => {
const runPrivateKey = options.runPrivateKey
? fromBase64(options.runPrivateKey)
: undefined;

const state = {
credentials: {},
runs: {},
Expand Down
1 change: 1 addition & 0 deletions packages/lightning-mock/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const server = createLightningServer({
port: args.port,
logger,
logLevel: args.log,
runPrivateKey: process.env.WORKER_RUNS_PRIVATE_KEY,
});

// add a default credential
Expand Down
1 change: 0 additions & 1 deletion packages/lightning-mock/src/tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ export const generateRunToken = async (
if (privateKey) {
try {
const alg = 'RS256';

const key = crypto.createPrivateKey(privateKey);

const jwt = await new jose.SignJWT({ id: runId })
Expand Down
6 changes: 6 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# ws-worker

## 1.6.7

### Patch Changes

- 42883f8: Better handling of claim backoffs when at capacity

## 1.6.6

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.6.6",
"version": "1.6.7",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
6 changes: 2 additions & 4 deletions packages/ws-worker/src/api/claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const mockLogger = createMockLogger();

const verifyToken = async (token: string, publicKey: string) => {
const key = crypto.createPublicKey(publicKey);

const { payload } = await jose.jwtVerify(token, key, {
issuer: 'Lightning',
});
Expand All @@ -35,9 +34,8 @@ const claim = (

const activeWorkers = Object.keys(app.workflows).length;
if (activeWorkers >= maxWorkers) {
logger.debug(
`skipping claim attempt: server at capacity (${activeWorkers}/${maxWorkers})`
);
// Important: stop the workloop so that we don't try and claim any more
app.workloop?.stop(`server at capacity (${activeWorkers}/${maxWorkers})`);
return reject(new Error('Server at capacity'));
}

Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/src/api/destroy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const destroy = async (app: ServerApp, logger: Logger) => {
app.destroyed = true;

// Immediately stop asking for more work
app.killWorkloop?.();
app.workloop?.stop('server closed');
app.queueChannel?.leave();

// Shut down the HTTP server
Expand Down
21 changes: 15 additions & 6 deletions packages/ws-worker/src/api/workloop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import type { ServerApp } from '../server';
import type { CancelablePromise } from '../types';
import type { Logger } from '@openfn/logger';

/**
 * Handle returned by `startWorkloop` for controlling a running claim loop.
 */
export type Workloop = {
// Cancel the loop. The optional reason is included in the log message
// written when the loop is cancelled.
stop: (reason?: string) => void;
// Returns true once the loop has been cancelled via stop().
isStopped: () => boolean;
};

const startWorkloop = (
app: ServerApp,
logger: Logger,
minBackoff: number,
maxBackoff: number,
maxWorkers?: number
) => {
): Workloop => {
let promise: CancelablePromise;
let cancelled = false;

Expand All @@ -37,11 +42,15 @@ const startWorkloop = (
};
workLoop();

return () => {
logger.debug('cancelling workloop');
cancelled = true;
promise.cancel();
app.queueChannel?.leave();
return {
stop: (reason = 'reason unknown') => {
if (!cancelled) {
logger.info(`cancelling workloop: ${reason}`);
cancelled = true;
promise.cancel();
}
},
isStopped: () => cancelled,
};
};

Expand Down
59 changes: 36 additions & 23 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { createMockLogger, Logger } from '@openfn/logger';
import { ClaimRun } from '@openfn/lexicon/lightning';
import { INTERNAL_RUN_COMPLETE } from './events';
import destroy from './api/destroy';
import startWorkloop from './api/workloop';
import startWorkloop, { Workloop } from './api/workloop';
import claim from './api/claim';
import { Context, execute } from './api/execute';
import healthcheck from './middleware/healthcheck';
Expand Down Expand Up @@ -49,10 +49,11 @@ export interface ServerApp extends Koa {
server: Server;
engine: RuntimeEngine;
options: ServerOptions;
workloop?: Workloop;

execute: ({ id, token }: ClaimRun) => Promise<void>;
destroy: () => void;
killWorkloop?: () => void;
resumeWorkloop: () => void;
}

type SocketAndChannel = {
Expand Down Expand Up @@ -83,17 +84,7 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
app.queueChannel = channel;

// trigger the workloop
if (!options.noLoop) {
logger.info('Starting workloop');
// TODO maybe namespace the workloop logger differently? It's a bit annoying
app.killWorkloop = startWorkloop(
app,
logger,
options.backoff?.min || MIN_BACKOFF,
options.backoff?.max || MAX_BACKOFF,
options.maxWorkflows
);
} else {
if (options.noLoop) {
// @ts-ignore
const port = app.server?.address().port;
logger.break();
Expand All @@ -103,20 +94,21 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
logger.info(` curl -X POST http://localhost:${port}/claim`);
logger.break();
}

app.resumeWorkloop();
};

// We were disconnected from the queue
const onDisconnect = () => {
if (app.killWorkloop) {
app.killWorkloop();
delete app.killWorkloop;
if (!app.destroyed) {
logger.info('Connection to lightning lost');
logger.info(
'Worker will automatically reconnect when lightning is back online'
);
// So far as I know, the socket will try and reconnect in the background forever
}
if (!app.workloop?.isStopped()) {
app.workloop?.stop('Socket disconnected unexpectedly');
}
if (!app.destroyed) {
logger.info('Connection to lightning lost');
logger.info(
'Worker will automatically reconnect when lightning is back online'
);
// So far as I know, the socket will try and reconnect in the background forever
}
};

Expand Down Expand Up @@ -177,6 +169,25 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {

app.options = options;

// Start the workloop (if not already started).
// Does nothing when options.noLoop is set. Otherwise a new workloop is only
// created when there is no workloop yet, or the existing one reports that it
// has been stopped - so calling this repeatedly is safe.
app.resumeWorkloop = () => {
if (options.noLoop) {
return;
}

if (!app.workloop || app.workloop?.isStopped()) {
logger.info('Starting workloop');
// TODO maybe namespace the workloop logger differently? It's a bit annoying
app.workloop = startWorkloop(
app,
logger,
// NOTE: `||` means a configured backoff of 0 also falls back to the default
options.backoff?.min || MIN_BACKOFF,
options.backoff?.max || MAX_BACKOFF,
options.maxWorkflows
);
}
};

// TODO this probably needs to move into ./api/ somewhere
app.execute = async ({ id, token }: ClaimRun) => {
if (app.socket) {
Expand Down Expand Up @@ -206,6 +217,8 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
runChannel.leave();

app.events.emit(INTERNAL_RUN_COMPLETE);

app.resumeWorkloop();
};
const context = execute(
runChannel,
Expand Down
20 changes: 12 additions & 8 deletions packages/ws-worker/test/api/workloop.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,36 @@ import { mockChannel } from '../../src/mock/sockets';
import startWorkloop from '../../src/api/workloop';
import { CLAIM } from '../../src/events';

let cancel: any;
let workloop: any;

const logger = createMockLogger();

test.afterEach(() => {
cancel?.(); // cancel any workloops
workloop?.stop(); // cancel any workloops
});

test('workloop can be cancelled', async (t) => {
let count = 0;

const app = {
workflows: {},
queueChannel: mockChannel({
[CLAIM]: () => {
count++;
cancel();
workloop.stop();
return { runs: [] };
},
}),
execute: () => {},
};

cancel = startWorkloop(app as any, logger, 1, 1);
workloop = startWorkloop(app as any, logger, 1, 1);
t.false(workloop.isStopped())

await sleep(100);
// A quirk of how cancel works is that the loop will be called a few times
t.assert(count <= 5);
t.true(count <= 5);
t.true(workloop.isStopped())
});

test('workloop sends the runs:claim event', (t) => {
Expand All @@ -47,7 +51,7 @@ test('workloop sends the runs:claim event', (t) => {
}),
execute: () => {},
};
cancel = startWorkloop(app as any, logger, 1, 1);
workloop = startWorkloop(app as any, logger, 1, 1);
});
});

Expand All @@ -68,7 +72,7 @@ test('workloop sends the runs:claim event several times ', (t) => {
}),
execute: () => {},
};
cancel = startWorkloop(app as any, logger, 1, 1);
workloop = startWorkloop(app as any, logger, 1, 1);
});
});

Expand All @@ -88,6 +92,6 @@ test('workloop calls execute if runs:claim returns runs', (t) => {
},
};

cancel = startWorkloop(app as any, logger, 1, 1);
workloop = startWorkloop(app as any, logger, 1, 1);
});
});
Loading

0 comments on commit 9e845b0

Please sign in to comment.