Skip to content

Commit

Permalink
fix(error): Fix error passthrough in queued tasks (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
az0uz authored Dec 9, 2023
1 parent 88d794f commit 217e178
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,28 +339,30 @@ export class ZarrArray<StoreGetOptions = any> {
// create promise queue with concurrency control
const queue = new PQueue({ concurrency: concurrencyLimit });

const allTasks = [];

if (progressCallback) {

let progress = 0;
let queueSize = 0;
for (const _ of indexer.iter()) queueSize += 1;
progressCallback({ progress: 0, queueSize: queueSize });
for (const proj of indexer.iter()) {
(async () => {
await queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions));
allTasks.push(queue.add(async () => {
await this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions);
progress += 1;
progressCallback({ progress: progress, queueSize: queueSize });
})();
}));
}

} else {
for (const proj of indexer.iter()) {
queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions));
allTasks.push(queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions)));
}
}

// guarantees that all work on queue has finished
await queue.onIdle();
// guarantees that all work on queue has finished and throws if any of the tasks errored.
await Promise.all(allTasks);

// Return scalar instead of zero-dimensional array.
if (out.shape.length === 0) {
Expand Down Expand Up @@ -594,6 +596,8 @@ export class ZarrArray<StoreGetOptions = any> {

const queue = new PQueue({ concurrency: concurrencyLimit });

const allTasks = [];

if (progressCallback) {

let queueSize = 0;
Expand All @@ -603,24 +607,24 @@ export class ZarrArray<StoreGetOptions = any> {
progressCallback({ progress: 0, queueSize: queueSize });
for (const proj of indexer.iter()) {
const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape);
(async () => {
await queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue));
allTasks.push(queue.add(async () => {
await this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue);
progress += 1;
progressCallback({ progress: progress, queueSize: queueSize });
})();
}));
}

} else {

for (const proj of indexer.iter()) {
const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape);
queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue));
allTasks.push(queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue)));
}

}

// guarantees that all work on queue has finished
await queue.onIdle();
// guarantees that all work on queue has finished and throws if any of the tasks errored.
await Promise.all(allTasks);
}

private async chunkSetItem(chunkCoords: number[], chunkSelection: DimensionSelection[], value: number | NestedArray<TypedArray>) {
Expand Down

0 comments on commit 217e178

Please sign in to comment.