diff --git a/src/core/index.ts b/src/core/index.ts index a421d14..10835c6 100644 --- a/src/core/index.ts +++ b/src/core/index.ts @@ -339,6 +339,8 @@ export class ZarrArray { // create promise queue with concurrency control const queue = new PQueue({ concurrency: concurrencyLimit }); + const allTasks = []; + if (progressCallback) { let progress = 0; @@ -346,21 +348,21 @@ export class ZarrArray { for (const _ of indexer.iter()) queueSize += 1; progressCallback({ progress: 0, queueSize: queueSize }); for (const proj of indexer.iter()) { - (async () => { - await queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions)); + allTasks.push(queue.add(async () => { + await this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions); progress += 1; progressCallback({ progress: progress, queueSize: queueSize }); - })(); + })); } } else { for (const proj of indexer.iter()) { - queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions)); + allTasks.push(queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions))); } } - // guarantees that all work on queue has finished - await queue.onIdle(); + // guarantees that all work on queue has finished and throws if any of the tasks errored. + await Promise.all(allTasks); // Return scalar instead of zero-dimensional array. if (out.shape.length === 0) { @@ -594,6 +596,8 @@ export class ZarrArray { const queue = new PQueue({ concurrency: concurrencyLimit }); + const allTasks = []; + if (progressCallback) { let queueSize = 0; @@ -603,24 +607,24 @@ export class ZarrArray { progressCallback({ progress: 0, queueSize: queueSize }); for (const proj of indexer.iter()) { const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape); - (async () => { - await queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue)); + allTasks.push(queue.add(async () => { + await this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue); progress += 1; progressCallback({ progress: progress, queueSize: queueSize }); - })(); + })); } } else { for (const proj of indexer.iter()) { const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape); - queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue)); + allTasks.push(queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue))); } } - // guarantees that all work on queue has finished - await queue.onIdle(); + // guarantees that all work on queue has finished and throws if any of the tasks errored. + await Promise.all(allTasks); } private async chunkSetItem(chunkCoords: number[], chunkSelection: DimensionSelection[], value: number | NestedArray) {