Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow concurrency option for iterateEntities #993

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/integration-sdk-core/src/types/jobState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export interface JobState {
iterateEntities: <T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
options?: { concurrency: number },
) => Promise<void>;

/**
Expand Down
1 change: 1 addition & 0 deletions packages/integration-sdk-core/src/types/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface GraphObjectStore {
iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
options?: { concurrency: number },
): Promise<void>;

iterateRelationships<T extends Relationship = Relationship>(
Expand Down
2 changes: 1 addition & 1 deletion packages/integration-sdk-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"globby": "^11.0.0",
"lodash": "^4.17.15",
"p-map": "^4.0.0",
"p-queue": "^6.3.0",
"p-queue": "^6.6.2",
"rimraf": "^3.0.2"
},
"devDependencies": {
Expand Down
4 changes: 2 additions & 2 deletions packages/integration-sdk-runtime/src/execution/jobState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ export function createStepJobState({
return duplicateKeyTracker.hasKey(_key);
},

iterateEntities: (filter, iteratee) =>
graphObjectStore.iterateEntities(filter, iteratee),
iterateEntities: (filter, iteratee, options) =>
graphObjectStore.iterateEntities(filter, iteratee, options),

iterateRelationships: (filter, iteratee) =>
graphObjectStore.iterateRelationships(filter, iteratee),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,14 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
async iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
options?: { concurrency: number },
) {
await this.localGraphObjectStore.iterateEntities(filter, iteratee);
await this.localGraphObjectStore.iterateEntities(filter, iteratee, options);

await iterateEntityTypeIndex({
type: filter._type,
iteratee,
options,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,42 @@ describe('iterateEntities', () => {
]),
);
});

test('should allow concurrency when iterating entities', async () => {
const { storageDirectoryPath, store } = setupFileSystemObjectStore();

const entityType = uuid();

type TestEntity = Entity & { randomField: string };

const entities = times(5, () =>
createTestEntity({ _type: entityType, randomField: 'field' }),
);

await store.addEntities(storageDirectoryPath, entities);

const taskBeginTimes = {};
const collectedEntities: TestEntity[] = [];
const task = async (e: TestEntity) => {
taskBeginTimes[e._key] = Date.now();
await new Promise((resolve) => setTimeout(resolve, 100));
collectedEntities.push(e);
};

await store.iterateEntities<TestEntity>({ _type: entityType }, task, {
concurrency: 5,
});

for (const e1 of entities) {
for (const e2 of entities) {
expect(
Math.abs(taskBeginTimes[e1._key] - taskBeginTimes[e2._key]),
).toBeLessThan(100);
}
}

expect(collectedEntities.length).toBe(5);
});
});

describe('iterateRelationships', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import { readJsonFromPath, WalkDirectoryIterateeInput } from '../../fileSystem';

import { buildIndexDirectoryPath } from './path';
import { iterateParsedGraphFiles } from '../..';
import PQueue from 'p-queue';
import { onQueueSizeIsLessThanLimit } from '../queue';

interface BaseIterateCollectionIndexParams<GraphObject> {
type: string;
iteratee: GraphObjectIteratee<GraphObject>;
options?: { concurrency: number };
}

interface IterateCollectionIndexParams<GraphObject>
Expand All @@ -23,27 +26,39 @@ async function iterateCollectionTypeIndex<T extends Entity | Relationship>({
type,
collectionType,
iteratee,
options,
}: IterateCollectionIndexParams<T>) {
const path = buildIndexDirectoryPath({
collectionType,
type,
});

const queue = new PQueue({});
const concurrency = options?.concurrency ?? 1;

await iterateParsedGraphFiles(async (data) => {
for (const graphObj of (data[collectionType] as T[]) || []) {
await iteratee(graphObj);
// We mark this as void because we want to fire the task away and not wait for it to resolve
// that is handled by the combination of onQueueSizeIsLessThanLimit and onIdle
void queue.add(() => iteratee(graphObj));
await onQueueSizeIsLessThanLimit(queue, concurrency);
}
}, path);

// Wait for all tasks to complete
await queue.onIdle();
}

export async function iterateEntityTypeIndex<T extends Entity = Entity>({
type,
iteratee,
options,
}: BaseIterateCollectionIndexParams<T>) {
await iterateCollectionTypeIndex({
type,
iteratee,
collectionType: 'entities',
options,
});
}

Expand Down
57 changes: 57 additions & 0 deletions packages/integration-sdk-runtime/src/storage/memory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
createTestEntity,
createTestRelationship,
} from '@jupiterone/integration-sdk-private-test-utils';
import { times } from 'lodash';

async function collectEntitiesByType(
store: InMemoryGraphObjectStore,
Expand Down Expand Up @@ -81,6 +82,62 @@ describe('#InMemoryGraphObjectStore', () => {
expect(await collectEntitiesByType(store, e2._type)).toEqual([e2]);
});

test('tasks should run concurrently', async () => {
const concurrency = 5;

const taskStartTimes = {};

// This tests the concurrency by making each task take _at least_ 250 ms.
// Then it compares the start time of each task to all the other tasks.
// Since concurrency is equal to the number of entities we are iterating
// all the tasks should start immediately and the difference in start time should be
// less than 250ms
const task = async (entity: Entity) => {
taskStartTimes[entity._key] = Date.now();
await new Promise((resolve) => setTimeout(resolve, 250)); // Mock task delay
};
const store = new InMemoryGraphObjectStore();

const entities = times(concurrency, () =>
createTestEntity({ _type: 'test' }),
);
await store.addEntities(uuid(), entities);

await store.iterateEntities({ _type: 'test' }, task, { concurrency });

// All tasks should have started with 250ms of each other since concurrency is 5
for (const e1 of entities) {
for (const e2 of entities) {
expect(
Math.abs(taskStartTimes[e2._key] - taskStartTimes[e1._key]),
).toBeLessThan(1000);
}
}
});

test('tasks should not run with more than expected concurrency', async () => {
const concurrency = 1;

const taskStartTimes = {};

const task = async (entity: Entity) => {
taskStartTimes[entity._key] = Date.now();
await new Promise((resolve) => setTimeout(resolve, 100));
};
const store = new InMemoryGraphObjectStore();

const entities = times(2, () => createTestEntity({ _type: 'test' }));
await store.addEntities(uuid(), entities);

await store.iterateEntities({ _type: 'test' }, task, { concurrency });
expect(
Math.abs(
taskStartTimes[entities[0]._key] - taskStartTimes[entities[1]._key],
),
).toBeGreaterThan(98);
// leave a little for clock / timing inaccuracy
});

test('should not throw if iterating entity _type that does not exist', async () => {
const store = new InMemoryGraphObjectStore();
expect(await collectEntitiesByType(store, uuid())).toEqual([]);
Expand Down
15 changes: 14 additions & 1 deletion packages/integration-sdk-runtime/src/storage/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
Relationship,
} from '@jupiterone/integration-sdk-core';
import { getSizeOfObject } from '../synchronization/batchBySize';
import PQueue from 'p-queue';
import { onQueueSizeIsLessThanLimit } from './queue';

export interface GraphObjectMetadata {
stepId: string;
Expand Down Expand Up @@ -145,8 +147,11 @@ export class InMemoryGraphObjectStore implements GraphObjectStore {
async iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
options?: { concurrency: number },
): Promise<void> {
const entityTypeKeysMap = this.entityTypeToKeysMap.get(filter._type);
const concurrency = options?.concurrency ?? 1;
const queue = new PQueue({ concurrency });

if (!entityTypeKeysMap) {
return;
Expand All @@ -163,8 +168,16 @@ export class InMemoryGraphObjectStore implements GraphObjectStore {
);
}

await iteratee(graphObjectData.entity as T);
// We mark this as void because we want to fire the task away and not wait for it to resolve
// that is handled by the combination of onQueueSizeIsLessThanLimit and onIdle
void queue.add(() => iteratee(graphObjectData.entity as T));
// Don't flood the queue with promises. If we get to twice our concurrency we wait
// This is queued tasks, not tasks that running we could have up to
// concurrency-tasks (running) + concurrency-tasks (queued)
await onQueueSizeIsLessThanLimit(queue, concurrency);
}
// wait for everything to finish
await queue.onIdle();
}

async iterateRelationships<T extends Relationship = Relationship>(
Expand Down
32 changes: 32 additions & 0 deletions packages/integration-sdk-runtime/src/storage/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import PQueue from 'p-queue';

// Copied from: https://github.com/sindresorhus/p-queue/pull/131/files
// and backported to the commonjs compatible version.
//
// MIT License

// Copyright (c) Sindre Sorhus <[email protected]> (https://sindresorhus.com)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
export async function onQueueSizeIsLessThanLimit(
queue: PQueue,
limit: number,
): Promise<void> {
if (queue.size < limit) {
return;
}

return new Promise<void>((resolve) => {
const listener = () => {
if (queue.size < limit) {
queue.removeListener('next', listener);
resolve();
}
};

queue.on('next', listener);
});
}
2 changes: 1 addition & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8594,7 +8594,7 @@ [email protected]:
resolved "https://registry.yarnpkg.com/p-pipe/-/p-pipe-3.1.0.tgz#48b57c922aa2e1af6a6404cb7c6bf0eb9cc8e60e"
integrity sha512-08pj8ATpzMR0Y80x50yJHn37NF6vjrqHutASaX5LiH5npS9XPvrUmscd9MF5R4fuYRHOxQR1FfMIlF7AzwoPqw==

[email protected], p-queue@^6.3.0:
[email protected], p-queue@^6.6.2:
version "6.6.2"
resolved "https://registry.yarnpkg.com/p-queue/-/p-queue-6.6.2.tgz#2068a9dcf8e67dd0ec3e7a2bcb76810faa85e426"
integrity sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==
Expand Down
Loading