-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
barrier.ts
62 lines (60 loc) · 1.85 KB
/
barrier.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* A synchronization primitive that allows multiple tasks to wait until all of
* them have reached a certain point of execution before continuing.
*
* A `Barrier` is initialized with a size `n`. Once created, `n` tasks can call
* the `wait` method on the `Barrier`. The `wait` method blocks until `n` tasks
* have called it. Once all `n` tasks have called `wait`, all tasks will
* unblock and continue executing.
*
* ```ts
* import { Barrier } from "@core/asyncutil/barrier";
*
* const barrier = new Barrier(3);
*
* async function worker(id: number) {
* console.log(`worker ${id} is waiting`);
* await barrier.wait();
* console.log(`worker ${id} is done`);
* }
*
* worker(1);
* worker(2);
* worker(3);
* ```
*/
export class Barrier {
#waiter: PromiseWithResolvers<void> = Promise.withResolvers();
#value: number;
/**
* Creates a new `Barrier` that blocks until `size` threads have called `wait`.
*
* @param size The number of threads that must reach the barrier before it unblocks.
* @throws {RangeError} if the size is not a positive safe integer.
*/
constructor(size: number) {
if (size <= 0 || !Number.isSafeInteger(size)) {
throw new RangeError(
`size must be a positive safe integer, got ${size}`,
);
}
this.#value = size;
}
/**
* Wait for all threads to reach the barrier.
* Blocks until all threads reach the barrier.
*/
wait({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
if (signal?.aborted) {
return Promise.reject(signal.reason);
}
const { promise, resolve, reject } = this.#waiter;
const abort = () => reject(signal!.reason);
signal?.addEventListener("abort", abort, { once: true });
this.#value -= 1;
if (this.#value === 0) {
resolve();
}
return promise.finally(() => signal?.removeEventListener("abort", abort));
}
}