diff --git a/src/ancillary/multiprocess.ts b/src/ancillary/multiprocess.ts new file mode 100644 index 000000000..edd8df620 --- /dev/null +++ b/src/ancillary/multiprocess.ts @@ -0,0 +1,106 @@ +import EventEmitter from 'node:events' +import cluster from 'node:cluster' +import { cpus } from 'node:os' + +export type MultiprocessOptions = { + /** + * If `true`, a single worker failure should not kill us all. A failed worker will get respawned. + */ + keepAlive: boolean + autostart: boolean + workers: number | undefined + main: MultiprocessWork +} + +const DEFAULT_OPTIONS: Partial = { + keepAlive: true, + autostart: true, +} + +export type MultiprocessWork = () => void | Promise + +export class Multiprocess extends EventEmitter { + private keepAlive: boolean + private readonly work: MultiprocessWork + + constructor(work: MultiprocessWork, options: Partial) { + super() + const effectiveOptions = { ...DEFAULT_OPTIONS, ...options } + if (!work || typeof work !== 'function') { + throw new Error('You need to provide a worker function.') + } + + this.keepAlive = effectiveOptions.keepAlive ?? true + this.work = work.bind(this) + this.fork = this.fork.bind(this) + this.stop = this.stop.bind(this) + + if (cluster.isPrimary) { + cluster.setupPrimary({ + silent: false, + }) + } + + if (effectiveOptions.autostart) { + if (cluster.isWorker) { + this.work() + } else { + this.start(effectiveOptions) + } + } + } + + start(options: Partial) { + if (options.workers === 0) { + this.work() + return + } + let processes = options.workers || cpus().length // TODO workers = -1 or undef means no workers + process.on('SIGINT', this.stop).on('SIGTERM', this.stop) + cluster.on('online', (wrk) => { + this.emit('worker', wrk.process.pid) + }) + cluster.on('exit', (wrk) => { + this.emit('exit', wrk.process.pid) + return this.fork() + }) + + while (processes) { + processes -= 1 + cluster.fork() + } + + if (options.main) { + options.main() + } + } + + stop() { + if (cluster.isPrimary) { + this.keepAlive = false + for (const worker of Object.values(cluster.workers || {})) { + if (worker) { + worker.process.kill() + worker.kill() + } + } + this.emit('offline') + } + } + + fork(): void { + if (this.keepAlive) { + cluster.fork() + } + } +} + +/** + * Leverage node:cluster to spawn multiple identical child processes. + * + * @param work - execute inside a worker. + * @param options + */ +export function multiprocess(work: MultiprocessWork, options: Partial = {}) { + return new Multiprocess(work, options) +} diff --git a/src/server.ts b/src/server.ts index 9fa1304b9..0e475a796 100644 --- a/src/server.ts +++ b/src/server.ts @@ -2,23 +2,23 @@ import bodyParser from 'body-parser' import { Server } from '@overnightjs/core' import { auth } from './auth/index.js' import { expressLoggers, logger, expressErrorLogger } from './logger/index.js' - - -import * as http from 'http' import { Config } from 'node-config-ts' +import { multiprocess, type Multiprocess } from './ancillary/multiprocess' const DEFAULT_SERVER_PORT = 8081 export class CeramicAnchorServer extends Server { - private _server?: http.Server + private _server?: Multiprocess constructor(controllers: any[], config: Config) { super(true) this.app.set('trust proxy', true) - this.app.use(bodyParser.raw({inflate: true, type: 'application/vnd.ipld.car', limit: '1mb'})) + this.app.use(bodyParser.raw({ inflate: true, type: 'application/vnd.ipld.car', limit: '1mb' })) this.app.use(bodyParser.json({ type: 'application/json' })) - this.app.use(bodyParser.urlencoded({ extended: true, type: 'application/x-www-form-urlencoded' })) + this.app.use( + bodyParser.urlencoded({ extended: true, type: 'application/x-www-form-urlencoded' }) + ) this.app.use(expressLoggers) if (config.requireAuth == true) { this.app.use(auth) @@ -34,17 +34,26 @@ export class CeramicAnchorServer extends Server { * @param port - Server listening port */ start(port: number = DEFAULT_SERVER_PORT): Promise { + const workers = process.env['JEST_WORKER_ID'] ? 1 : undefined return new Promise((resolve, reject) => { - this._server = this.app - .listen(port, () => { - logger.imp(`Server ready: Listening on port ${port}`) - resolve() - }) - .on('error', (err) => reject(err)) + this._server = multiprocess( + () => { + this.app + .listen(port, () => { + logger.imp(`Server ready: Listening on port ${port}`) + resolve() + }) + .on('error', (err) => reject(err)) + }, + { + keepAlive: false, + workers: workers, + } + ) }) } stop(): void { - this._server?.close() + this._server?.stop() } }