Skip to content

Commit

Permalink
PR feedback. Changed concurrency parameter to be inside an options ob…
Browse files Browse the repository at this point in the history
…ject parameter
  • Loading branch information
VDubber committed Aug 14, 2024
1 parent 072d620 commit c74beec
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 22 deletions.
11 changes: 7 additions & 4 deletions packages/integration-sdk-core/src/types/jobState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ export interface GraphObjectFilter {
}

export type GraphObjectIteratee<T> = (obj: T) => void | Promise<void>;
export type GraphObjectIterateeOptions = {
concurrency?: number;
};

/**
* The `JobState` is used to store and retrieve entities and relationships
Expand Down Expand Up @@ -103,7 +106,7 @@ export interface JobState {
*
* If concurrency is specified (defaults to 1), the iteratee will be executed
* concurrently for graph objects that are found in memory and for
* graph objects found in single graph file.
* graph objects found in a given graph file. No specific ordering is guaranteed.
* Consideration should be taken for provider rate limits when API requests
* are being made within the iteratee and also while increasing concurrency
* beyond the default.
Expand All @@ -120,7 +123,7 @@ export interface JobState {
iterateEntities: <T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
options?: GraphObjectIterateeOptions,
) => Promise<void>;

/**
Expand All @@ -135,7 +138,7 @@ export interface JobState {
*
* If concurrency is specified (defaults to 1), the iteratee will be executed
* concurrently for graph objects that are found in memory and for
* graph objects found in single graph file.
* graph objects found in a given graph file. No specific ordering is guaranteed.
* Consideration should be taken for provider rate limits when API requests
* are being made within the iteratee and also while increasing concurrency
* beyond the default.
Expand All @@ -152,7 +155,7 @@ export interface JobState {
iterateRelationships: <T extends Relationship = Relationship>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
options?: GraphObjectIterateeOptions,
) => Promise<void>;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
GraphObjectIndexMetadata,
GetIndexMetadataForGraphObjectTypeParams,
IntegrationStep,
GraphObjectIterateeOptions,
} from '@jupiterone/integration-sdk-core';

import { flushDataToDisk } from './flushDataToDisk';
Expand Down Expand Up @@ -210,7 +211,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
async iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
options?: GraphObjectIterateeOptions,
) {
//TODO: Remove maps. This is a hack we did to avoid returning duplicated entities.
//This should not work this way.
Expand All @@ -223,12 +224,12 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
iteratedEntities.set(obj._key, true);
return iteratee(obj);
},
concurrency,
options,
);

await iterateEntityTypeIndex({
type: filter._type,
concurrency,
options,
iteratee: (obj: T) => {
if (iteratedEntities.has(obj._key)) {
return;
Expand All @@ -241,7 +242,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
async iterateRelationships<T extends Relationship = Relationship>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
options?: GraphObjectIterateeOptions,
) {
//TODO: Remove maps. This is a hack we did to avoid returning duplicated relationships.
//This should not work this way.
Expand All @@ -255,7 +256,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {

await iterateRelationshipTypeIndex({
type: filter._type,
concurrency,
options,
iteratee: (obj: T) => {
if (iteratedRelationships.has(obj._key)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ describe('iterateEntities', () => {
const nonMatchingEntities = times(25, () =>
createTestEntity({ _type: uuid() }),
);
const matchingEntities = times(25000, () =>
const matchingEntities = times(2500, () =>
createTestEntity({ _type: matchingType }),
);

Expand Down Expand Up @@ -579,7 +579,7 @@ describe('iterateEntities', () => {
}
collectedEntities.set(e._key, e);
},
5,
{ concurrency: 5 },
);

expect(collectedEntities.size).toEqual(matchingEntities.length + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ import pMap from 'p-map';
interface BaseIterateCollectionIndexParams<GraphObject> {
type: string;
iteratee: GraphObjectIteratee<GraphObject>;
concurrency?: number;
options?: BaseIterateCollectionIndexOptionsParams;
}

interface IterateCollectionIndexParams<GraphObject>
extends BaseIterateCollectionIndexParams<GraphObject> {
collectionType: 'entities' | 'relationships';
}

interface BaseIterateCollectionIndexOptionsParams {
concurrency?: number;
}

/**
* Iterates through graph files.
* If concurrency is specified, it'll process the graph objects concurrently.
Expand All @@ -32,7 +36,7 @@ interface IterateCollectionIndexParams<GraphObject>
async function iterateCollectionTypeIndex<T extends Entity | Relationship>({
type,
collectionType,
concurrency,
options,
iteratee,
}: IterateCollectionIndexParams<T>) {
const path = buildIndexDirectoryPath({
Expand All @@ -44,31 +48,31 @@ async function iterateCollectionTypeIndex<T extends Entity | Relationship>({
await pMap(
(data[collectionType] as T[]) || [],
(graphObj) => iteratee(graphObj),
{ concurrency: concurrency ?? 1 },
{ concurrency: options?.concurrency ?? 1 },
);
}, path);
}

export async function iterateEntityTypeIndex<T extends Entity = Entity>({
type,
iteratee,
concurrency,
options,
}: BaseIterateCollectionIndexParams<T>) {
await iterateCollectionTypeIndex({
type,
iteratee,
concurrency,
options,
collectionType: 'entities',
});
}

export async function iterateRelationshipTypeIndex<
T extends Relationship = Relationship,
>({ type, iteratee, concurrency }: BaseIterateCollectionIndexParams<T>) {
>({ type, iteratee, options }: BaseIterateCollectionIndexParams<T>) {
await iterateCollectionTypeIndex({
type,
iteratee,
concurrency,
options,
collectionType: 'relationships',
});
}
Expand Down
9 changes: 5 additions & 4 deletions packages/integration-sdk-runtime/src/storage/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
Entity,
GraphObjectFilter,
GraphObjectIteratee,
GraphObjectIterateeOptions,
GraphObjectStore,
IntegrationError,
IntegrationMissingKeyError,
Expand Down Expand Up @@ -156,7 +157,7 @@ export class InMemoryGraphObjectStore implements GraphObjectStore {
async iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
options?: GraphObjectIterateeOptions,
): Promise<void> {
const entityTypeKeysMap = this.entityTypeToKeysMap.get(filter._type);

Expand All @@ -179,14 +180,14 @@ export class InMemoryGraphObjectStore implements GraphObjectStore {

await iteratee(graphObjectData.entity as T);
},
{ concurrency: concurrency ?? 1 },
{ concurrency: options?.concurrency ?? 1 },
);
}

async iterateRelationships<T extends Relationship = Relationship>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
concurrency?: number,
options?: GraphObjectIterateeOptions,
): Promise<void> {
const relationshipTypeKeysMap = this.relationshipTypeToKeysMap.get(
filter._type,
Expand All @@ -211,7 +212,7 @@ export class InMemoryGraphObjectStore implements GraphObjectStore {

await iteratee(graphObjectData.relationship as T);
},
{ concurrency: concurrency ?? 1 },
{ concurrency: options?.concurrency ?? 1 },
);
}

Expand Down

0 comments on commit c74beec

Please sign in to comment.