Skip to content

Commit

Permalink
Merge pull request #605 from OpenFn/private-adaptor-logs
Browse files Browse the repository at this point in the history
Private adaptor logs
  • Loading branch information
josephjclark authored Feb 15, 2024
2 parents b7f01fd + 854a586 commit b62c76e
Show file tree
Hide file tree
Showing 15 changed files with 253 additions and 100 deletions.
5 changes: 5 additions & 0 deletions .changeset/angry-plants-call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/engine-multi': patch
---

Don't log adaptor logs to stdout
5 changes: 5 additions & 0 deletions .changeset/swift-panthers-divide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/logger': patch
---

Support proxy() on the mock logger

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion integration-tests/worker/dummy-repo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"private": true,
"version": "1.0.0",
"dependencies": {
"@openfn/stateful-test_1.0.0": "@npm:@openfn/[email protected]"
"@openfn/language-common_latest": "npm:@openfn/language-common@^1.12.0",
"@openfn/stateful-test_1.0.0": "@npm:@openfn/[email protected]",
"@openfn/test-adaptor_1.0.0": "@npm:@openfn/[email protected]"
}
}
2 changes: 1 addition & 1 deletion integration-tests/worker/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import crypto from 'node:crypto';
import createLightningServer, { toBase64 } from '@openfn/lightning-mock';
import createEngine from '@openfn/engine-multi';
import createWorkerServer from '@openfn/ws-worker';
import createLogger, { createMockLogger } from '@openfn/logger';
import { createMockLogger } from '@openfn/logger';

export const randomPort = () => Math.round(2000 + Math.random() * 1000);

Expand Down
1 change: 0 additions & 1 deletion integration-tests/worker/test/autoinstall.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ let worker;
const run = async (attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.on('run:complete', (evt) => {
console.log('>', evt);
if (attempt.id === evt.runId) {
done(lightning.getResult(attempt.id));
}
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/test/benchmark.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ test.serial.skip('run 100 attempts', async (t) => {
}

lightning.on('step:complete', (evt) => {
// May want to disable this but it's nice feedback
// May want to disable this but it's nice feedback
t.log('Completed ', evt.runId);

if (evt.payload.reason !== 'success') {
Expand Down
205 changes: 121 additions & 84 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,15 @@ test.after(async () => {
await worker.destroy();
});

test('should run a simple job with no compilation or adaptor', (t) => {
const createDummyWorker = () => {
const engineArgs = {
repoDir: path.resolve('./dummy-repo'),
maxWorkers: 1,
};
return initWorker(lightningPort, engineArgs);
};

test.serial('should run a simple job with no compilation or adaptor', (t) => {
return new Promise(async (done) => {
lightning.once('run:complete', (evt) => {
// This will fetch the final dataclip from the attempt
Expand All @@ -62,7 +70,7 @@ test('should run a simple job with no compilation or adaptor', (t) => {
});
});

test('run a job with autoinstall of common', (t) => {
test.serial('run a job with autoinstall of common', (t) => {
return new Promise(async (done) => {
let autoinstallEvent;

Expand Down Expand Up @@ -106,7 +114,7 @@ test('run a job with autoinstall of common', (t) => {
});

// this depends on prior test!
test('run a job which does NOT autoinstall common', (t) => {
test.serial('run a job which does NOT autoinstall common', (t) => {
return new Promise(async (done) => {
lightning.once('run:complete', () => {
try {
Expand Down Expand Up @@ -143,40 +151,7 @@ test('run a job which does NOT autoinstall common', (t) => {
});
});

test("Don't send job logs to stdout", (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest',
body: 'fn((s) => { console.log("@@@"); return s })',
},
],
};

lightning.once('run:complete', () => {
const jsonLogs = engineLogger._history;

// The engine logger shouldn't print out any job logs
const jobLog = jsonLogs.find((l) => l.name === 'JOB');
t.falsy(jobLog);
const jobLog2 = jsonLogs.find((l) => l.message[0] === '@@@');
t.falsy(jobLog2);

// But it SHOULD log engine stuff
const runtimeLog = jsonLogs.find(
(l) => l.name === 'R/T' && l.message[0].match(/completed step/i)
);
t.truthy(runtimeLog);
done();
});

lightning.enqueueRun(attempt);
});
});

test('run a job with initial state (with data)', (t) => {
test.serial('run a job with initial state (with data)', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
Expand All @@ -193,7 +168,8 @@ test('run a job with initial state (with data)', (t) => {

lightning.addDataclip('s1', initialState);

lightning.once('run:complete', () => {
lightning.once('run:complete', (evt) => {
t.log(evt.payload);
const result = lightning.getResult(attempt.id);
t.deepEqual(result, {
...initialState,
Expand All @@ -210,7 +186,7 @@ test('run a job with initial state (with data)', (t) => {
});
});

test('run a job with initial state (no top level keys)', (t) => {
test.serial('run a job with initial state (no top level keys)', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
Expand Down Expand Up @@ -257,7 +233,6 @@ test.skip('run a job with credentials', (t) => {
const app = new Koa();

app.use(async (ctx, next) => {
console.log('GET!');
// TODO check basic credential
ctx.body = '{ message: "ok" }';
ctx.response.headers['Content-Type'] = 'application/json';
Expand Down Expand Up @@ -313,7 +288,7 @@ test.skip('run a job with credentials', (t) => {
});
});

test('run a job with bad credentials', (t) => {
test.serial('run a job with bad credentials', (t) => {
return new Promise<void>(async (done) => {
const attempt = {
id: crypto.randomUUID(),
Expand Down Expand Up @@ -342,7 +317,7 @@ test('run a job with bad credentials', (t) => {
});
});

test('blacklist a non-openfn adaptor', (t) => {
test.serial('blacklist a non-openfn adaptor', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
Expand Down Expand Up @@ -385,21 +360,19 @@ test.skip('a timeout error should still call step-complete', (t) => {
});

lightning.once('step:complete', (event) => {
console.log(event);
t.is(event.payload.reason, 'kill');
t.is(event.payload.error_type, 'TimeoutError');
});

lightning.once('run:complete', () => {
console.log('DONE!');
done();
});

lightning.enqueueRun(attempt);
});
});

test('an OOM error should still call step-complete', (t) => {
test.serial('an OOM error should still call step-complete', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
Expand Down Expand Up @@ -430,7 +403,7 @@ test('an OOM error should still call step-complete', (t) => {
});
});

// test('run a job with complex behaviours (initial state, branching)', (t) => {
// test.serial('run a job with complex behaviours (initial state, branching)', (t) => {
// const attempt = {
// id: 'a1',
// initialState: 's1
Expand Down Expand Up @@ -464,61 +437,125 @@ test('an OOM error should still call step-complete', (t) => {
// });
// });
// });

// TODO this test is a bit different now
// I think it's worth keeping
test('stateful adaptor should create a new client for each attempt', (t) => {
test.serial("Don't send job logs to stdout", (t) => {
return new Promise(async (done) => {
// We want to create our own special worker here
await worker.destroy();

const attempt1 = {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]',
// manual import shouldn't be needed but its not important enough to fight over
body: `import { fn, threadId, clientId } from '@openfn/stateful-test';
fn(() => {
return { threadId, clientId }
})`,
adaptor: '@openfn/language-common@latest',
body: 'fn((s) => { console.log("@@@"); return s })',
},
],
};
const attempt2 = {
...attempt1,
id: crypto.randomUUID(),
};
let results = {};

lightning.on('run:complete', (evt) => {
const id = evt.runId;
results[id] = lightning.getResult(id);
lightning.once('run:complete', () => {
const jsonLogs = engineLogger._history;
// The engine logger shouldn't print out any job logs
const jobLog = jsonLogs.find((l) => l.name === 'JOB');
t.falsy(jobLog);
const jobLog2 = jsonLogs.find((l) => l.message[0] === '@@@');
t.falsy(jobLog2);

if (id === attempt2.id) {
const one = results[attempt1.id];
const two = results[attempt2.id];
// But it SHOULD log engine stuff
const runtimeLog = jsonLogs.find(
(l) => l.name === 'engine' && l.message[0].match(/complete workflow/i)
);
t.truthy(runtimeLog);
done();
});

// The two attempts should run in different threads
t.not(one.threadId, two.threadId);
t.not(one.clientId, two.clientId);
lightning.enqueueRun(attempt);
});
});

done();
}
});
test.serial("Don't send adaptor logs to stdout", (t) => {
return new Promise(async (done) => {
// We have to create a new worker with a different repo for this one
await worker.destroy();
({ worker, engineLogger } = await createDummyWorker());

const engineArgs = {
repoDir: path.resolve('./dummy-repo'),
maxWorkers: 1,
const message = 've have been expecting you meester bond';
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]',
body: `import { log } from '@openfn/test-adaptor'; log("${message}")`,
},
],
};
await initWorker(lightningPort, engineArgs);

lightning.enqueueRun(attempt1);
lightning.enqueueRun(attempt2);
lightning.once('run:complete', () => {
const jsonLogs = engineLogger._history;
// The engine logger shouldn't print out any adaptor logs
const jobLog = jsonLogs.find((l) => l.name === 'ADA');
t.falsy(jobLog);
const jobLog2 = jsonLogs.find((l) => l.message[0] === message);
t.falsy(jobLog2);

// But it SHOULD log engine stuff
const runtimeLog = jsonLogs.find(
(l) => l.name === 'engine' && l.message[0].match(/complete workflow/i)
);
t.truthy(runtimeLog);
done();
});

lightning.enqueueRun(attempt);
});
});

test('worker should exit if it has an invalid key', (t) => {
test.serial(
'stateful adaptor should create a new client for each attempt',
(t) => {
return new Promise(async (done) => {
// We want to create our own special worker here
await worker.destroy();
({ worker, engineLogger } = await createDummyWorker());

const attempt1 = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]',
// manual import shouldn't be needed but its not important enough to fight over
body: `import { fn, threadId, clientId } from '@openfn/stateful-test';
fn(() => {
return { threadId, clientId }
})`,
},
],
};
const attempt2 = {
...attempt1,
id: crypto.randomUUID(),
};
let results = {};

lightning.on('run:complete', (evt) => {
const id = evt.runId;
results[id] = lightning.getResult(id);

if (id === attempt2.id) {
const one = results[attempt1.id];
const two = results[attempt2.id];

// The two attempts should run in different threads
t.not(one.threadId, two.threadId);
t.not(one.clientId, two.clientId);

done();
}
});

lightning.enqueueRun(attempt1);
lightning.enqueueRun(attempt2);
});
}
);

test.serial('worker should exit if it has an invalid key', (t) => {
return new Promise(async (done) => {
if (!worker.destroyed) {
await worker.destroy();
Expand Down
Loading

0 comments on commit b62c76e

Please sign in to comment.