Skip to content

Commit

Permalink
worker: fix package.json resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
josephjclark committed Dec 17, 2024
1 parent 306e049 commit 4df5e77
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 21 deletions.
6 changes: 3 additions & 3 deletions integration-tests/worker/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import createLightningServer, { toBase64 } from '@openfn/lightning-mock';
import createEngine from '@openfn/engine-multi';
import createWorkerServer from '@openfn/ws-worker';
import { createMockLogger } from '@openfn/logger';
// import createLogger from '@openfn/logger';
import createLogger from '@openfn/logger';

export const randomPort = () => Math.round(2000 + Math.random() * 1000);

Expand Down Expand Up @@ -39,8 +39,8 @@ export const initWorker = async (
});

const worker = createWorkerServer(engine, {
logger: createMockLogger(),
// logger: createLogger('worker', { level: 'debug' }),
// logger: createMockLogger(),
logger: createLogger('worker', { level: 'debug' }),
port: workerPort,
lightning: `ws://localhost:${lightningPort}/worker`,
secret: crypto.randomUUID(),
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const DEFAULT_RUN_TIMEOUT = 1000 * 60 * 10; // ms
const DEFAULT_MEMORY_LIMIT_MB = 500;

// For each workflow, create an API object with its own event emitter
// this is a bt wierd - what if the emitter went on state instead?
// this is a bit weird - what if the emitter went on state instead?
const createWorkflowEvents = (
engine: EngineAPI,
context: ExecutionContext,
Expand Down
11 changes: 9 additions & 2 deletions packages/engine-multi/src/util/load-versions.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import fs from 'fs';

import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { Versions } from '../types';

const pkg = JSON.parse(fs.readFileSync('../../package.json', 'utf-8'));
const pkg = JSON.parse(
fs.readFileSync(
path.join(fileURLToPath(import.meta.url), '../../package.json'),
'utf-8'
)
);

// Load key versions at init time
const versions = {
node: process.version.substring(1),
Expand Down
4 changes: 2 additions & 2 deletions packages/ws-worker/src/channels/worker-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Socket as PhxSocket } from 'phoenix';
import { WebSocket } from 'ws';
import { API_VERSION } from '@openfn/lexicon/lightning';
import generateWorkerToken from '../util/worker-token';
import version from '../util/load-version';
import getVersion from '../util/load-version';

import type { Logger } from '@openfn/logger';
import type { Channel } from '../types';
Expand All @@ -22,7 +22,7 @@ const connectToWorkerQueue = (
const params = {
token,
api_version: API_VERSION,
worker_version: version,
worker_version: await getVersion(),
};

// @ts-ignore ts doesn't like the constructor here at all
Expand Down
4 changes: 2 additions & 2 deletions packages/ws-worker/src/events/run-start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { sendEvent, Context, onJobLog } from '../api/execute';
import calculateVersionString from '../util/versions';

import { timeInMicroseconds } from '../util';
import version from '../util/load-version';
import getVersion from '../util/load-version';

export default async function onRunStart(
context: Context,
Expand All @@ -31,7 +31,7 @@ export default async function onRunStart(
};

const versions = {
worker: version,
worker: await getVersion(),
...event.versions,
};

Expand Down
32 changes: 24 additions & 8 deletions packages/ws-worker/src/util/load-version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,27 @@ import fs from 'fs';
import path from 'node:path';
import { fileURLToPath } from 'node:url';

const pkg = JSON.parse(
fs.readFileSync(
path.join(fileURLToPath(import.meta.url), '../../../package.json'),
'utf-8'
)
);

export default pkg.version;
let version = '';

// find the parenting package.json
// this is non-trivial because the path is different in src and dist builds
export default function getVersion() {
if (!version) {
let nextPath = path.dirname(fileURLToPath(import.meta.url));
while (nextPath) {
const pkgPath = path.resolve(nextPath, 'package.json');
try {
fs.statSync(pkgPath);
nextPath = pkgPath;
break;
} catch (e) {
nextPath = path.dirname(nextPath);
}
}

const pkg = JSON.parse(fs.readFileSync(nextPath, 'utf-8'));
version = pkg.version;
}

return version;
}
5 changes: 3 additions & 2 deletions packages/ws-worker/test/channels/worker-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { API_VERSION } from '@openfn/lexicon/lightning';

import connectToWorkerQueue from '../../src/channels/worker-queue';
import { MockSocket } from '../../src/mock/sockets';
import loadVersions from '../../src/util/load-version';
import loadVersion from '../../src/util/load-version';

const logger = createMockLogger();

Expand Down Expand Up @@ -64,12 +64,13 @@ test('should connect with an auth token', async (t) => {
});

test('should connect with api and worker versions', async (t) => {
const version = await loadVersion();
return new Promise((done) => {
function createSocket(endpoint: string, options: any) {
const socket = new MockSocket(endpoint, {}, async () => {
const { worker_version, api_version } = options.params;

t.is(worker_version, loadVersions().engine);
t.is(worker_version, version);
t.truthy(worker_version);

t.is(api_version, API_VERSION);
Expand Down
4 changes: 3 additions & 1 deletion packages/ws-worker/test/events/run-start.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { createRunState } from '../../src/util';
import { RUN_LOG, RUN_START } from '../../src/events';

import { timestamp } from '@openfn/logger';
import version from '../../src/util/load-version';
import getVersion from '../../src/util/load-version';

test('should include a timestamp', async (t) => {
const plan = {
Expand Down Expand Up @@ -61,6 +61,8 @@ test('run:start event should include versions', async (t) => {
versions,
};

const version = await getVersion();

const state = createRunState(plan, input);
state.activeJob = jobId;
state.activeStep = 'b';
Expand Down

0 comments on commit 4df5e77

Please sign in to comment.