Skip to content

Commit

Permalink
Merge pull request #13 from wallind/issues-12-MasterCluster-initializ…
Browse files Browse the repository at this point in the history
…e-method

12: `MasterCluster#initialize` controlled by `autoInitialized: Boolean`
  • Loading branch information
Nathan Schwarz authored Dec 17, 2021
2 parents 8c27031 + 0e8433a commit 1bd304b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 25 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Both in-memory and persistent tasks are available at the same time, and can be u

## prototype

`constructor(taskMap: Object, { port: Integer, maxAvailableWorkers: Integer, refreshRate: Integer, inMemoryOnly: Boolean, messageBroker: function, logs: String, keepAlive: String | Integer })`
`constructor(taskMap: Object, { port: Integer, maxAvailableWorkers: Integer, refreshRate: Integer, inMemoryOnly: Boolean, messageBroker: function, logs: String, keepAlive: String | Integer, autoInitialize: Boolean })`
- `taskMap`: a map of functions associated to a `taskType`
- `maxAvailableWorkers`: maximum number of child process (cores), default is set to system maximum
- `port`: server port for child process servers, default set to `3008`
Expand All @@ -76,6 +76,7 @@ Both in-memory and persistent tasks are available at the same time, and can be u
- some `Integer` value will have the system not shutdown workers until the number passed in milliseconds has passed since last a job was available to be picked up by a worker

**NOTE:** default behavior when `keepAlive` is not set is to only keep alive workers when there are jobs available to be picked up by them.
- `autoInitialize`: an optional parameter that controls whether the system will automatically initialize the cluster's polling for jobs when the `MasterCluster` is created. Default is set to `true`.

`Cluster.isMaster()`: `true` if this process is the master<br/>

Expand Down
79 changes: 55 additions & 24 deletions src/Cluster/MasterCluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@ class MasterCluster extends StaticCluster {
* - inMemoryOnly?: Boolean
* - messageBroker?: Function
* - keepAlive?: String | number
* - autoInitialize?: Boolean
*/
constructor(
taskMap,
{ port = 3008, maxAvailableWorkers = MAX_CPUS, refreshRate = 1000, inMemoryOnly = false, messageBroker = null, keepAlive = null } = {}
{
port = 3008,
maxAvailableWorkers = MAX_CPUS,
refreshRate = 1000,
inMemoryOnly = false,
messageBroker = null,
keepAlive = null,
autoInitialize = true
} = {}
) {
super()
Meteor.startup(() => {
Expand All @@ -44,10 +53,15 @@ class MasterCluster extends StaticCluster {
warnLogger(`keepAlive should be either be "always" or some Integer greater than 0 specifying a time in milliseconds to remain on;`
+ ` ignoring keepAlive configuration and falling back to default behavior of only spinning up and keeping workers when the jobs are available`)
}
if (typeof autoInitialize !== `boolean`) {
warnLogger(`autoInitialize should be a boolean(was passed as: ${typeof autoInitialize}),`
+ ` ignoring autoInitialize configuration and falling back to default behavior of autoInitialize: true`)
}
this._port = port
this._workers = []
this.inMemoryOnly = inMemoryOnly
this.messageBroker = messageBroker
this.refreshRate = refreshRate

// find worker by process id
this.getWorkerIndex = (id) => this._workers.findIndex(w => w.id === id)
Expand All @@ -57,18 +71,22 @@ class MasterCluster extends StaticCluster {
TaskQueue.update({ onGoing: true }, { $set: { onGoing: false } }, { multi: true })

// initializing interval
this.interval = null
// initializing pool refreshRate
this.setRefreshRate(refreshRate)
this.setIntervalHandle = null

if (autoInitialize) {
this.initialize()
}
})
}
/*
@params (wantedWorkers: Integer)
add workers if tasks > current workers
remove workers if tasks < current workers
@returns non idle workers
*/
_getAvailableWorkers(wantedWorkers) {

/**
* add workers if tasks > current workers
* remove workers if tasks < current workers
*
* @param { Integer } wantedWorkers
* @returns non idle workers
*/
_getAvailableWorkers (wantedWorkers) {
const workerToCreate = wantedWorkers - this._workers.length
if (workerToCreate > 0) {
for (let i = 0; i < workerToCreate; i++) {
Expand All @@ -82,15 +100,17 @@ class MasterCluster extends StaticCluster {
this._workers = this._workers.filter(w => !w.removed)
return this._workers.filter(w => w.isIdle && w.isReady)
}
/*
@params (availableWorkers: Worker)
dispatch jobs to idle workers
*/
async _dispatchJobs(availableWorkers) {
for (let i = 0; i < availableWorkers.length; i++) {

/**
* Dispatch jobs to idle workers
*
* @param { Worker } availableWorkers
*/
async _dispatchJobs (availableWorkers) {
for (const worker of availableWorkers) {
const job = await TaskQueue.pull(this.inMemoryOnly)
if (job !== undefined) {
availableWorkers[i].startJob(job)
worker.startJob(job)
}
}
}
Expand All @@ -104,7 +124,7 @@ class MasterCluster extends StaticCluster {
* - gets available workers
* - dispatch the jobs to the workers
*/
async _run() {
async _run () {
const currentMs = Date.now()

const jobsCount = TaskQueue.count(this.inMemoryOnly)
Expand Down Expand Up @@ -132,11 +152,22 @@ class MasterCluster extends StaticCluster {
await this._dispatchJobs(availableWorkers)
}

/*
@params (delay: Integer)
set the refresh rate at which Cluster._run is called
*/
setRefreshRate(delay) {
/**
* Starts the Cluster._run interval call at the rate determined by this.refreshRate
*/
initialize () {
this.setRefreshRate(this.refreshRate)
}

/**
* Set the refresh rate at which Cluster._run is called and restart the interval call at the new
* rate
*
* @param { Integer } delay
*/
setRefreshRate (delay) {
this.refreshRate = delay

if (this.interval != null) {
Meteor.clearInterval(this.interval)
}
Expand Down

0 comments on commit 1bd304b

Please sign in to comment.