Skip to content

Commit

Permalink
Merge pull request #436 from OpenFn/backoff-tuning
Browse files Browse the repository at this point in the history
Worker: Backoff tuning
  • Loading branch information
josephjclark authored Nov 6, 2023
2 parents b782206 + f727125 commit de20432
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 33 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
fetch-depth: 1
- uses: actions/setup-node@v3
with:
node-version: '18'
Expand All @@ -23,9 +23,8 @@ jobs:
- run: pnpm config set "//registry.npmjs.org/:_authToken=${NPM_TOKEN}"
env:
NPM_TOKEN: ${{ secrets.NPM_TOKEN }}
- run: pnpm changeset tag
- run: git push --tags
- run: pnpm publish -r --report-summary --publish-branch main --access=public
- run: pnpm changeset publish --report-summary --publish-branch main --access=public
- run: git push --follow-tags
- run: pnpm run generate-slack-report
env:
SLACK_TOKEN: ${{ secrets.SLACK_TOKEN }}
7 changes: 7 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/integration-tests-worker

## 1.0.5

### Patch Changes

- Updated dependencies
- @openfn/ws-worker@0.1.4

## 1.0.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.4",
"version": "1.0.5",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
6 changes: 6 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# ws-worker

## 0.1.4

### Patch Changes

- Accept backoff as startup command

## 0.1.3

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "0.1.3",
"version": "0.1.4",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
9 changes: 5 additions & 4 deletions packages/ws-worker/src/api/workloop.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CLAIM_ATTEMPT } from '../events';
import tryWithBackoff, { Options } from '../util/try-with-backoff';
import tryWithBackoff from '../util/try-with-backoff';

import type { CancelablePromise, Channel } from '../types';
import type { Logger } from '@openfn/logger';
Expand All @@ -10,16 +10,17 @@ const startWorkloop = (
channel: Channel,
execute: (attempt: CLAIM_ATTEMPT) => void,
logger: Logger,
options: Partial<Pick<Options, 'maxBackoff' | 'timeout'>> = {}
minBackoff: number,
maxBackoff: number
) => {
let promise: CancelablePromise;
let cancelled = false;

const workLoop = () => {
if (!cancelled) {
promise = tryWithBackoff(() => claim(channel, execute, logger), {
timeout: options.timeout,
maxBackoff: options.maxBackoff,
min: minBackoff,
max: maxBackoff,
});
// TODO this needs more unit tests I think
promise.then(() => {
Expand Down
25 changes: 19 additions & 6 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ import connectToWorkerQueue from './channels/worker-queue';
import { CLAIM_ATTEMPT } from './events';

type ServerOptions = {
backoff?: number; // what is this?
maxBackoff?: number;
maxWorkflows?: number;
port?: number;
lightning?: string; // url to lightning instance
logger?: Logger;
noLoop?: boolean; // disable the worker loop

secret?: string; // worker secret

backoff?: {
min?: number;
max?: number;
};
};

// this is the server/koa API
Expand All @@ -37,6 +40,8 @@ interface ServerApp extends Koa {
}

const DEFAULT_PORT = 1234;
const MIN_BACKOFF = 1000;
const MAX_BACKOFF = 1000 * 30;

// TODO move out into another file, make testable, test in isolation
function connect(
Expand All @@ -57,12 +62,20 @@ function connect(

// trigger the workloop
if (!options.noLoop) {
if (app.killWorkloop) {
logger.info('Terminating old workloop');
app.killWorkloop();
}

logger.info('Starting workloop');
// TODO maybe namespace the workloop logger differently? It's a bit annoying
app.killWorkloop = startWorkloop(channel, app.execute, logger, {
maxBackoff: options.maxBackoff,
// timeout: 1000 * 60, // TMP debug poll once per minute
});
app.killWorkloop = startWorkloop(
channel,
app.execute,
logger,
options.backoff?.min || MIN_BACKOFF,
options.backoff?.max || MAX_BACKOFF
);
} else {
logger.break();
logger.warn('Workloop not starting');
Expand Down
15 changes: 15 additions & 0 deletions packages/ws-worker/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Args = {
loop?: boolean;
log: LogLevel;
mock: boolean;
backoff: string;
};

const args = yargs(hideBin(process.argv))
Expand Down Expand Up @@ -57,6 +58,10 @@ const args = yargs(hideBin(process.argv))
default: false,
type: 'boolean',
})
.option('backoff', {
description: 'Claim backoff rules: min/max (s)',
default: '1/10',
})
.parse() as Args;

const logger = createLogger('SRV', { level: args.log });
Expand All @@ -76,6 +81,11 @@ if (args.lightning === 'mock') {

args.secret = WORKER_SECRET;
}
const [minBackoff, maxBackoff] = args.backoff
.split('/')
.map((n: string) => parseInt(n, 10) * 1000);

console.log(minBackoff, maxBackoff);

function engineReady(engine: any) {
createWorker(engine, {
Expand All @@ -84,6 +94,11 @@ function engineReady(engine: any) {
logger,
secret: args.secret,
noLoop: !args.loop,
// TODO need to feed this through properly
backoff: {
min: minBackoff,
max: maxBackoff,
},
});
}

Expand Down
28 changes: 11 additions & 17 deletions packages/ws-worker/src/util/try-with-backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,24 @@ import { CancelablePromise } from '../types';
export type Options = {
attempts?: number;
maxAttempts?: number;
maxBackoff?: number;

// min and max durations
min?: number;
max?: number;

// these are provided internally
timeout?: number;
isCancelled?: () => boolean;
};

const MAX_BACKOFF = 1000 * 30;
// Rate at which timeouts are increased
const BACKOFF_MULTIPLIER = 1.15;

// This function will try and call its first argument every {opts.timeout|100}ms
// If the function throws, it will "backoff" and try again a little later
// Right now it's a bit of a sketch, but it sort of works!
const tryWithBackoff = (fn: any, opts: Options = {}): CancelablePromise => {
if (!opts.timeout) {
opts.timeout = 100; // TODO take this as minBackoff or initialBackoff or something
}
if (!opts.attempts) {
opts.attempts = 1;
}
if (!opts.maxBackoff) {
opts.maxBackoff = MAX_BACKOFF;
}
let { timeout, attempts, maxAttempts } = opts;
timeout = timeout;
attempts = attempts;
const { min = 1000, max = 10000, maxAttempts, attempts = 1 } = opts;

let cancelled = false;

Expand Down Expand Up @@ -58,12 +51,13 @@ const tryWithBackoff = (fn: any, opts: Options = {}): CancelablePromise => {
const nextOpts = {
maxAttempts,
attempts: attempts + 1,
timeout: Math.min(opts.maxBackoff!, timeout * 1.2),
min: Math.min(max, min * BACKOFF_MULTIPLIER),
max: max,
isCancelled: opts.isCancelled,
};

//console.log('trying again in ', nextOpts.min);
tryWithBackoff(fn, nextOpts).then(resolve).catch(reject);
}, timeout);
}, min);
}
});

Expand Down

0 comments on commit de20432

Please sign in to comment.