Skip to content

Commit

Permalink
Better worker logs (#784)
Browse files Browse the repository at this point in the history
* worker: added simple logging

* log run times

* changeset

* format

* log capacity

* updates

* format
  • Loading branch information
josephjclark authored Sep 25, 2024
1 parent 5db5862 commit f581c6b
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changeset/plenty-cobras-join.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

log duration of runs and server capacity
6 changes: 4 additions & 2 deletions packages/ws-worker/src/api/claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ const claim = (

const activeWorkers = Object.keys(app.workflows).length;
if (activeWorkers >= maxWorkers) {
logger.debug('skipping claim attempt: server at capacity');
logger.debug(
`skipping claim attempt: server at capacity (${activeWorkers}/${maxWorkers})`
);
return reject(new Error('Server at capacity'));
}

if (!app.queueChannel) {
logger.debug('skipping claim attempt: websocket unavailable');
return reject(new Error('No websocket available'));
}
logger.debug(`requesting run (capacity ${activeWorkers}/${maxWorkers})`);

logger.debug('requesting run...');
app.queueChannel
.push<ClaimPayload>(CLAIM, { demand: 1 })
.receive('ok', ({ runs }: ClaimReply) => {
Expand Down
6 changes: 5 additions & 1 deletion packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
app.execute = async ({ id, token }: ClaimRun) => {
if (app.socket) {
try {
const start = Date.now();
app.workflows[id] = true;

const {
Expand All @@ -197,7 +198,10 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {

// Callback to be triggered when the work is done (including errors)
const onFinish = () => {
logger.debug(`workflow ${id} complete: releasing worker`);
const duration = (Date.now() - start) / 1000;
logger.debug(
`workflow ${id} complete in ${duration}s: releasing worker`
);
delete app.workflows[id];
runChannel.leave();

Expand Down
5 changes: 4 additions & 1 deletion packages/ws-worker/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const args = cli(process.argv);

const logger = createLogger('SRV', { level: args.log });

logger.info('Starting worker server...');

if (args.lightning === 'mock') {
args.lightning = 'ws://localhost:8888/worker';
if (!args.secret) {
Expand All @@ -25,7 +27,7 @@ const [minBackoff, maxBackoff] = args.backoff
.map((n: string) => parseInt(n, 10) * 1000);

function engineReady(engine: any) {
logger.debug('Creating worker server...');
logger.debug('Creating worker instance');

const workerOptions: ServerOptions = {
port: args.port,
Expand Down Expand Up @@ -61,6 +63,7 @@ function engineReady(engine: any) {
logger.debug('Worker options:', humanOptions);

createWorker(engine, workerOptions);
logger.success('Worker started OK');
}

if (args.mock) {
Expand Down

0 comments on commit f581c6b

Please sign in to comment.