Skip to content

Commit

Permalink
refine the parallel implementation
Browse files Browse the repository at this point in the history
- Avoid function allocations
- Stop processing the array upon being aborted
- Reject with `signal.reason` just like `signal.throwIfAborted` does
  • Loading branch information
aleclarson committed Nov 22, 2024
1 parent d483bce commit 3588941
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions src/async/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import {

type AbortSignal = {
readonly aborted: boolean
readonly reason: any
addEventListener(type: 'abort', listener: () => void): void
removeEventListener(type: 'abort', listener: () => void): void
throwIfAborted(): void
}

type WorkItemResult<K> = {
index: number
result: K
error: any
}

export type ParallelOptions =
| {
limit: number
Expand Down Expand Up @@ -63,23 +66,20 @@ export async function parallel<T, K>(
item,
}))

let signal: AbortSignal | undefined
if (isNumber(options)) {
options = {
limit: options,
}
} else {
signal = options.signal
signal?.throwIfAborted()
}

options.signal?.throwIfAborted()

// Process array items
const processor = async (
resolve: (value: WorkItemResult<K>[]) => void,
reject: (error: any) => void,
) => {
const processor = async (resolve: (value: WorkItemResult<K>[]) => void) => {
const results: WorkItemResult<K>[] = []
const abortListener = () => reject(new Error('This operation was aborted'))
options.signal?.addEventListener('abort', abortListener)
while (true) {
while (!signal?.aborted) {
const next = work.pop()
if (!next) {
break
Expand All @@ -91,12 +91,27 @@ export async function parallel<T, K>(
index: next.index,
})
}
options.signal?.removeEventListener('abort', abortListener)
return resolve(results)
}
const queues = list(1, options.limit).map(() => new Promise(processor))

const queues = Promise.all(
list(1, options.limit).map(() => new Promise(processor)),
)

let signalPromise: Promise<never> | undefined
if (signal) {
signalPromise = new Promise((_, reject) => {
const onAbort = () => reject(signal.reason)
signal.addEventListener('abort', onAbort)
queues.then(() => signal.removeEventListener('abort', onAbort))
})
}

// Wait for all queues to complete
const itemResults = (await Promise.all(queues)) as WorkItemResult<K>[][]
const itemResults = await (signalPromise
? Promise.race([queues, signalPromise])
: queues)

const [errors, results] = fork(
sort(flat(itemResults), r => r.index),
x => !!x.error,
Expand Down

0 comments on commit 3588941

Please sign in to comment.