Skip to content

Commit

Permalink
Merge pull request #749 from OpenFn/worker-logging
Browse files Browse the repository at this point in the history
Worker: Log improvements and WORKER_MAX_SOCKET_TIMEOUT_SECONDS
  • Loading branch information
taylordowns2000 authored Aug 23, 2024
2 parents afde5ce + 0ab72ff commit 6274e96
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 44 deletions.
7 changes: 7 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/integration-tests-worker

## 1.0.54

### Patch Changes

- Updated dependencies [a08fb47]
- @openfn/ws-worker@1.5.1

## 1.0.53

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.53",
"version": "1.0.54",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
4 changes: 3 additions & 1 deletion packages/lightning-mock/src/api-sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ const createSocketAPI = (
path: string,
httpServer: Server,
logger?: Logger,
logLevel?: LogLevel
logLevel?: LogLevel,
socketDelay = 1
) => {
// set up a websocket server to listen to connections
const server = new WebSocketServer({
Expand All @@ -99,6 +100,7 @@ const createSocketAPI = (
server,
state,
logger: logger && createLogger('PHX', { level: logLevel }),
socketDelay: socketDelay,
});

wss.registerEvents('worker:queue', {
Expand Down
5 changes: 4 additions & 1 deletion packages/lightning-mock/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ export type LightningOptions = {

// if passed, a JWT will be included in all claim responses
runPrivateKey?: string;

socketDelay?: number; // add a delay to all web socket replies
};

export type RunId = string;
Expand Down Expand Up @@ -107,7 +109,8 @@ const createLightningServer = (options: LightningOptions = {}) => {
'/worker', // TODO I should option drive this
server,
options.logger,
options.logLevel
options.logLevel,
options.socketDelay
);

app.use(createDevAPI(app as any, state, logger, api));
Expand Down
13 changes: 7 additions & 6 deletions packages/lightning-mock/src/socket-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type CreateServerOptions = {
state: ServerState;
logger?: Logger;
onMessage?: (evt: PhoenixEvent) => void;
socketDelay?: number;
};

type MockSocketServer = typeof WebSocketServer & {
Expand All @@ -87,6 +88,7 @@ function createServer({
state,
logger,
onMessage = () => {},
socketDelay = 1,
}: CreateServerOptions) {
const channels: Record<Topic, Set<EventHandler>> = {
// create a stub listener for pheonix to prevent errors
Expand Down Expand Up @@ -171,8 +173,10 @@ function createServer({
topic,
payload,
});
// @ts-ignore
ws.send(evt);
setTimeout(() => {
// @ts-ignore
ws.send(evt);
}, socketDelay);
};

ws.sendJSON = async ({ event, ref, topic, payload }: PhoenixEvent) => {
Expand All @@ -190,9 +194,6 @@ function createServer({
ws.on('message', async function (data: string) {
// decode the data
const evt = (await decode(data)) as PhoenixEvent;
// if (evt.event !== 'claim') {
// console.log(evt);
// }
onMessage(evt);

if (evt.topic) {
Expand All @@ -212,7 +213,7 @@ function createServer({
fn(ws, { event, topic, payload, ref, join_ref });
});
} else {
// This behaviour is just a convenience for unit tesdting
// This behaviour is just a convenience for unit testing
ws.reply({
ref,
join_ref,
Expand Down
36 changes: 36 additions & 0 deletions packages/lightning-mock/test/socket-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,39 @@ test.serial('onMessage', (t) => {
});
});
});

test.serial('defer message', (t) => {
return new Promise(async (done) => {
// Bit annoying but we need to rebuild the server to
// get a delay on it
server.close();

server = createSocketServer({
// @ts-ignore
state: {
events: new EventEmitter(),
},
onMessage: (evt) => {
messages.push(evt);
},
socketDelay: 500,
});

socket = new Socket('ws://localhost:8080', {
transport: WebSocket,
params: { token: 'x.y.z' },
});

socket.connect();

await wait(500);

const channel = socket.channel('x', {});
const start = Date.now();
channel.join().receive('ok', async () => {
const duration = Date.now() - start;
t.assert(duration >= 500);
done();
});
});
});
7 changes: 7 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# ws-worker

## 1.5.1

### Patch Changes

- a08fb47: Update CLI docs
Add WORKER_MAX_SOCKET_TIMEOUT_SECONDS

## 1.5.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.5.0",
"version": "1.5.1",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
11 changes: 7 additions & 4 deletions packages/ws-worker/src/api/claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ const claim = (
app.queueChannel
.push<ClaimPayload>(CLAIM, { demand: 1 })
.receive('ok', ({ runs }: ClaimReply) => {
logger.debug(`pulled ${runs.length} runs`);
logger.debug(
`claimed ${runs.length} runs: `,
runs.map((r) => r.id).join(',')
);
// TODO what if we get here after we've been cancelled?
// the events have already been claimed...

Expand Down Expand Up @@ -80,11 +83,11 @@ const claim = (
})
// TODO need implementations for both of these really
// What do we do if we fail to join the worker channel?
.receive('error', () => {
logger.debug('pull err');
.receive('error', (e) => {
logger.error('Error on claim', e);
})
.receive('timeout', () => {
logger.debug('pull timeout');
logger.error('TIMEOUT on claim. Runs may be lost.');
reject(new Error('timeout'));
});
});
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/src/api/destroy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const waitForRuns = (app: ServerApp, logger: Logger) =>
log();
app.events.on(INTERNAL_RUN_COMPLETE, onRunComplete);
} else {
logger.debug('No active rns detected');
logger.debug('No active runs detected, closing immediately');
resolve();
}
});
Expand Down
2 changes: 2 additions & 0 deletions packages/ws-worker/src/channels/worker-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const connectToWorkerQueue = (
endpoint: string,
serverId: string,
secret: string,
timeout: number = 10,
logger: Logger,
SocketConstructor = PhxSocket
) => {
Expand All @@ -31,6 +32,7 @@ const connectToWorkerQueue = (
const socket = new SocketConstructor(endpoint, {
params,
transport: WebSocket,
timeout: timeout * 1000,
});

let didOpen = false;
Expand Down
19 changes: 13 additions & 6 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export type ServerOptions = {
max?: number;
};

socketTimeoutSeconds?: number;
payloadLimitMb?: number; // max memory limit for socket payload (ie, step:complete, log)
};

Expand All @@ -59,7 +60,7 @@ type SocketAndChannel = {
channel: Channel;
};

const DEFAULT_PORT = 1234;
const DEFAULT_PORT = 2222;
const MIN_BACKOFF = 1000;
const MAX_BACKOFF = 1000 * 30;

Expand Down Expand Up @@ -93,13 +94,13 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
options.maxWorkflows
);
} else {
// @ts-ignore
const port = app.server?.address().port;
logger.break();
logger.warn('Workloop not starting');
logger.warn('Noloop active: workloop has not started');
logger.info('This server will not auto-pull work from lightning.');
logger.info('You can manually claim by posting to /claim, eg:');
logger.info(
` curl -X POST http://locahost:${options.port || DEFAULT_PORT}/claim`
);
logger.info(` curl -X POST http://localhost:${port}/claim`);
logger.break();
}
};
Expand Down Expand Up @@ -130,7 +131,13 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
logger.debug(e);
};

connectToWorkerQueue(options.lightning!, app.id, options.secret!, logger)
connectToWorkerQueue(
options.lightning!,
app.id,
options.secret!,
options.socketTimeoutSeconds,
logger
)
.on('connect', onConnect)
.on('disconnect', onDisconnect)
.on('error', onError);
Expand Down
26 changes: 20 additions & 6 deletions packages/ws-worker/src/util/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,27 @@ import yargs from 'yargs';
import { LogLevel } from '@openfn/logger';
import { hideBin } from 'yargs/helpers';

const DEFAULT_PORT = 2222;
const DEFAULT_WORKER_CAPACITY = 5;
const DEFAULT_SOCKET_TIMEOUT_SECONDS = 10;

type Args = {
_: string[];
port?: number;
lightning?: string;
repoDir?: string;
secret?: string;
loop?: boolean;
log: LogLevel;
log?: LogLevel;
lightningPublicKey?: string;
mock: boolean;
mock?: boolean;
backoff: string;
capacity?: number;
runMemory?: number;
payloadMemory?: number;
statePropsToRemove?: string[];
maxRunDurationSeconds: number;
socketTimeoutSeconds?: number;
};

type ArgTypes = string | string[] | number | undefined;
Expand Down Expand Up @@ -56,13 +61,14 @@ export default function parseArgs(argv: string[]): Args {
WORKER_REPO_DIR,
WORKER_SECRET,
WORKER_STATE_PROPS_TO_REMOVE,
WORKER_SOCKET_TIMEOUT_SECONDS,
} = process.env;

const parser = yargs(hideBin(argv))
.command('server', 'Start a ws-worker server')
.option('port', {
alias: 'p',
description: 'Port to run the server on. Env: WORKER_PORT',
description: `Port to run the server on. Default ${DEFAULT_PORT}. Env: WORKER_PORT`,
type: 'number',
})
.option('lightning', {
Expand All @@ -80,6 +86,9 @@ export default function parseArgs(argv: string[]): Args {
description:
'Worker secret. (comes from WORKER_SECRET by default). Env: WORKER_SECRET',
})
.option('socket-timeout', {
description: `Timeout for websockets to Lighting, in seconds. Defaults to 10.`,
})
.option('lightning-public-key', {
description:
'Base64-encoded public key. Used to verify run tokens. Env: WORKER_LIGHTNING_PUBLIC_KEY',
Expand All @@ -103,7 +112,7 @@ export default function parseArgs(argv: string[]): Args {
'Claim backoff rules: min/max (in seconds). Env: WORKER_BACKOFF',
})
.option('capacity', {
description: 'max concurrent workers. Env: WORKER_CAPACITY',
description: `max concurrent workers. Default ${DEFAULT_WORKER_CAPACITY}. Env: WORKER_CAPACITY`,
type: 'number',
})
.option('state-props-to-remove', {
Expand Down Expand Up @@ -132,7 +141,7 @@ export default function parseArgs(argv: string[]): Args {

return {
...args,
port: setArg(args.port, WORKER_PORT, 2222),
port: setArg(args.port, WORKER_PORT, DEFAULT_PORT),
lightning: setArg(
args.lightning,
WORKER_LIGHTNING_SERVICE_URL,
Expand All @@ -146,7 +155,7 @@ export default function parseArgs(argv: string[]): Args {
),
log: setArg(args.log, WORKER_LOG_LEVEL as LogLevel, 'debug'),
backoff: setArg(args.backoff, WORKER_BACKOFF, '1/10'),
capacity: setArg(args.capacity, WORKER_CAPACITY, 5),
capacity: setArg(args.capacity, WORKER_CAPACITY, DEFAULT_WORKER_CAPACITY),
statePropsToRemove: setArg(
args.statePropsToRemove,
WORKER_STATE_PROPS_TO_REMOVE,
Expand All @@ -159,5 +168,10 @@ export default function parseArgs(argv: string[]): Args {
WORKER_MAX_RUN_DURATION_SECONDS,
300
),
socketTimeoutSeconds: setArg(
args.socketTimeoutSeconds,
WORKER_SOCKET_TIMEOUT_SECONDS,
DEFAULT_SOCKET_TIMEOUT_SECONDS
),
} as Args;
}
Loading

0 comments on commit 6274e96

Please sign in to comment.