Open
Description
Recently, we encountered an integration in which a single step that calls `jobState.iterateEntities()` took multiple hours to execute. Because that step reads from disk, waits on network calls, and writes to disk strictly one entity at a time, most of its wall-clock time is spent waiting on I/O. We could likely speed up such steps significantly by invoking the iteratee with bounded concurrency, along the lines of the diff below:
+ /**
+  * Walks the entity index directory for `type` and invokes `iteratee` once
+  * for every entity found in each flushed entity file.
+  *
+  * Up to 5 `iteratee` calls run concurrently. All entities from one file are
+  * awaited before the directory callback returns, so (assuming
+  * `walkDirectory` awaits its callback -- TODO confirm) at most one file's
+  * worth of entities is queued at any time.
+  *
+  * @param type - entity type whose index directory is walked
+  * @param iteratee - async callback invoked once per entity
+  * @throws rethrows the first rejection from `iteratee`; file-helper errors
+  *   propagate unchanged
+  */
  export async function iterateEntityTypeIndex<T extends Entity = Entity>({
    type,
    iteratee,
  }: IterateIndexInput<T>) {
    const path = buildIndexDirectoryPath({
      collectionType: 'entities',
      type,
    });
+   // Caps the number of in-flight iteratee calls.
+   const queue = new PQueue({ concurrency: 5 });
    await walkDirectory({
      path,
      iteratee: async (input) => {
        const object = await readGraphObjectFile<FlushedEntityData>(input);
        if (isObjectFlushedEntityData(object)) {
-         for (const entity of object.entities as T[]) {
-           await iteratee(entity);
-         }
+         // Await the queued tasks rather than discarding them with `void`:
+         // this surfaces iteratee failures to the caller and stops the walk
+         // from enqueueing every entity in the index before any work is done.
+         await Promise.all(
+           (object.entities as T[]).map((entity) =>
+             queue.add(() => iteratee(entity)),
+           ),
+         );
        }
      },
    });
+   // No trailing queue.onIdle() is needed: every queued task has already
+   // been awaited by the time walkDirectory resolves.
  }