Skip to content

Commit

Permalink
Merge pull request #14 from SimonErm/feature/promiseThrottle
Browse files Browse the repository at this point in the history
Feature/promise throttle
  • Loading branch information
SimonErm authored Jun 26, 2020
2 parents 207980f + 1a39517 commit db54aca
Showing 1 changed file with 61 additions and 23 deletions.
84 changes: 61 additions & 23 deletions src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface QueueOptions {
* Interval in which the queue checks for new jobs to execute
*/
updateInterval?: number;
concurrency?: number;
}
/**
* ## Usage
Expand All @@ -39,6 +40,26 @@ export interface QueueOptions {
* ```
*/
export class Queue {
static get instance() {
if (this.queueInstance) {
return this.queueInstance;
} else {
this.queueInstance = new Queue();
return this.queueInstance;
}
}
/**
* @returns true if the Queue is running and false otherwise
*/
get isRunning() {
return this.isActive;
}
/**
* @returns the workers map (readonly)
*/
get registeredWorkers() {
return this.workers;
}
private static queueInstance: Queue | null;

private jobStore: JobStore;
Expand All @@ -49,9 +70,12 @@ export class Queue {
private executedJobs: Array<Job<any>>;
private activeJobCount: number;

private concurrency: number;
private updateInterval: number;
private onQueueFinish: (executedJobs: Array<Job<any>>) => void;

private queuedJobExecuter: any[] = [];

private constructor() {
this.jobStore = NativeModules.JobQueue;
this.workers = {};
Expand All @@ -63,27 +87,7 @@ export class Queue {

this.updateInterval = 10;
this.onQueueFinish = (executedJobs: Array<Job<any>>) => {};
}

static get instance() {
if (this.queueInstance) {
return this.queueInstance;
} else {
this.queueInstance = new Queue();
return this.queueInstance;
}
}
/**
* @returns true if the Queue is running and false otherwise
*/
get isRunning() {
return this.isActive;
}
/**
* @returns the workers map (readonly)
*/
get registeredWorkers() {
return this.workers;
this.concurrency = -1;
}
/**
* @returns a promise that resolves all jobs that are queued and not active
Expand All @@ -93,9 +97,14 @@ export class Queue {
}

configure(options: QueueOptions) {
const { onQueueFinish = (executedJobs: Array<Job<any>>) => {}, updateInterval = 10 } = options;
const {
onQueueFinish = (executedJobs: Array<Job<any>>) => {},
updateInterval = 10,
concurrency = -1
} = options;
this.onQueueFinish = onQueueFinish;
this.updateInterval = updateInterval;
this.concurrency = concurrency;
}
/**
* adds a [[Worker]] to the queue which can execute Jobs
Expand Down Expand Up @@ -185,7 +194,7 @@ export class Queue {
const nextJob = await this.jobStore.getNextJob();
if (this.isJobNotEmpty(nextJob)) {
const nextJobs = await this.getJobsForWorker(nextJob.workerName);
const processingJobs = nextJobs.map(this.excuteJob);
const processingJobs = nextJobs.map(async (job) => this.limitExecution(this.excuteJob, job));
await Promise.all(processingJobs);
} else if (!this.isExecuting()) {
this.finishQueue();
Expand All @@ -198,6 +207,35 @@ export class Queue {
return Object.keys(rawJob).length > 0;
}

private limitExecution = async (executer: (rawJob: RawJob) => Promise<void>, rawJob: RawJob) => {
return new Promise(async (resolve) => await this.enqueueJobExecuter(executer, resolve, rawJob));
};

private enqueueJobExecuter = async (
executer: (rawJob: RawJob) => Promise<void>,
resolve: () => void,
rawJob: RawJob
) => {
if (this.isExecuterAvailable()) {
await this.runExecuter(executer, resolve, rawJob);
} else {
this.queuedJobExecuter.push(this.runExecuter.bind(null, executer, resolve, rawJob));
}
};

private runExecuter = async (executer: (rawJob: RawJob) => Promise<void>, resolve: () => void, rawJob: RawJob) => {
try {
await executer(rawJob);
} finally {
resolve();
if (this.queuedJobExecuter.length > 0 && this.isExecuterAvailable()) {
await this.queuedJobExecuter.shift()();
}
}
};
private isExecuterAvailable() {
return this.concurrency <= 0 || this.activeJobCount < this.concurrency;
}
private isExecuting() {
return this.activeJobCount > 0;
}
Expand Down

0 comments on commit db54aca

Please sign in to comment.