-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathasyncQueue.ts
55 lines (49 loc) · 1.28 KB
/
asyncQueue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/** A deferred unit of work: nothing runs until the function is invoked. */
export type Action<T> = () => Promise<T>;

/**
 * FIFO queue that limits how many actions are running at the same time.
 *
 * Based on https://medium.com/@karenmarkosyan/how-to-manage-promises-into-dynamic-queue-with-vanilla-javascript-9d0d1f8d4df5
 *
 * Actions are started in the order they were scheduled. An action is started
 * only while `inProgress` is below `concurrency`; otherwise it waits until a
 * running action settles. With a `concurrency` of 0 or less nothing is ever
 * started, so the promises returned by `schedule` stay pending.
 *
 * @typeParam T - Value produced by the scheduled actions.
 * @typeParam E - Type the caller expects rejection reasons to have. It is not
 *   checked at runtime; whatever the action rejects or throws is passed along.
 */
export class AsyncQueue<T, E = unknown> {
  /** Number of actions that have been started and have not settled yet. */
  inProgress = 0;

  /** Maximum number of actions allowed to be in progress at once. */
  concurrency: number;

  /**
   * Actions waiting for a free slot, oldest first, each paired with the
   * settle callbacks of the promise that `schedule` handed to the caller.
   */
  queue: {
    action: Action<T>;
    resolve: (t: T) => void;
    reject: (err: E) => void;
  }[] = [];

  /**
   * @param concurrency - Maximum number of actions to run simultaneously.
   */
  constructor(concurrency: number) {
    this.concurrency = concurrency;
  }

  /**
   * Schedule an action to be started later. Immediately returns a
   * `Promise<T>`, but the action itself is only invoked once a slot is free
   * (synchronously within this call when one is free right now).
   *
   * @param t - The action to enqueue.
   * @returns A promise that settles with the outcome of the action.
   */
  schedule(t: Action<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.queue.push({ action: t, resolve, reject });
      this.startQueuedItem();
    });
  }

  /**
   * Start the action at the front of the queue, unless the concurrency limit
   * has been reached or the queue is empty.
   */
  private startQueuedItem(): void {
    if (this.inProgress >= this.concurrency) {
      return;
    }
    const item = this.queue.shift();
    if (item === undefined) {
      // Nothing is waiting.
      return;
    }
    this.inProgress += 1;

    // The slot is already taken at this point, so the action must always end
    // up in the settle chain below. A synchronous throw (or a return value
    // that is not a promise) would otherwise skip the `finally` handler,
    // leaking the slot and leaving the caller's promise pending forever.
    let running: Promise<T>;
    try {
      running = Promise.resolve(item.action());
    } catch (err) {
      running = Promise.reject(err);
    }

    void running
      .then((val: T) => {
        item.resolve(val);
      })
      .catch((err) => {
        item.reject(err);
      })
      .finally(() => {
        // Release the slot and hand it to the next waiting action, if any.
        this.inProgress -= 1;
        this.startQueuedItem();
      });
  }
}