diff --git a/packages/integration-sdk-core/src/errors.ts b/packages/integration-sdk-core/src/errors.ts index ab76a0083..0c202f0e9 100644 --- a/packages/integration-sdk-core/src/errors.ts +++ b/packages/integration-sdk-core/src/errors.ts @@ -98,7 +98,16 @@ export class IntegrationLocalConfigFieldMissingError extends IntegrationError { }); } } - +export class UploadError extends IntegrationError { + readonly typesInvolved: string[] | undefined; + constructor(message: string, typesInvolved?: string[]) { + super({ + code: 'UPLOAD_ERROR', + message, + }); + this.typesInvolved = typesInvolved; + } +} export class IntegrationLocalConfigFieldTypeMismatchError extends IntegrationError { constructor(message: string) { super({ diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts index 769ae9019..ddbae1bbe 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts @@ -650,7 +650,7 @@ describe('executeStepDependencyGraph', () => { expect(spyB).toHaveBeenCalledBefore(spyC); }); - test('should mark steps with failed executionHandlers with status FAILURE and dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => { + test('should throw if upload fails', async () => { const spyA = jest.fn(); const spyB = jest.fn(); const spyC = jest.fn(); @@ -715,18 +715,19 @@ describe('executeStepDependencyGraph', () => { function createFailingUploader( stepId: string, + collector: FlushedGraphObjectData[], ): StepGraphObjectDataUploader { return { stepId, - async enqueue() { + async enqueue(graphObjectData) { + collector.push(graphObjectData); return Promise.resolve(); }, waitUntilUploadsComplete() { - return Promise.reject(new Error('expected upload wait failure')); + return Promise.reject(new Error('Expected error')); }, }; } - const passingUploaderCollector: FlushedGraphObjectData[] = []; /** @@ -737,73 +738,23 @@ describe('executeStepDependencyGraph', () => { * 'b' depends on 'a', * 'c' depends on 'b' */ - const result = await executeSteps( - steps, - stepStartStates, - graphObjectStore, - (stepId) => { - if (stepId === 'b') { - return createFailingUploader(stepId); - } else { - return createPassingUploader(stepId, passingUploaderCollector); + await expect( + executeSteps(steps, stepStartStates, graphObjectStore, (stepId) => { + if (stepId == 'c') { + return createFailingUploader(stepId, passingUploaderCollector); } - }, - ); + return createPassingUploader(stepId, passingUploaderCollector); + }), + ).rejects.toThrow(); const expectedCollected: FlushedGraphObjectData[] = [ { - entities: [eA], - relationships: [], - }, - { - entities: [eC], + entities: [eA, eB, eC], relationships: [], }, ]; - expect(passingUploaderCollector).toEqual(expectedCollected); - expect(result).toEqual([ - { - id: 'a', - name: 'a', - declaredTypes: [], - partialTypes: [], - encounteredTypes: [eA._type], - encounteredTypeCounts: expect.any(Object), - status: StepResultStatus.SUCCESS, - startTime: expect.any(Number), - endTime: expect.any(Number), - duration: expect.any(Number), - }, - { - id: 'b', - name: 'b', - declaredTypes: [], - partialTypes: [], - encounteredTypes: [eB._type], - encounteredTypeCounts: expect.any(Object), - dependsOn: ['a'], - status: StepResultStatus.FAILURE, - startTime: expect.any(Number), - endTime: expect.any(Number), - duration: expect.any(Number), - }, - { - id: 'c', - name: 'c', - declaredTypes: [], - partialTypes: [], - encounteredTypes: [eC._type], - encounteredTypeCounts: expect.any(Object), - dependsOn: ['b'], - status: StepResultStatus.PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE, - startTime: expect.any(Number), - endTime: expect.any(Number), - duration: expect.any(Number), - }, - ]); - expect(spyA).toHaveBeenCalledTimes(1); expect(spyB).toHaveBeenCalledTimes(1); expect(spyC).toHaveBeenCalledTimes(1); diff --git a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts index 87c6bb7d6..bad7c62f2 100644 --- a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts +++ b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts @@ -16,6 +16,7 @@ import { StepResultStatus, StepStartStates, StepExecutionHandlerWrapperFunction, + UploadError, } from '@jupiterone/integration-sdk-core'; import { timeOperation } from '../metrics'; @@ -145,9 +146,17 @@ export function executeStepDependencyGraph< startTime?: number; endTime?: number; duration?: number; + partialTypes?: string[]; }) { - const { stepId, status, typeTracker, startTime, endTime, duration } = - params; + const { + stepId, + status, + typeTracker, + startTime, + endTime, + duration, + partialTypes, + } = params; const existingResult = stepResultsMap.get(stepId); if (existingResult) { stepResultsMap.set(stepId, { @@ -162,6 +171,9 @@ export function executeStepDependencyGraph< startTime, endTime, duration, + partialTypes: Array.from( + new Set(existingResult.partialTypes.concat(partialTypes ?? [])), + ), }); } } @@ -417,9 +429,7 @@ export function executeStepDependencyGraph< status = StepResultStatus.FAILURE; } - - await context.jobState.flush(); - + let possibleAdditionalPartialTypes: string[] | undefined = undefined; if (context.jobState.waitUntilUploadsComplete) { try { // Failing to upload all integration data should not be considered a @@ -429,6 +439,9 @@ export function executeStepDependencyGraph< } catch (err) { context.logger.stepFailure(step, err); status = StepResultStatus.FAILURE; + if (err instanceof UploadError) { + possibleAdditionalPartialTypes = err.typesInvolved; + } } } @@ -439,6 +452,7 @@ export function executeStepDependencyGraph< startTime, endTime: Date.now(), duration: Date.now() - startTime, + partialTypes: possibleAdditionalPartialTypes, }); enqueueLeafSteps(); } @@ -486,11 +500,62 @@ export function executeStepDependencyGraph< return status; } + + async function forceFlushEverything() { + /** Instead of flushing after each step, flush only when we finish all steps OR when we reach the threshold limit + * Because the 'createStepGraphObjectDataUploader' needs a step I'm using the last step as it + */ + let uploader: StepGraphObjectDataUploader | undefined; + const lastStep = Array.from(stepResultsMap.keys()).pop() as string; + if (createStepGraphObjectDataUploader) { + uploader = createStepGraphObjectDataUploader(lastStep); + } + await graphObjectStore.flush( + async (entities) => + entities.length + ? uploader?.enqueue({ + entities, + relationships: [], + }) + : undefined, + async (relationships) => + relationships.length + ? uploader?.enqueue({ + entities: [], + relationships, + }) + : undefined, + ); + try { + await uploader?.waitUntilUploadsComplete(); + } catch (err) { + executionContext.logger.stepFailure( + workingGraph.getNodeData(lastStep), + err, + ); + if (err instanceof UploadError) { + updateStepResultStatus({ + stepId: lastStep, + status: StepResultStatus.FAILURE, + typeTracker, + partialTypes: err.typesInvolved, //We mark as partial all types related to the failed uploads + }); + } else { + updateStepResultStatus({ + stepId: lastStep, + status: StepResultStatus.FAILURE, + typeTracker, + }); + } + } + } + // kick off work for all leaf nodes enqueueLeafSteps(); void promiseQueue .onIdle() + .then(forceFlushEverything) .then(() => resolve([...stepResultsMap.values()])) .catch(reject); }); diff --git a/packages/integration-sdk-runtime/src/execution/uploader.test.ts b/packages/integration-sdk-runtime/src/execution/uploader.test.ts index 9ffcd680c..d7252d5b9 100644 --- a/packages/integration-sdk-runtime/src/execution/uploader.test.ts +++ b/packages/integration-sdk-runtime/src/execution/uploader.test.ts @@ -16,6 +16,7 @@ import { createApiClient, getApiBaseUrl } from '../api'; import { generateSynchronizationJob } from '../synchronization/__tests__/util/generateSynchronizationJob'; import { createMockIntegrationLogger } from '../../test/util/fixtures'; import { getExpectedRequestHeaders } from '../../test/util/request'; +import { UploadError } from '@jupiterone/integration-sdk-core'; function createFlushedGraphObjectData(): FlushedGraphObjectData { return { @@ -135,6 +136,52 @@ describe('#createQueuedStepGraphObjectDataUploader', () => { expect(uploaded).toEqual([flushed[0], flushed[2], flushedAfterFailure]); }); + + test('should throw UploadError with types involved', async () => { + const uploaded: FlushedGraphObjectData[] = []; + const stepId = uuid(); + + let numQueued = 0; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId, + uploadConcurrency: 2, + async upload(d) { + numQueued++; + + if (numQueued === 2) { + await sleep(100); + throw new Error('expected upload error'); + } else { + await sleep(200); + uploaded.push(d); + } + }, + }); + + const flushed = await createAndEnqueueUploads(uploader, 3); + + // Ensure that the next enqueue happens _after_ a failure has occurred. + await sleep(300); + const flushedAfterFailure = createFlushedGraphObjectData(); + await uploader.enqueue(flushedAfterFailure); + + try { + await uploader.waitUntilUploadsComplete(); + } catch (error) { + expect(error).toBeInstanceOf(UploadError); + flushed[1].entities.forEach((entity) => + expect((error as UploadError).typesInvolved as string[]).toInclude( + entity._type, + ), + ); + flushed[1].relationships.forEach((relationship) => + expect((error as UploadError).typesInvolved as string[]).toInclude( + relationship._type, + ), + ); + } + }); }); describe('#createPersisterApiStepGraphObjectDataUploader', () => { diff --git a/packages/integration-sdk-runtime/src/execution/uploader.ts b/packages/integration-sdk-runtime/src/execution/uploader.ts index 5f96d47a3..ea0505b0c 100644 --- a/packages/integration-sdk-runtime/src/execution/uploader.ts +++ b/packages/integration-sdk-runtime/src/execution/uploader.ts @@ -1,4 +1,4 @@ -import { IntegrationError } from '@jupiterone/integration-sdk-core'; +import { UploadError } from '@jupiterone/integration-sdk-core'; import PQueue from 'p-queue/dist'; import { FlushedGraphObjectData } from '../storage/types'; import { @@ -37,7 +37,7 @@ export function createQueuedStepGraphObjectDataUploader({ let completed = false; const uploadErrors: Error[] = []; - + const typesInvolvedInFailures = new Set(); return { stepId, async enqueue(graphObjectData) { @@ -76,6 +76,16 @@ export function createQueuedStepGraphObjectDataUploader({ // cases where this could cause an issue (e.g. a relationship getting // uploaded that references an entity that failed to upload). uploadErrors.push(err); + if (graphObjectData) { + graphObjectData.entities.forEach( + (entity) => typesInvolvedInFailures.add(entity._type), + typesInvolvedInFailures, + ); + graphObjectData.relationships.forEach( + (relationship) => typesInvolvedInFailures.add(relationship._type), + typesInvolvedInFailures, + ); + } }); }, @@ -93,15 +103,12 @@ export function createQueuedStepGraphObjectDataUploader({ } if (uploadErrors.length) { - throw new IntegrationError({ - code: 'UPLOAD_ERROR', - message: `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join( + throw new UploadError( + `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join( ',', )})`, - // Just include the first error cause. We should be able to gather - // additional information from the joined error messages. - cause: uploadErrors[0], - }); + Array.from(typesInvolvedInFailures.values()), + ); } }, }; diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index 1ab3156c3..d8f81902b 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -1,5 +1,4 @@ import { Sema } from 'async-sema'; -import pMap from 'p-map'; import { Entity, @@ -237,109 +236,159 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, ) { await Promise.all([ - this.flushEntitiesToDisk(onEntitiesFlushed), - this.flushRelationshipsToDisk(onRelationshipsFlushed), + this.flushEntitiesToDisk(onEntitiesFlushed, true), + this.flushRelationshipsToDisk(onRelationshipsFlushed, true), ]); } - + /** + * Asynchronously flushes entity data to disk. + * + * This function ensures that entity data is saved to disk when necessary. It uses a locking mechanism + * to prevent concurrent modifications and checks if the data size exceeds a certain threshold before flushing. + * + * @param {function} [onEntitiesFlushed] - Optional. A callback function that is invoked after the entities + * have been flushed to disk. It receives an array of entities as + * an argument and returns a Promise. + * @param {Boolean} [force=false] - Optional. A boolean flag indicating whether to force the flushing process + * regardless of the data size threshold. + * + * This process ensures efficient and necessary data uploads, avoiding redundant or unnecessary disk operations. + */ async flushEntitiesToDisk( onEntitiesFlushed?: (entities: Entity[]) => Promise, + force: Boolean = false, ) { - await this.lockOperation(() => - pMap( - this.localGraphObjectStore.collectEntitiesByStep(), - async ([stepId, entities]) => { - const indexable = entities.filter((e) => { - const indexMetadata = this.getIndexMetadataForGraphObjectType({ - stepId, - _type: e._type, - graphObjectCollectionType: 'entities', - }); - - if (typeof indexMetadata === 'undefined') { - return true; - } - - return indexMetadata.enabled === true; + await this.lockOperation(async () => { + // This code rechecks the condition that triggers the flushing process to avoid unnecessary uploads + // During concurrent steps, we might be deleting items from memory while a step is adding new items. This could cause the threshold + // to be triggered again. By rechecking the condition, we ensure that only necessary uploads occur. + if ( + !force && + this.localGraphObjectStore.getTotalEntitySizeInBytes() < + this.graphObjectBufferThresholdInBytes + ) { + return; + } + + const entitiesByStep = this.localGraphObjectStore.collectEntitiesByStep(); + let entitiesToUpload: Entity[] = []; + for (const [stepId, entities] of entitiesByStep) { + const indexable = entities.filter((e) => { + const indexMetadata = this.getIndexMetadataForGraphObjectType({ + stepId, + _type: e._type, + graphObjectCollectionType: 'entities', }); - if (indexable.length) { - await Promise.all( - chunk(indexable, this.graphObjectFileSize).map(async (data) => { - const graphObjectsToFilePaths = await flushDataToDisk({ - storageDirectoryPath: stepId, - collectionType: 'entities', - data, - pretty: this.prettifyFiles, - }); - - for (const { - graphDataPath, - collection, - } of graphObjectsToFilePaths) { - for (const [index, e] of collection.entries()) { - this.entityOnDiskLocationMap.set(e._key, { - graphDataPath, - index, - }); - } - } - }), - ); + if (typeof indexMetadata === 'undefined') { + return true; } - this.localGraphObjectStore.flushEntities(entities, stepId); + return indexMetadata.enabled === true; + }); + + if (indexable.length) { + await Promise.all( + chunk(indexable, this.graphObjectFileSize).map(async (data) => { + const graphObjectsToFilePaths = await flushDataToDisk({ + storageDirectoryPath: stepId, + collectionType: 'entities', + data, + pretty: this.prettifyFiles, + }); + + for (const { + graphDataPath, + collection, + } of graphObjectsToFilePaths) { + for (const [index, e] of collection.entries()) { + this.entityOnDiskLocationMap.set(e._key, { + graphDataPath, + index, + }); + } + } + }), + ); + } - if (onEntitiesFlushed) { - await onEntitiesFlushed(entities); - } - }, - ), - ); - } + this.localGraphObjectStore.flushEntities(entities, stepId); + entitiesToUpload = entitiesToUpload.concat(entities); + } + if (onEntitiesFlushed) { + await onEntitiesFlushed(entitiesToUpload); + } + }); + } + /** + * Asynchronously flushes relationship data to disk. + * + * This function ensures that relationship data is saved to disk when necessary. It uses a locking mechanism + * to prevent concurrent modifications and checks if the data size exceeds a certain threshold before flushing. + * + * @param {function} [onRelationshipsFlushed] - Optional. A callback function that is invoked after the relationships + * have been flushed to disk. It receives an array of relationships as + * an argument and returns a Promise. + * @param {Boolean} [force=false] - Optional. A boolean flag indicating whether to force the flushing process + * regardless of the data size threshold. + * + * This process ensures efficient and necessary data uploads, avoiding redundant or unnecessary disk operations. + */ async flushRelationshipsToDisk( onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + force: Boolean = false, ) { - await this.lockOperation(() => - pMap( - this.localGraphObjectStore.collectRelationshipsByStep(), - async ([stepId, relationships]) => { - const indexable = relationships.filter((r) => { - const indexMetadata = this.getIndexMetadataForGraphObjectType({ - stepId, - _type: r._type, - graphObjectCollectionType: 'relationships', - }); - - if (typeof indexMetadata === 'undefined') { - return true; - } - - return indexMetadata.enabled === true; + await this.lockOperation(async () => { + // This code rechecks the condition that triggers the flushing process to avoid unnecessary uploads + // During concurrent steps, we might be deleting items from memory while a step is adding new items. This could cause the threshold + // to be triggered again. By rechecking the condition, we ensure that only necessary uploads occur. + if ( + !force && + this.localGraphObjectStore.getTotalRelationshipSizeInBytes() < + this.graphObjectBufferThresholdInBytes + ) { + return; + } + const relationshipsByStep = + this.localGraphObjectStore.collectRelationshipsByStep(); + let relationshipsToUpload: Relationship[] = []; + for (const [stepId, relationships] of relationshipsByStep) { + const indexable = relationships.filter((r) => { + const indexMetadata = this.getIndexMetadataForGraphObjectType({ + stepId, + _type: r._type, + graphObjectCollectionType: 'relationships', }); - if (indexable.length) { - await Promise.all( - chunk(indexable, this.graphObjectFileSize).map(async (data) => { - await flushDataToDisk({ - storageDirectoryPath: stepId, - collectionType: 'relationships', - data, - pretty: this.prettifyFiles, - }); - }), - ); + if (typeof indexMetadata === 'undefined') { + return true; } - this.localGraphObjectStore.flushRelationships(relationships, stepId); + return indexMetadata.enabled === true; + }); + + if (indexable.length) { + await Promise.all( + chunk(indexable, this.graphObjectFileSize).map(async (data) => { + await flushDataToDisk({ + storageDirectoryPath: stepId, + collectionType: 'relationships', + data, + pretty: this.prettifyFiles, + }); + }), + ); + } + + this.localGraphObjectStore.flushRelationships(relationships, stepId); + relationshipsToUpload = relationshipsToUpload.concat(relationships); + } - if (onRelationshipsFlushed) { - await onRelationshipsFlushed(relationships); - } - }, - ), - ); + if (onRelationshipsFlushed) { + await onRelationshipsFlushed(relationshipsToUpload); + } + }); } getIndexMetadataForGraphObjectType({ diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts index e7ddf9d0d..8f7f16e4b 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts @@ -98,7 +98,7 @@ describe('flushEntitiesToDisk', () => { const entities = times(25, () => createTestEntity({ _type: entityType })); await store.addEntities(storageDirectoryPath, entities); - await store.flushEntitiesToDisk(); + await store.flushEntitiesToDisk(undefined, true); const entitiesDirectory = path.join( getRootStorageDirectory(), @@ -147,7 +147,7 @@ describe('flushEntitiesToDisk', () => { graphObjectBufferThresholdInBytes: getSizeOfObject(entities), }); await store.addEntities(storageDirectoryPath, entities); - await store.flushEntitiesToDisk(); + await store.flushEntitiesToDisk(undefined, true); const entitiesDirectory = path.join( getRootStorageDirectory(), @@ -183,7 +183,7 @@ describe('flushRelationshipsToDisk', () => { ); await store.addRelationships(storageDirectoryPath, relationships); - await store.flushRelationshipsToDisk(); + await store.flushRelationshipsToDisk(undefined, true); const relationshipsDirectory = path.join( getRootStorageDirectory(), @@ -233,7 +233,7 @@ describe('flushRelationshipsToDisk', () => { graphObjectBufferThresholdInBytes: getSizeOfObject(relationships), }); await store.addRelationships(storageDirectoryPath, relationships); - await store.flushEntitiesToDisk(); + await store.flushEntitiesToDisk(undefined, true); const relationshipsDirectory = path.join( getRootStorageDirectory(), @@ -443,7 +443,7 @@ describe('findEntity', () => { ...nonMatchingEntities, matchingEntity, ]); - await store.flushEntitiesToDisk(); + await store.flushEntitiesToDisk(undefined, true); await expect(store.findEntity(_key)).resolves.toEqual(matchingEntity); expect(localGraphObjectStoreFindEntitySpy).toHaveLastReturnedWith( @@ -495,7 +495,7 @@ describe('findEntity', () => { const matchingEntity = createTestEntity({ _type, _key }); await store.addEntities(stepId, [...nonMatchingEntities, matchingEntity]); - await store.flushEntitiesToDisk(); + await store.flushEntitiesToDisk(undefined, true); await expect(store.findEntity(_key)).resolves.toBeUndefined(); expect(localGraphObjectStoreFindEntitySpy).toHaveLastReturnedWith( @@ -523,7 +523,7 @@ describe('iterateEntities', () => { ...matchingEntities, ]); - await store.flushEntitiesToDisk(); + await store.flushEntitiesToDisk(undefined, true); const bufferedEntity = createTestEntity({ _type: matchingType }); await store.addEntities(storageDirectoryPath, [bufferedEntity]); @@ -751,7 +751,7 @@ describe('flush callbacks', () => { expect(addEntitiesFlushCalledTimes).toEqual(0); expect(flushedEntitiesCollected).toEqual([]); - await store.flushEntitiesToDisk(onEntitiesFlushed); + await store.flushEntitiesToDisk(onEntitiesFlushed, true); expect(addEntitiesFlushCalledTimes).toEqual(1); expect(flushedEntitiesCollected).toEqual(entities); @@ -838,7 +838,7 @@ describe('flush callbacks', () => { expect(addEntitiesFlushCalledTimes).toEqual(0); expect(flushedEntitiesCollected).toEqual([]); - await store.flushEntitiesToDisk(onEntitiesFlushed); + await store.flushEntitiesToDisk(onEntitiesFlushed, true); expect(addEntitiesFlushCalledTimes).toEqual(1); // This should include every entity. Even the ones that are not written to @@ -944,7 +944,7 @@ describe('flush callbacks', () => { expect(addRelationshipsFlushedCalledTimes).toEqual(0); expect(flushedRelationshipsCollected).toEqual([]); - await store.flushRelationshipsToDisk(onRelationshipsFlushed); + await store.flushRelationshipsToDisk(onRelationshipsFlushed, true); expect(addRelationshipsFlushedCalledTimes).toEqual(1); // This should include every relationship. Even the ones that are not written to @@ -987,7 +987,7 @@ describe('flush callbacks', () => { expect(addRelationshipsFlushedCalledTimes).toEqual(0); expect(flushedRelationshipsCollected).toEqual([]); - await store.flushRelationshipsToDisk(onRelationshipsFlushed); + await store.flushRelationshipsToDisk(onRelationshipsFlushed, true); expect(addRelationshipsFlushedCalledTimes).toEqual(1); expect(flushedRelationshipsCollected).toEqual(relationships);