diff --git a/packages/integration-sdk-core/src/types/jobState.ts b/packages/integration-sdk-core/src/types/jobState.ts index a1e346f19..f9debbb4b 100644 --- a/packages/integration-sdk-core/src/types/jobState.ts +++ b/packages/integration-sdk-core/src/types/jobState.ts @@ -104,6 +104,7 @@ export interface JobState { iterateEntities: ( filter: GraphObjectFilter, iteratee: GraphObjectIteratee, + options?: { concurrency: number }, ) => Promise; /** diff --git a/packages/integration-sdk-core/src/types/storage.ts b/packages/integration-sdk-core/src/types/storage.ts index 0f6ab8959..0adf2190f 100644 --- a/packages/integration-sdk-core/src/types/storage.ts +++ b/packages/integration-sdk-core/src/types/storage.ts @@ -31,6 +31,7 @@ export interface GraphObjectStore { iterateEntities( filter: GraphObjectFilter, iteratee: GraphObjectIteratee, + options?: { concurrency: number }, ): Promise; iterateRelationships( diff --git a/packages/integration-sdk-runtime/package.json b/packages/integration-sdk-runtime/package.json index 584b79f6f..5201d5dae 100644 --- a/packages/integration-sdk-runtime/package.json +++ b/packages/integration-sdk-runtime/package.json @@ -36,7 +36,7 @@ "globby": "^11.0.0", "lodash": "^4.17.15", "p-map": "^4.0.0", - "p-queue": "^6.3.0", + "p-queue": "^6.6.2", "rimraf": "^3.0.2" }, "devDependencies": { diff --git a/packages/integration-sdk-runtime/src/execution/jobState.ts b/packages/integration-sdk-runtime/src/execution/jobState.ts index 49281dca0..a286f1f64 100644 --- a/packages/integration-sdk-runtime/src/execution/jobState.ts +++ b/packages/integration-sdk-runtime/src/execution/jobState.ts @@ -278,8 +278,8 @@ export function createStepJobState({ return duplicateKeyTracker.hasKey(_key); }, - iterateEntities: (filter, iteratee) => - graphObjectStore.iterateEntities(filter, iteratee), + iterateEntities: (filter, iteratee, options) => + graphObjectStore.iterateEntities(filter, iteratee, options), iterateRelationships: (filter, iteratee) => 
graphObjectStore.iterateRelationships(filter, iteratee), diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index 1ab3156c3..f43d425fb 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -211,12 +211,14 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { async iterateEntities( filter: GraphObjectFilter, iteratee: GraphObjectIteratee, + options?: { concurrency: number }, ) { - await this.localGraphObjectStore.iterateEntities(filter, iteratee); + await this.localGraphObjectStore.iterateEntities(filter, iteratee, options); await iterateEntityTypeIndex({ type: filter._type, iteratee, + options, }); } diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts index d3b2ded1f..a754cb6ff 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts @@ -576,6 +576,42 @@ describe('iterateEntities', () => { ]), ); }); + + test('should allow concurrency when iterating entities', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore(); + + const entityType = uuid(); + + type TestEntity = Entity & { randomField: string }; + + const entities = times(5, () => + createTestEntity({ _type: entityType, randomField: 'field' }), + ); + + await store.addEntities(storageDirectoryPath, entities); + + const taskBeginTimes = {}; + 
const collectedEntities: TestEntity[] = []; + const task = async (e: TestEntity) => { + taskBeginTimes[e._key] = Date.now(); + await new Promise((resolve) => setTimeout(resolve, 100)); + collectedEntities.push(e); + }; + + await store.iterateEntities({ _type: entityType }, task, { + concurrency: 5, + }); + + for (const e1 of entities) { + for (const e2 of entities) { + expect( + Math.abs(taskBeginTimes[e1._key] - taskBeginTimes[e2._key]), + ).toBeLessThan(100); + } + } + + expect(collectedEntities.length).toBe(5); + }); }); describe('iterateRelationships', () => { diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/indices.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/indices.ts index 290aac63f..d47311ab2 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/indices.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/indices.ts @@ -8,10 +8,13 @@ import { readJsonFromPath, WalkDirectoryIterateeInput } from '../../fileSystem'; import { buildIndexDirectoryPath } from './path'; import { iterateParsedGraphFiles } from '../..'; +import PQueue from 'p-queue'; +import { onQueueSizeIsLessThanLimit } from '../queue'; interface BaseIterateCollectionIndexParams { type: string; iteratee: GraphObjectIteratee; + options?: { concurrency: number }; } interface IterateCollectionIndexParams @@ -23,27 +26,39 @@ async function iterateCollectionTypeIndex({ type, collectionType, iteratee, + options, }: IterateCollectionIndexParams) { const path = buildIndexDirectoryPath({ collectionType, type, }); + const queue = new PQueue({ concurrency: options?.concurrency ?? 1 }); // bound the queue: p-queue's default concurrency is Infinity, which would ignore the caller's limit + const concurrency = options?.concurrency ?? 
1; + await iterateParsedGraphFiles(async (data) => { for (const graphObj of (data[collectionType] as T[]) || []) { - await iteratee(graphObj); + // We mark this as void because we want to fire the task away and not wait for it to resolve + // that is handled by the combination of onQueueSizeIsLessThanLimit and onIdle + void queue.add(() => iteratee(graphObj)); + await onQueueSizeIsLessThanLimit(queue, concurrency); } }, path); + + // Wait for all tasks to complete + await queue.onIdle(); } export async function iterateEntityTypeIndex({ type, iteratee, + options, }: BaseIterateCollectionIndexParams) { await iterateCollectionTypeIndex({ type, iteratee, collectionType: 'entities', + options, }); } diff --git a/packages/integration-sdk-runtime/src/storage/memory.test.ts b/packages/integration-sdk-runtime/src/storage/memory.test.ts index 8ac51ad0f..94a59cca1 100644 --- a/packages/integration-sdk-runtime/src/storage/memory.test.ts +++ b/packages/integration-sdk-runtime/src/storage/memory.test.ts @@ -5,6 +5,7 @@ import { createTestEntity, createTestRelationship, } from '@jupiterone/integration-sdk-private-test-utils'; +import { times } from 'lodash'; async function collectEntitiesByType( store: InMemoryGraphObjectStore, @@ -81,6 +82,62 @@ describe('#InMemoryGraphObjectStore', () => { expect(await collectEntitiesByType(store, e2._type)).toEqual([e2]); }); + test('tasks should run concurrently', async () => { + const concurrency = 5; + + const taskStartTimes = {}; + + // This tests the concurrency by making each task take _at least_ 250 ms. + // Then it compares the start time of each task to all the other tasks. 
+ // Since concurrency is equal to the number of entities we are iterating + // all the tasks should start immediately and the difference in start time should be + // less than 250ms + const task = async (entity: Entity) => { + taskStartTimes[entity._key] = Date.now(); + await new Promise((resolve) => setTimeout(resolve, 250)); // Mock task delay + }; + const store = new InMemoryGraphObjectStore(); + + const entities = times(concurrency, () => + createTestEntity({ _type: 'test' }), + ); + await store.addEntities(uuid(), entities); + + await store.iterateEntities({ _type: 'test' }, task, { concurrency }); + + // All tasks should have started within 250ms of each other since concurrency is 5 (the assertion below allows a looser 1000ms bound) + for (const e1 of entities) { + for (const e2 of entities) { + expect( + Math.abs(taskStartTimes[e2._key] - taskStartTimes[e1._key]), + ).toBeLessThan(1000); + } + } + }); + + test('tasks should not run with more than expected concurrency', async () => { + const concurrency = 1; + + const taskStartTimes = {}; + + const task = async (entity: Entity) => { + taskStartTimes[entity._key] = Date.now(); + await new Promise((resolve) => setTimeout(resolve, 100)); + }; + const store = new InMemoryGraphObjectStore(); + + const entities = times(2, () => createTestEntity({ _type: 'test' })); + await store.addEntities(uuid(), entities); + + await store.iterateEntities({ _type: 'test' }, task, { concurrency }); + expect( + Math.abs( + taskStartTimes[entities[0]._key] - taskStartTimes[entities[1]._key], + ), + ).toBeGreaterThan(98); + // leave a little for clock / timing inaccuracy + }); + test('should not throw if iterating entity _type that does not exist', async () => { const store = new InMemoryGraphObjectStore(); expect(await collectEntitiesByType(store, uuid())).toEqual([]); diff --git a/packages/integration-sdk-runtime/src/storage/memory.ts b/packages/integration-sdk-runtime/src/storage/memory.ts index e7179e676..3a43bb6a5 100644 --- 
a/packages/integration-sdk-runtime/src/storage/memory.ts +++ b/packages/integration-sdk-runtime/src/storage/memory.ts @@ -8,6 +8,8 @@ import { Relationship, } from '@jupiterone/integration-sdk-core'; import { getSizeOfObject } from '../synchronization/batchBySize'; +import PQueue from 'p-queue'; +import { onQueueSizeIsLessThanLimit } from './queue'; export interface GraphObjectMetadata { stepId: string; @@ -145,8 +147,11 @@ export class InMemoryGraphObjectStore implements GraphObjectStore { async iterateEntities( filter: GraphObjectFilter, iteratee: GraphObjectIteratee, + options?: { concurrency: number }, ): Promise { const entityTypeKeysMap = this.entityTypeToKeysMap.get(filter._type); + const concurrency = options?.concurrency ?? 1; + const queue = new PQueue({ concurrency }); if (!entityTypeKeysMap) { return; @@ -163,8 +168,16 @@ export class InMemoryGraphObjectStore implements GraphObjectStore { ); } - await iteratee(graphObjectData.entity as T); + // We mark this as void because we want to fire the task away and not wait for it to resolve + // that is handled by the combination of onQueueSizeIsLessThanLimit and onIdle + void queue.add(() => iteratee(graphObjectData.entity as T)); + // Don't flood the queue with promises. If we get to twice our concurrency we wait. + // The limit counts queued tasks, not running tasks, so we could have up to + // concurrency-tasks (running) + concurrency-tasks (queued) + await onQueueSizeIsLessThanLimit(queue, concurrency); } + // wait for everything to finish + await queue.onIdle(); } async iterateRelationships( diff --git a/packages/integration-sdk-runtime/src/storage/queue.ts b/packages/integration-sdk-runtime/src/storage/queue.ts new file mode 100644 index 000000000..bcf24ad71 --- /dev/null +++ b/packages/integration-sdk-runtime/src/storage/queue.ts @@ -0,0 +1,32 @@ +import PQueue from 'p-queue'; + +// Copied from: https://github.com/sindresorhus/p-queue/pull/131/files +// and backported to the commonjs compatible version. 
+// +// MIT License + +// Copyright (c) Sindre Sorhus (https://sindresorhus.com) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+export async function onQueueSizeIsLessThanLimit( + queue: PQueue, + limit: number, +): Promise { + if (queue.size < limit) { + return; + } + + return new Promise((resolve) => { + const listener = () => { + if (queue.size < limit) { + queue.removeListener('next', listener); + resolve(); + } + }; + + queue.on('next', listener); + }); +} diff --git a/yarn.lock b/yarn.lock index e18bd7d9b..08660c9f5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8594,7 +8594,7 @@ p-pipe@3.1.0: resolved "https://registry.yarnpkg.com/p-pipe/-/p-pipe-3.1.0.tgz#48b57c922aa2e1af6a6404cb7c6bf0eb9cc8e60e" integrity sha512-08pj8ATpzMR0Y80x50yJHn37NF6vjrqHutASaX5LiH5npS9XPvrUmscd9MF5R4fuYRHOxQR1FfMIlF7AzwoPqw== -p-queue@6.6.2, p-queue@^6.3.0: +p-queue@6.6.2, p-queue@^6.6.2: version "6.6.2" resolved "https://registry.yarnpkg.com/p-queue/-/p-queue-6.6.2.tgz#2068a9dcf8e67dd0ec3e7a2bcb76810faa85e426" integrity sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==