/**
 * Error raised when graph object data could not be uploaded.
 *
 * In addition to the usual `IntegrationError` fields (code `UPLOAD_ERROR`),
 * it records which steps contributed data to the failed upload(s) so that
 * callers can fail exactly those steps.
 */
export class UploadError extends IntegrationError {
  /** Ids of the steps whose data was part of the failed upload(s), if known. */
  readonly stepsInvolved: string[] | undefined;

  /**
   * @param message - Human-readable description of the upload failure.
   * @param stepsInvolved - Ids of the steps whose data failed to upload.
   * @param cause - Underlying error that triggered the failure. It is
   *   forwarded to `IntegrationError` so the original error is not lost when
   *   several upload errors are collapsed into one `UploadError`.
   */
  constructor(message: string, stepsInvolved?: string[], cause?: Error) {
    super({
      code: 'UPLOAD_ERROR',
      message,
      cause,
    });
    this.stepsInvolved = stepsInvolved;
  }
}
/**
 * Logs a step failure for, and records a FAILURE status on, every step in
 * `steps`.
 *
 * @param steps - Ids of the steps to fail. `undefined` is treated as an empty
 *   list because `UploadError.stepsInvolved` is optional; iterating it
 *   directly would throw a `TypeError` inside the caller's `catch` handler.
 * @param err - The error reported against each step.
 */
function failAllStepsWithError(
  steps: readonly string[] | undefined,
  err: Error,
): void {
  for (const stepId of steps ?? []) {
    executionContext.logger.stepFailure(workingGraph.getNodeData(stepId), err);
    updateStepResultStatus(stepId, StepResultStatus.FAILURE, typeTracker);
  }
}

/**
 * Flushes everything still held by the graph object store, hands the flushed
 * entities and relationships to an uploader, and waits for the uploads to
 * finish.
 *
 * Upload failures are not rethrown. Instead the steps named by the
 * `UploadError` are marked as failed; when the error names no steps (or is
 * not an `UploadError`), the failure is attributed to the step the uploader
 * was created for.
 */
async function forceFlushEverything() {
  // `createStepGraphObjectDataUploader` needs a step id, so the last key of
  // `stepResultsMap` is used as the owner of this final upload.
  const lastStep = Array.from(stepResultsMap.keys()).pop() as string;
  const uploader = createStepGraphObjectDataUploader?.(lastStep);

  await graphObjectStore.flush(
    async (entities, stepsInvolved) => {
      // Nothing to upload for an empty batch.
      if (entities.length) {
        await uploader?.enqueue({ entities, relationships: [] }, stepsInvolved);
      }
    },
    async (relationships, stepsInvolved) => {
      if (relationships.length) {
        await uploader?.enqueue({ entities: [], relationships }, stepsInvolved);
      }
    },
  );

  try {
    await uploader?.waitUntilUploadsComplete();
  } catch (err) {
    if (err instanceof UploadError && err.stepsInvolved?.length) {
      failAllStepsWithError(err.stepsInvolved, err);
    } else {
      // Either an unexpected error type or an UploadError that recorded no
      // steps (the `stepsInvolved` callback argument is optional). Attribute
      // the failure to the uploader's step so it is never silently dropped.
      executionContext.logger.stepFailure(
        workingGraph.getNodeData(lastStep),
        err,
      );
      updateStepResultStatus(lastStep, StepResultStatus.FAILURE, typeTracker);
    }
  }
}
graphObjectStore.addRelationships( stepId, relationships, - async (relationships) => - uploader?.enqueue({ - entities: [], - relationships, - }), + async (relationships, stepsInvolved) => + uploader?.enqueue( + { + entities: [], + relationships, + }, + stepsInvolved, + ), ); if (afterAddRelationship) { @@ -286,16 +292,22 @@ export function createStepJobState({ flush: () => graphObjectStore.flush( - async (entities) => - uploader?.enqueue({ - entities, - relationships: [], - }), - async (relationships) => - uploader?.enqueue({ - entities: [], - relationships, - }), + async (entities, stepsInvolved) => + uploader?.enqueue( + { + entities, + relationships: [], + }, + stepsInvolved, + ), + async (relationships, stepsInvolved) => + uploader?.enqueue( + { + entities: [], + relationships, + }, + stepsInvolved, + ), ), async waitUntilUploadsComplete() { diff --git a/packages/integration-sdk-runtime/src/execution/uploader.ts b/packages/integration-sdk-runtime/src/execution/uploader.ts index 5f96d47a3..f577e7818 100644 --- a/packages/integration-sdk-runtime/src/execution/uploader.ts +++ b/packages/integration-sdk-runtime/src/execution/uploader.ts @@ -1,4 +1,4 @@ -import { IntegrationError } from '@jupiterone/integration-sdk-core'; +import { UploadError } from '@jupiterone/integration-sdk-core'; import PQueue from 'p-queue/dist'; import { FlushedGraphObjectData } from '../storage/types'; import { @@ -10,7 +10,10 @@ import { randomUUID as uuid } from 'crypto'; export interface StepGraphObjectDataUploader { stepId: string; - enqueue: (graphObjectData: FlushedGraphObjectData) => Promise; + enqueue: ( + graphObjectData: FlushedGraphObjectData, + stepsInvolved?: string[], + ) => Promise; waitUntilUploadsComplete: () => Promise; } @@ -37,10 +40,10 @@ export function createQueuedStepGraphObjectDataUploader({ let completed = false; const uploadErrors: Error[] = []; - + const stepsInvolvedInFailures = new Set(); return { stepId, - async enqueue(graphObjectData) { + async 
enqueue(graphObjectData, stepsInvolved) { if (completed) { // This step has already called ran `waitUntilUploadsComplete`, so we // do not want to allow any additional enqueuing. @@ -75,6 +78,12 @@ export function createQueuedStepGraphObjectDataUploader({ // The JupiterOne synchronization should be resilient enough to handle // cases where this could cause an issue (e.g. a relationship getting // uploaded that references an entity that failed to upload). + if (stepsInvolved) { + stepsInvolved.forEach( + stepsInvolvedInFailures.add, + stepsInvolvedInFailures, + ); + } uploadErrors.push(err); }); }, @@ -93,15 +102,12 @@ export function createQueuedStepGraphObjectDataUploader({ } if (uploadErrors.length) { - throw new IntegrationError({ - code: 'UPLOAD_ERROR', - message: `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join( + throw new UploadError( + `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join( ',', )})`, - // Just include the first error cause. We should be able to gather - // additional information from the joined error messages. 
- cause: uploadErrors[0], - }); + Array.from(stepsInvolvedInFailures.values()), + ); } }, }; diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index caa67ff34..6e20d7e37 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -159,7 +159,10 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { async addEntities( stepId: string, newEntities: Entity[], - onEntitiesFlushed?: (entities: Entity[]) => Promise, + onEntitiesFlushed?: ( + entities: Entity[], + stepsInvolved?: string[], + ) => Promise, ) { await this.localGraphObjectStore.addEntities(stepId, newEntities); @@ -174,7 +177,10 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { async addRelationships( stepId: string, newRelationships: Relationship[], - onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + onRelationshipsFlushed?: ( + relationships: Relationship[], + stepsInvolved?: string[], + ) => Promise, ) { await this.localGraphObjectStore.addRelationships(stepId, newRelationships); @@ -235,8 +241,14 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { } async flush( - onEntitiesFlushed?: (entities: Entity[]) => Promise, - onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + onEntitiesFlushed?: ( + entities: Entity[], + stepsInvolved?: string[], + ) => Promise, + onRelationshipsFlushed?: ( + relationships: Relationship[], + stepsInvolved?: string[], + ) => Promise, ) { await Promise.all([ this.flushEntitiesToDisk(onEntitiesFlushed), @@ -245,7 +257,10 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { } async flushEntitiesToDisk( - onEntitiesFlushed?: (entities: 
Entity[]) => Promise, + onEntitiesFlushed?: ( + entities: Entity[], + stepsInvolved?: string[], + ) => Promise, ) { await this.lockOperation(async () => { const entitiesByStep = this.localGraphObjectStore.collectEntitiesByStep(); @@ -295,13 +310,19 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { } if (onEntitiesFlushed) { - await onEntitiesFlushed(entitiesToUpload); + await onEntitiesFlushed( + entitiesToUpload, + Array.from(entitiesByStep.keys()), + ); } }); } async flushRelationshipsToDisk( - onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + onRelationshipsFlushed?: ( + relationships: Relationship[], + stepsInvolved?: string[], + ) => Promise, ) { await this.lockOperation(async () => { const relationshipsByStep = @@ -340,7 +361,10 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { } if (onRelationshipsFlushed) { - await onRelationshipsFlushed(relationshipsToUpload); + await onRelationshipsFlushed( + relationshipsToUpload, + Array.from(relationshipsByStep.keys()), + ); } }); }