Skip to content

Commit

Permalink
feat: Multiprocess
Browse files Browse the repository at this point in the history
  • Loading branch information
ukstv committed May 9, 2024
1 parent 2433189 commit 02ae74f
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 13 deletions.
106 changes: 106 additions & 0 deletions src/ancillary/multiprocess.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import EventEmitter from 'node:events'
import cluster from 'node:cluster'
import { cpus } from 'node:os'

export type MultiprocessOptions = {
/**
* If `true`, a single worker failure should not kill us all. A failed worker will get respawned.
*/
keepAlive: boolean
autostart: boolean
workers: number | undefined
main: MultiprocessWork
}

const DEFAULT_OPTIONS: Partial<MultiprocessOptions> = {
keepAlive: true,
autostart: true,
}

export type MultiprocessWork = () => void | Promise<void>

export class Multiprocess extends EventEmitter {
private keepAlive: boolean
private readonly work: MultiprocessWork

constructor(work: MultiprocessWork, options: Partial<MultiprocessOptions>) {
super()
const effectiveOptions = { ...DEFAULT_OPTIONS, ...options }
if (!work || typeof work !== 'function') {
throw new Error('You need to provide a worker function.')
}

this.keepAlive = effectiveOptions.keepAlive ?? true
this.work = work.bind(this)
this.fork = this.fork.bind(this)
this.stop = this.stop.bind(this)

if (cluster.isPrimary) {
cluster.setupPrimary({
silent: false,
})
}

if (effectiveOptions.autostart) {
if (cluster.isWorker) {
this.work()
} else {
this.start(effectiveOptions)
}
}
}

start(options: Partial<MultiprocessOptions>) {
if (options.workers === 0) {
this.work()
return
}
let processes = options.workers || cpus().length // TODO workers = -1 or undef means no workers
process.on('SIGINT', this.stop).on('SIGTERM', this.stop)
cluster.on('online', (wrk) => {
this.emit('worker', wrk.process.pid)
})
cluster.on('exit', (wrk) => {
this.emit('exit', wrk.process.pid)
return this.fork()
})

while (processes) {
processes -= 1
cluster.fork()
}

if (options.main) {
options.main()
}
}

stop() {
if (cluster.isPrimary) {
this.keepAlive = false
for (const worker of Object.values(cluster.workers || {})) {
if (worker) {
worker.process.kill()
worker.kill()
}
}
this.emit('offline')
}
}

fork(): void {
if (this.keepAlive) {
cluster.fork()
}
}
}

/**
* Leverage node:cluster to spawn multiple identical child processes.
*
* @param work - execute inside a worker.
* @param options
*/
export function multiprocess(work: MultiprocessWork, options: Partial<MultiprocessOptions> = {}) {
return new Multiprocess(work, options)
}
35 changes: 22 additions & 13 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@ import bodyParser from 'body-parser'
import { Server } from '@overnightjs/core'
import { auth } from './auth/index.js'
import { expressLoggers, logger, expressErrorLogger } from './logger/index.js'


import * as http from 'http'
import { Config } from 'node-config-ts'
import { multiprocess, type Multiprocess } from './ancillary/multiprocess'

const DEFAULT_SERVER_PORT = 8081

export class CeramicAnchorServer extends Server {
private _server?: http.Server
private _server?: Multiprocess

constructor(controllers: any[], config: Config) {
super(true)

this.app.set('trust proxy', true)
this.app.use(bodyParser.raw({inflate: true, type: 'application/vnd.ipld.car', limit: '1mb'}))
this.app.use(bodyParser.raw({ inflate: true, type: 'application/vnd.ipld.car', limit: '1mb' }))
this.app.use(bodyParser.json({ type: 'application/json' }))
this.app.use(bodyParser.urlencoded({ extended: true, type: 'application/x-www-form-urlencoded' }))
this.app.use(
bodyParser.urlencoded({ extended: true, type: 'application/x-www-form-urlencoded' })
)
this.app.use(expressLoggers)
if (config.requireAuth == true) {
this.app.use(auth)
Expand All @@ -34,17 +34,26 @@ export class CeramicAnchorServer extends Server {
* @param port - Server listening port
*/
start(port: number = DEFAULT_SERVER_PORT): Promise<void> {
const workers = process.env['JEST_WORKER_ID'] ? 1 : undefined
return new Promise<void>((resolve, reject) => {
this._server = this.app
.listen(port, () => {
logger.imp(`Server ready: Listening on port ${port}`)
resolve()
})
.on('error', (err) => reject(err))
this._server = multiprocess(
() => {
this.app
.listen(port, () => {
logger.imp(`Server ready: Listening on port ${port}`)
resolve()
})
.on('error', (err) => reject(err))
},
{
keepAlive: false,
workers: workers,
}
)
})
}

stop(): void {
this._server?.close()
this._server?.stop()
}
}

0 comments on commit 02ae74f

Please sign in to comment.