Skip to content

Commit

Permalink
socket
Browse files Browse the repository at this point in the history
  • Loading branch information
goloveychuk committed Sep 12, 2022
1 parent 9914c68 commit e6730eb
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 60 deletions.
2 changes: 1 addition & 1 deletion runner/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "fastest-jest-runner",
"version": "29.0.3",
"version": "29.0.4",
"repository": {
"type": "git",
"url": "https://github.com/goloveychuk/fastest-jest-runner.git",
Expand Down
8 changes: 8 additions & 0 deletions runner/src/fifo-maker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,12 @@ export class FifoMaker {
this.allFifos.set(id, fifo)
return fifo
}
makeFifo2(desc: string): Fifo {
const id = this.curId++;
const p = path.join(this.baseDir, `pipe_${desc}_${id}`);
// addon.make_fifo(p);
const fifo: Fifo = {path: p, id};
this.allFifos.set(id, fifo)
return fifo
}
}
83 changes: 44 additions & 39 deletions runner/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import type { SnapshotBuilderModule, SnapshotConfig } from './snapshots/types';
import { replaceRootDirInPath } from 'jest-config';
import { createScriptTransformer } from '@jest/transform';
import { Config } from '@jest/types';
import { createTimings } from './log';
import { createTimings } from './log';
import { createServer } from './socket';

function setCleanupBeforeExit(clean: () => void) {
let called = false;
Expand Down Expand Up @@ -108,8 +109,6 @@ class TestRunner extends EmittingTestRunner {
return this.runSnapshotsTests(tests, watcher, options);
}



async runDebug(tests: Array<Test>, watcher: TestWatcher) {
if (tests.length !== 1) {
throw new Error('Debug is supported for 1 test only');
Expand All @@ -134,10 +133,7 @@ class TestRunner extends EmittingTestRunner {
resolver,
});


const snapshotName = await getSnapshotName(
test
);
const snapshotName = await getSnapshotName(test);

await this.#eventEmitter.emit('test-file-start', [test]);

Expand All @@ -156,9 +152,9 @@ class TestRunner extends EmittingTestRunner {
}

async getCommons(projectConfig: Config.ProjectConfig) {
const fastestRunnerConfig = normalizeRunnerConfig(projectConfig.globals[
'fastest-jest-runner'
] as any)
const fastestRunnerConfig = normalizeRunnerConfig(
projectConfig.globals['fastest-jest-runner'] as any,
);

const snapshotPath = replaceRootDirInPath(
projectConfig.rootDir,
Expand All @@ -179,10 +175,14 @@ class TestRunner extends EmittingTestRunner {

const validateSnapshotName = (name: string): string => {
if (snapshotBuilder.snapshots[name]) {
return name
return name;
}
throw new Error(`Snapshot "${name}" not found, available snapshots: ${Object.keys(snapshotBuilder.snapshots).join(', ')}`);
}
throw new Error(
`Snapshot "${name}" not found, available snapshots: ${Object.keys(
snapshotBuilder.snapshots,
).join(', ')}`,
);
};

const getSnapshotName = async (test: Test) => {
const testSource = await fs.promises.readFile(test.path, 'utf-8');
Expand All @@ -200,11 +200,11 @@ class TestRunner extends EmittingTestRunner {
}
const snapshotName = await snapshotBuilder.getSnapshot({
testPath: test.path,
docblockPragmas
docblockPragmas,
});

return validateSnapshotName(snapshotName)
}
return validateSnapshotName(snapshotName);
};

return {
getSnapshotName,
Expand All @@ -219,15 +219,14 @@ class TestRunner extends EmittingTestRunner {
options: TestRunnerOptions,
) {
const workerPath = require.resolve('./worker');

const rootDir = await fs.promises.mkdtemp(
path.join(os.tmpdir(), 'fastest-jest-runner-'),
);
//todo setCleanupBeforeExit


const fifoMaker = new FifoMaker(rootDir);
const workerFifo = fifoMaker.makeFifo('worker');

const workerFifo = fifoMaker.makeFifo2('worker');

const testsLeft = new Set<string>();
const onProcExit: OnProcExit = (data) => {
Expand Down Expand Up @@ -285,7 +284,9 @@ class TestRunner extends EmittingTestRunner {
{
execArgv: [
...currentArgv,
...(fastestRunnerConfig.maxOldSpace ? [`--max-old-space-size=${fastestRunnerConfig.maxOldSpace}`] : []),
...(fastestRunnerConfig.maxOldSpace
? [`--max-old-space-size=${fastestRunnerConfig.maxOldSpace}`]
: []),
'--expose-gc',
'--v8-pool-size=0',
'--single-threaded',
Expand All @@ -305,45 +306,49 @@ class TestRunner extends EmittingTestRunner {
console.log(data.toString('utf-8'));
});
child.stderr!.on('data', (data) => {
const chunk = data.toString('utf-8')
console.log(chunk)
const chunk = data.toString('utf-8');
console.log(chunk);
});

child.send(workerConfig);


const concurrency = options.serial ? 1 : this._globalConfig.maxWorkers;
// let concurrency = 25;
console.log({ concurrency });

const workerWriter = await createAsyncFifoWriter<WorkerInput.Input>(
workerFifo,
);
const workerWriter = await createServer<WorkerInput.Input>(workerFifo.path);

const testsById = new Map<number, Test>();
const cleanups: Array<() => Promise<void>> = [];

const initSnapshot = async (name: string) => {
const snapFifo = fifoMaker.makeFifo('snapshot');
const snapFifo = fifoMaker.makeFifo2('snapshot');

const writer = await createAsyncFifoWriter<WorkerInput.Input>(snapFifo);
const writer = await createServer<WorkerInput.Input>(snapFifo.path);
// const writer = await createAsyncFifoWriter<WorkerInput.Input>(snapFifo);
console.log(`spinning!!!!!!!!!!!!!!!!! ${name}`);

await workerWriter.write({
type: 'spinSnapshot',
name,
snapFifo,
});
cleanups.push(async () => {
await writer.write({ type: 'stop' });
await writer.stop()
});
// setInterval(() => {
// writer.write({ type: 'ping' });
// }, 1000)
return { writer };
};

const initOrGetSnapshot = withCache(initSnapshot);

const runTest = async (test: Test): Promise<TestResult> => {
const __timing = createTimings()
const __timing = createTimings();

__timing.time('runTest', 'start')
__timing.time('runTest', 'start');
testsLeft.add(test.path);
// return new Promise<TestResult>((resolve, reject) => {
const snapshotName = await getSnapshotName(test);
Expand All @@ -354,27 +359,27 @@ class TestRunner extends EmittingTestRunner {
testsById.set(resultFifo.id, test);

console.log(`sent msg ${test.path}`);
__timing.time('writeToFifo', 'start')

__timing.time('writeToFifo', 'start');
await snapshotObj.writer.write({
type: 'test',
testPath: test.path,
resultFifo,
});
__timing.time('writeToFifo', 'end')
__timing.time('writeToFifo', 'end');
// child.stdin!.uncork()
// child.send([id, test.path]);

__timing.time('readTestResult', 'start')
__timing.time('readTestResult', 'start');
const resp = JSON.parse(
await fs.promises.readFile(resultFifo.path, 'utf8'),
) as WorkerResponse.Response;
__timing.time('readTestResult', 'end')
__timing.time('readTestResult', 'end');
await fs.promises.unlink(resultFifo.path);
testsLeft.delete(test.path);
// process.kill(resp.pid, 'SIGKILL'); // killing zombies, better to wait for SIGCHLD
//
__timing.time('runTest', 'end')
__timing.time('runTest', 'end');

if (resp.testResult.type === 'error') {
throw resp.testResult.data;
Expand Down Expand Up @@ -413,7 +418,7 @@ class TestRunner extends EmittingTestRunner {

const cleanup = async () => {
await Promise.all(cleanups.map((cl) => cl()));
await workerWriter.write({ type: 'stop' });
await workerWriter.stop()
await fs.promises.rm(rootDir, { recursive: true });
console.log('before proc loop');
const timer = setTimeout(() => {
Expand Down Expand Up @@ -471,4 +476,4 @@ class CancelRun extends Error {

export default TestRunner;

export * from './snapshots/public';
export * from './snapshots/public';
136 changes: 136 additions & 0 deletions runner/src/socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import * as net from 'net';

type Payload<T> =
| {
counter: number;
kind: 'payload';
data: T;
}
| {
sent: number;
kind: 'ping';
}
| {
kind: 'stop';
};

const DELIMITER = '\n';

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

export async function createServer<T>(socketPath: string) {
// connec
let counter = 0;
let closing = false;
let _resolve: (socket: net.Socket) => void;
const connected = new Promise<net.Socket>((resolve) => {
_resolve = resolve;
});
const server = net
.createServer((stream) => {
_resolve(stream);
// stream.on('data', (_msg) => {
// const msg = _msg.toString();
// });
})
.listen(socketPath);

const _write = async (payload: Payload<unknown>, end?: boolean) => {
const socket = await connected; //todo await prev req?
const serialized = JSON.stringify(payload) + DELIMITER;

await new Promise<void>((resolve, reject) =>
socket.write(serialized, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
}),
);
if (end) {
await new Promise<void>((resolve) => socket.end(resolve));
}
};

const stop = async () => {
closing = true;
await _write({ kind: 'stop' });
server.close();
};
const write = async (data: T) => {
const payload: Payload<T> = {
data,
counter: counter++,
kind: 'payload',
};
await _write(payload);
};

const pingPong = async () => {
while (true) {
if (closing) {
return;
}
await _write({
sent: Date.now(),
kind: 'ping',
});
await sleep(1000);
}
};

pingPong();

return { write, stop };
}

async function* splitByNewline(gen: {
[Symbol.asyncIterator](): AsyncGenerator<any, any, void>;
}) {
let soFar: string | undefined = undefined;

for await (const data of gen) {
const parts: string[] = ((soFar ?? '') + data).split(DELIMITER);
soFar = parts.pop();

for (const part of parts) {
yield part;
}
}
}

export async function connectToServer<T>(socketPath: string) {
let counter = 0;

const client = net.createConnection(socketPath, () => {
console.log('connected');
});

async function* gen() {
for await (const d of splitByNewline(client as any)) {
const msg = JSON.parse(d) as Payload<T>;

if (msg.kind === 'ping') {
console.log('ping');
continue;
} else if (msg.kind === 'stop') {
return;
} else {
if (msg.counter !== counter++) {
throw new Error(`invalid counter ${msg.counter} expected ${counter}`);
}
client.pause();
yield msg.data;
client.resume();
}
}
}

return {
stop: () => {
client.destroy();
},
gen,
};
}
7 changes: 2 additions & 5 deletions runner/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,14 @@ export declare namespace WorkerInput {
resultFifo: Fifo;
};

export type Stop = {
type: 'stop';
};


export type SpinSnapshot = {
type: 'spinSnapshot';
name: string;
snapFifo: Fifo;
};

export type Input = RunTest | Stop | SpinSnapshot;
export type Input = RunTest | SpinSnapshot
}

export type RetryData = {
Expand Down
Loading

0 comments on commit e6730eb

Please sign in to comment.