Merge pull request #10 from wallind/issues-9-Worker-persistence-options
9: worker persistence configurability
Nathan Schwarz committed Nov 9, 2021
2 parents 1af0e3b + 7959142 commit 5058c32
Showing 3 changed files with 68 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.vscode
15 changes: 10 additions & 5 deletions README.md
@@ -55,22 +55,27 @@ Both in-memory and persistent tasks are available at the same time, and can be u
- verifies that workers are available, or creates them
- dispatches jobs to the workers
- removes the task from the queue once the job is done
- - closes the workers when no jobs are available
+ - closes the workers when no jobs are available (behavior can be overridden with `keepAlive`)

on the Worker it:
- starts the job
- when the job is done, tells the Master that it is available and that the previous task can be removed
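The master-side loop above can be sketched as a small standalone simulation (illustrative names only, not the package's internals):

```javascript
// Illustrative simulation of one master tick: spawn workers up to the number
// of pending jobs, dispatch jobs to idle workers, then close idle workers.
function masterTick(queue, workers, maxWorkers) {
  // verify workers are available, or create them
  while (workers.length < Math.min(maxWorkers, queue.length)) {
    workers.push({ busy: false, job: null })
  }
  // dispatch jobs to idle workers and remove them from the queue
  for (const worker of workers) {
    if (!worker.busy && queue.length > 0) {
      worker.job = queue.shift()
      worker.busy = true
    }
  }
  // close workers when no jobs are available for them
  return workers.filter(worker => worker.busy)
}
```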

## prototype

- `constructor(taskMap: Object, { port: Integer, maxAvailableWorkers: Integer, refreshRate: Integer, inMemoryOnly: Boolean, messageBroker: function, logs: String })`
+ `constructor(taskMap: Object, { port: Integer, maxAvailableWorkers: Integer, refreshRate: Integer, inMemoryOnly: Boolean, messageBroker: function, logs: String, keepAlive: String | Integer })`
- `taskMap`: a map of functions associated with a `taskType`
- `maxAvailableWorkers`: maximum number of child processes (cores), default is set to the system maximum
- `port`: server port for the child process servers, default set to `3008`
- `refreshRate`: Worker pool refresh rate (in ms), default set to `1000`
- `inMemoryOnly`: force the cluster to only pull jobs from the in-memory task queue.
- `messageBroker`: optional, default set to `null` (see IPC section)<br>
- `logs`: one of `['all', 'error']`, default set to `'all'`; if set to `'error'`, only error and warning logs are shown.
+ - `keepAlive`: an optional parameter that can be set to either:
+   - `'always'`: start `maxAvailableWorkers` workers immediately and keep them all alive indefinitely
+   - an `Integer` value: workers are not shut down until that many milliseconds have passed since a job was last available to be picked up
+
+ **NOTE:** when `keepAlive` is not set, the default behavior is to keep workers alive only while jobs are available for them to pick up.
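The sizing rule described above can be sketched as a pure function (a sketch with illustrative names, not the package's API):

```javascript
// Illustrative: how many workers the pool should aim for, given a keepAlive setting.
function wantedWorkerCount({ cpus, jobsCount, currentWorkers, keepAlive, msSinceLastJob }) {
  if (keepAlive === 'always') {
    // always keep the maximum number of workers alive
    return cpus
  }
  if (Number.isInteger(keepAlive) && msSinceLastJob < keepAlive) {
    // within the grace period: don't scale below the current worker count
    return Math.min(cpus, Math.max(jobsCount, currentWorkers))
  }
  // default: one worker per pending job, capped at the cpu budget
  return Math.min(cpus, jobsCount)
}
```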

`Cluster.isMaster()`: `true` if this process is the master<br/>

@@ -136,9 +141,9 @@ in such case your overall system should be **slowed down** because some of the p
const taskMap = {
'TEST': job => console.log(`testing ${job._id} at position ${job.data.position}`),
-   'SYNC': (job) => console.log("this is a synchrone task"),
+   'SYNC': (job) => console.log("this is a synchronous task"),
'ASYNC': (job) => new Promise((resolve, reject) => Meteor.setTimeout(() => {
-     console.log("this is an asynchrone task")
+     console.log("this is an asynchronous task")
resolve()
}, job.data.timeout))
}
@@ -261,7 +266,7 @@ const cluster = new Cluster(taskMap, { messageBroker })

## secure your imports

- Because the worker will only work on tasks, you should remove the unnecessary imports to avoid ressources consumption and longer startup time.<br/>
+ Because the worker will only work on tasks, you should remove unnecessary imports to avoid wasted resources and longer startup times.<br/>
As a good practice you should put all your Master imports logic in the same file, and import it only on the master.<br/>
What I mean by "Master imports logic" is:

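The master-only loading pattern described in this section can be sketched as follows (the guard function, the `loader` parameter, and the `./masterImports.js` file name are hypothetical, introduced for illustration):

```javascript
// Illustrative: gate master-only requires behind a single guard so workers
// never pay the startup cost of loading those modules.
function loadMasterModules(isMaster, loader) {
  if (!isMaster) {
    // workers skip the heavy master-only modules entirely
    return null
  }
  // on the master, pull in crons, routes, publications, etc. from one file
  return loader('./masterImports.js')
}
```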
74 changes: 57 additions & 17 deletions src/Cluster/MasterCluster.js
@@ -1,19 +1,31 @@
import StaticCluster from './StaticCluster'
import { Meteor } from 'meteor/meteor'
import TaskQueue from '../TaskQueue'
import WORKER_STATUSES from '../Worker/statuses'
import MasterWorker from '../Worker/MasterWorker'
import { warnLogger } from '../logs'

const process = require('process')
const MAX_CPUS = StaticCluster.maxWorkers()

class MasterCluster extends StaticCluster {
-   /*
-     @params (taskMap: Object, masterProps: { port: Integer, maxAvailableWorkers: Integer, refreshRate: Integer, inMemoryOnly: Boolean })
-     initialize Cluster on the master
-   */
-   constructor(taskMap, { port = 3008, maxAvailableWorkers = MAX_CPUS, refreshRate = 1000, inMemoryOnly = false, messageBroker = null } = {}) {
+
+   lastJobAvailableMilliseconds = Date.now()
+
+   /**
+    * initialize Cluster on the master
+    *
+    * @param { Object } taskMap
+    * @param { Object } masterProps
+    * - port?: Integer
+    * - maxAvailableWorkers?: Integer
+    * - refreshRate?: Integer
+    * - inMemoryOnly?: Boolean
+    * - messageBroker?: Function
+    * - keepAlive?: String | Integer
+    */
+   constructor(
+     taskMap,
+     { port = 3008, maxAvailableWorkers = MAX_CPUS, refreshRate = 1000, inMemoryOnly = false, messageBroker = null, keepAlive = null } = {}
+   ) {
super()
Meteor.startup(() => {
if (maxAvailableWorkers > MAX_CPUS) {
@@ -28,17 +40,21 @@ class MasterCluster extends StaticCluster {
if (this._cpus === MAX_CPUS) {
warnLogger(`you should not use all the cpus, read more https://github.com/nathanschwarz/meteor-cluster/blob/main/README.md#cpus-allocation`)
}
+     if (keepAlive && keepAlive !== 'always' && !(Number.isInteger(keepAlive) && keepAlive > 0)) {
+       warnLogger(`keepAlive should either be "always" or an Integer greater than 0 specifying a time in milliseconds;`
+         + ` ignoring the keepAlive configuration and falling back to the default behavior of only keeping workers alive while jobs are available`)
+     }
      this._port = port
      this._workers = []
      this.inMemoryOnly = inMemoryOnly
      this.messageBroker = messageBroker
+     this.keepAlive = keepAlive

// find worker by process id
-     this.getWorkerIndex = (id) =>this._workers.findIndex(w => w.id === id)
+     this.getWorkerIndex = (id) => this._workers.findIndex(w => w.id === id)
this.getWorker = (id) => this._workers[this.getWorkerIndex(id)]

      // update all previously unfinished tasks so they can be restarted (if the master server crashed or was stopped)
-     TaskQueue.update({ onGoing: true }, { $set: { onGoing: false }}, { multi: true })
+     TaskQueue.update({ onGoing: true }, { $set: { onGoing: false } }, { multi: true })

// initializing interval
this.interval = null
Expand Down Expand Up @@ -78,16 +94,40 @@ class MasterCluster extends StaticCluster {
}
}
}
-   /*
-     called at the interval set by Cluster.setRefreshRate
-     gets jobs from the list
-     gets available workers
-     dispatch the jobs to the workers
-   */
+
+   /**
+    * Called at the interval set by Cluster.setRefreshRate
+    *
+    * - gets jobs from the list
+    * - if jobs are available, updates lastJobAvailableMilliseconds to the current time
+    * - calculates the desired number of workers
+    * - gets available workers
+    * - dispatches the jobs to the workers
+    */
async _run() {
+     const currentMs = Date.now()
+
      const jobsCount = TaskQueue.count(this.inMemoryOnly)
-     const hasJobs = jobsCount > 0
-     const wantedWorkers = Math.min(this._cpus, jobsCount)

+     // if jobs are pending, update lastJobAvailableMilliseconds to the current time
+     if (jobsCount > 0) {
+       this.lastJobAvailableMilliseconds = currentMs
+     }
+     // default behavior: scale the worker count with the number of available jobs
+     let wantedWorkers = Math.min(this._cpus, jobsCount)
+     if (this.keepAlive === 'always') {
+       // always keep the number of workers at the requested maximum
+       wantedWorkers = this._cpus
+     } else if (Number.isInteger(this.keepAlive)) {
+       // don't start shutting down workers until keepAlive milliseconds have elapsed since a job was last available
+       if (currentMs - this.lastJobAvailableMilliseconds < this.keepAlive) {
+         // still within the keepAlive threshold: keep the current worker count,
+         // or the pending jobs count, whichever is bigger
+         wantedWorkers = Math.min(this._cpus, Math.max(jobsCount, this._workers.length))
+       }
+     }

const availableWorkers = this._getAvailableWorkers(wantedWorkers)
await this._dispatchJobs(availableWorkers)
}
