Skip to content

Commit

Permalink
Use pMap to iterateEntities and iterateRelationship with an optional …
Browse files Browse the repository at this point in the history
…concurrency parameter.
  • Loading branch information
VDubber committed Aug 13, 2024
1 parent 40e735b commit 554a2b8
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 34 deletions.
34 changes: 34 additions & 0 deletions packages/integration-sdk-core/src/types/jobState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,27 @@ export interface JobState {
* steps it depends on. Other steps outside the dependency ancestry may not
* have run and therefore entities collected by those other steps should not
* be expected to exist.
*
* If concurrency is specified (defaults to 1), the iteratee will be executed
* concurrently for graph objects that are found in memory and for
* graph objects found in single graph file.
* Consideration should be taken for provider rate limits when API requests
* are being made within the iteratee and also while increasing concurrency
* beyond the default.
*
* Example:
* await jobState.iterateEntities(
* { _type: EcrEntities.ECR_IMAGE._type },
* async (image) => {
* ...
* },
* 5,
* );
*/
iterateEntities: <T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
) => Promise<void>;

/**
Expand All @@ -115,10 +132,27 @@ export interface JobState {
* previous steps it depends on. Other steps outside the dependency ancestry
* may not have run and therefore relationships collected by those other steps
* should not be expected to exist.
*
* If concurrency is specified (defaults to 1), the iteratee will be executed
* concurrently for graph objects that are found in memory and for
* graph objects found in single graph file.
* Consideration should be taken for provider rate limits when API requests
* are being made within the iteratee and also while increasing concurrency
* beyond the default.
*
* Example:
* await jobState.iterateRelationships(
* { _type: Relationships.COMPARTMENT_HAS_DOMAIN._type },
* async (relationship) => {
* ...
* },
* 5,
* );
*/
iterateRelationships: <T extends Relationship = Relationship>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
) => Promise<void>;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,25 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
async iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
) {
//TODO: Remove maps. This is a hack we did to avoid returning duplicated entities.
//This should not work this way.
//There is a detailed description of the changes to come to avoid having to do this
//Here: https://jupiterone.atlassian.net/wiki/spaces/INT/pages/786169857/Task+SDK+decouple+tasks
const iteratedEntities = new Map<string, boolean>();
await this.localGraphObjectStore.iterateEntities(filter, (obj: T) => {
iteratedEntities.set(obj._key, true);
return iteratee(obj);
});
await this.localGraphObjectStore.iterateEntities(
filter,
(obj: T) => {
iteratedEntities.set(obj._key, true);
return iteratee(obj);
},
concurrency,
);

await iterateEntityTypeIndex({
type: filter._type,
concurrency,
iteratee: (obj: T) => {
if (iteratedEntities.has(obj._key)) {
return;
Expand All @@ -235,6 +241,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
async iterateRelationships<T extends Relationship = Relationship>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
) {
//TODO: Remove maps. This is a hack we did to avoid returning duplicated relationships.
//This should not work this way.
Expand All @@ -248,6 +255,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {

await iterateRelationshipTypeIndex({
type: filter._type,
concurrency,
iteratee: (obj: T) => {
if (iteratedRelationships.has(obj._key)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,49 @@ describe('iterateEntities', () => {
]);
});

test('should allow concurrent executions of the iteratee function.', async () => {
const { storageDirectoryPath, store } = setupFileSystemObjectStore();

const matchingType = uuid();

const nonMatchingEntities = times(25, () =>
createTestEntity({ _type: uuid() }),
);
const matchingEntities = times(300, () =>
createTestEntity({ _type: matchingType }),
);

await store.addEntities(storageDirectoryPath, [
...nonMatchingEntities,
...matchingEntities,
]);

await store.flushEntitiesToDisk(undefined, true);

const bufferedEntity = createTestEntity({ _type: matchingType });
await store.addEntities(storageDirectoryPath, [bufferedEntity]);

const collectedEntities = new Map<string, Entity>();

await store.iterateEntities(
{ _type: matchingType },
(e: Entity) => {
if (collectedEntities.has(e._key)) {
throw new Error(
`duplicate entity _key found in iterateEntities (_key=${e._key})`,
);
}
collectedEntities.set(e._key, e);
},
5,
);
console.log('length', collectedEntities.size);
expect(Array.from(collectedEntities.values())).toEqual([
bufferedEntity,
...matchingEntities,
]);
});

test('should allow extended types to be iterated', async () => {
const { storageDirectoryPath, store } = setupFileSystemObjectStore();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,31 @@ import { readJsonFromPath, WalkDirectoryIterateeInput } from '../../fileSystem';

import { buildIndexDirectoryPath } from './path';
import { iterateParsedGraphFiles } from '../..';
import pMap from 'p-map';

interface BaseIterateCollectionIndexParams<GraphObject> {
type: string;
iteratee: GraphObjectIteratee<GraphObject>;
concurrency?: number;
}

interface IterateCollectionIndexParams<GraphObject>
extends BaseIterateCollectionIndexParams<GraphObject> {
collectionType: 'entities' | 'relationships';
}

/**
* Iterates through graph files.
* If concurrency is specified, it'll process the graph objects concurrently.
* @param type
* @param collectionType
* @param concurrency
* @param iteratee
*/
async function iterateCollectionTypeIndex<T extends Entity | Relationship>({
type,
collectionType,
concurrency,
iteratee,
}: IterateCollectionIndexParams<T>) {
const path = buildIndexDirectoryPath({
Expand All @@ -30,29 +41,34 @@ async function iterateCollectionTypeIndex<T extends Entity | Relationship>({
});

await iterateParsedGraphFiles(async (data) => {
for (const graphObj of (data[collectionType] as T[]) || []) {
await iteratee(graphObj);
}
await pMap(
(data[collectionType] as T[]) || [],
(graphObj) => iteratee(graphObj),
{ concurrency: concurrency ?? 1 },
);
}, path);
}

export async function iterateEntityTypeIndex<T extends Entity = Entity>({
type,
iteratee,
concurrency,
}: BaseIterateCollectionIndexParams<T>) {
await iterateCollectionTypeIndex({
type,
iteratee,
concurrency,
collectionType: 'entities',
});
}

export async function iterateRelationshipTypeIndex<
T extends Relationship = Relationship,
>({ type, iteratee }: BaseIterateCollectionIndexParams<T>) {
>({ type, iteratee, concurrency }: BaseIterateCollectionIndexParams<T>) {
await iterateCollectionTypeIndex({
type,
iteratee,
concurrency,
collectionType: 'relationships',
});
}
Expand Down
63 changes: 37 additions & 26 deletions packages/integration-sdk-runtime/src/storage/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
Relationship,
} from '@jupiterone/integration-sdk-core';
import { getSizeOfObject } from '../synchronization/batchBySize';
import pMap from 'p-map';

export interface GraphObjectMetadata {
stepId: string;
Expand Down Expand Up @@ -155,31 +156,37 @@ export class InMemoryGraphObjectStore implements GraphObjectStore {
async iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
): Promise<void> {
const entityTypeKeysMap = this.entityTypeToKeysMap.get(filter._type);

if (!entityTypeKeysMap) {
return;
}

for (const [_key] of entityTypeKeysMap) {
const graphObjectData = this.entityKeyToEntityMap.get(_key);

if (!graphObjectData) {
// NOTE: This should never happen. Our data structures should stay in
// sync.
throw new IntegrationMissingKeyError(
`Failed to find entity (_type=${filter._type}, _key=${_key})`,
);
}

await iteratee(graphObjectData.entity as T);
}
await pMap(
entityTypeKeysMap,
async ([_key]) => {
const graphObjectData = this.entityKeyToEntityMap.get(_key);

if (!graphObjectData) {
// NOTE: This should never happen. Our data structures should stay in
// sync.
throw new IntegrationMissingKeyError(
`Failed to find entity (_type=${filter._type}, _key=${_key})`,
);
}

await iteratee(graphObjectData.entity as T);
},
{ concurrency: concurrency ?? 1 },
);
}

async iterateRelationships<T extends Relationship = Relationship>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
): Promise<void> {
const relationshipTypeKeysMap = this.relationshipTypeToKeysMap.get(
filter._type,
Expand All @@ -189,19 +196,23 @@ export class InMemoryGraphObjectStore implements GraphObjectStore {
return;
}

for (const [_key] of relationshipTypeKeysMap) {
const graphObjectData = this.relationshipKeyToRelationshipMap.get(_key);

if (!graphObjectData) {
// NOTE: This should never happen. Our data structures should stay in
// sync.
throw new IntegrationMissingKeyError(
`Failed to find relationship (_type=${filter._type}, _key=${_key})`,
);
}

await iteratee(graphObjectData.relationship as T);
}
await pMap(
relationshipTypeKeysMap,
async ([_key]) => {
const graphObjectData = this.relationshipKeyToRelationshipMap.get(_key);

if (!graphObjectData) {
// NOTE: This should never happen. Our data structures should stay in
// sync.
throw new IntegrationMissingKeyError(
`Failed to find relationship (_type=${filter._type}, _key=${_key})`,
);
}

await iteratee(graphObjectData.relationship as T);
},
{ concurrency: concurrency ?? 1 },
);
}

/**
Expand Down

0 comments on commit 554a2b8

Please sign in to comment.