diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index af3650124..17fe14a22 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -243,8 +243,8 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { async flushEntitiesToDisk( onEntitiesFlushed?: (entities: Entity[]) => Promise, ) { - await this.lockOperation(() => - pMap( + const entities = await this.lockOperation(() => { + return pMap( this.localGraphObjectStore.collectEntitiesByStep(), async ([stepId, entities]) => { const indexable = entities.filter((e) => { @@ -287,20 +287,21 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { } this.localGraphObjectStore.flushEntities(entities); - - if (onEntitiesFlushed) { - await onEntitiesFlushed(entities); - } + return entities; }, - ), - ); + ); + }); + + if (onEntitiesFlushed) { + await Promise.all(entities.map(onEntitiesFlushed)); + } } async flushRelationshipsToDisk( onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, ) { - await this.lockOperation(() => - pMap( + const relationships = await this.lockOperation(() => { + return pMap( this.localGraphObjectStore.collectRelationshipsByStep(), async ([stepId, relationships]) => { const indexable = relationships.filter((r) => { @@ -331,13 +332,14 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { } this.localGraphObjectStore.flushRelationships(relationships); - - if (onRelationshipsFlushed) { - await onRelationshipsFlushed(relationships); - } + return relationships; }, - ), - ); + ); + }); + + if (onRelationshipsFlushed) { + await Promise.all(relationships.map(onRelationshipsFlushed)); + } } getIndexMetadataForGraphObjectType({ @@ -379,7 +381,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { private async lockOperation(operation: () => Promise) { await this.semaphore.acquire(); try { - await operation(); + return await operation(); } finally { this.semaphore.release(); }