Skip to content

Commit

Permalink
Merge pull request #987 from JupiterOne/INT-9336-error-handling
Browse files Browse the repository at this point in the history
Better the error handling
  • Loading branch information
Gonzalo-Avalos-Ribas authored Jun 13, 2024
2 parents a09a47c + 3663cb3 commit 9c0a861
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 63 deletions.
11 changes: 10 additions & 1 deletion packages/integration-sdk-core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,16 @@ export class IntegrationLocalConfigFieldMissingError extends IntegrationError {
});
}
}

export class UploadError extends IntegrationError {
readonly stepsInvolved: string[] | undefined;
constructor(message: string, stepsInvolved?: string[]) {
super({
code: 'UPLOAD_ERROR',
message,
});
this.stepsInvolved = stepsInvolved;
}
}
export class IntegrationLocalConfigFieldTypeMismatchError extends IntegrationError {
constructor(message: string) {
super({
Expand Down
20 changes: 16 additions & 4 deletions packages/integration-sdk-core/src/types/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ export interface GraphObjectStore {
addEntities(
stepId: string,
newEntities: Entity[],
onEntitiesFlushed?: (entities: Entity[]) => Promise<void>,
onEntitiesFlushed?: (
entities: Entity[],
stepsInvolved?: string[],
) => Promise<void>,
): Promise<void>;

addRelationships(
stepId: string,
newRelationships: Relationship[],
onRelationshipsFlushed?: (relationships: Relationship[]) => Promise<void>,
onRelationshipsFlushed?: (
relationships: Relationship[],
stepsInvolved?: string[],
) => Promise<void>,
): Promise<void>;

findEntity(_key: string | undefined): Promise<Entity | undefined>;
Expand All @@ -39,8 +45,14 @@ export interface GraphObjectStore {
): Promise<void>;

flush(
onEntitiesFlushed?: (entities: Entity[]) => Promise<void>,
onRelationshipsFlushed?: (relationships: Relationship[]) => Promise<void>,
onEntitiesFlushed?: (
entities: Entity[],
stepsInvolved?: string[],
) => Promise<void>,
onRelationshipsFlushed?: (
relationships: Relationship[],
stepsInvolved?: string[],
) => Promise<void>,
): Promise<void>;

getIndexMetadataForGraphObjectType?: (
Expand Down
71 changes: 52 additions & 19 deletions packages/integration-sdk-runtime/src/execution/dependencyGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
StepResultStatus,
StepStartStates,
StepExecutionHandlerWrapperFunction,
UploadError,
} from '@jupiterone/integration-sdk-core';

import { timeOperation } from '../metrics';
Expand Down Expand Up @@ -405,6 +406,21 @@ export function executeStepDependencyGraph<
status = StepResultStatus.FAILURE;
}

if (context.jobState.waitUntilUploadsComplete) {
try {
// Failing to upload all integration data should not be considered a
// fatal failure. We just want to make this step as a partial success
// and move on with our lives!
await context.jobState.waitUntilUploadsComplete();
} catch (err) {
if (err instanceof UploadError) {
failAllStepsWithError(err.stepsInvolved, err);
} else {
context.logger.stepFailure(step, err);
status = StepResultStatus.FAILURE;
}
}
}
updateStepResultStatus(stepId, status, typeTracker);
enqueueLeafSteps();
}
Expand Down Expand Up @@ -452,44 +468,61 @@ export function executeStepDependencyGraph<

return status;
}
function failAllStepsWithError(steps, err) {
for (const stepId of steps) {
executionContext.logger.stepFailure(
workingGraph.getNodeData(stepId),
err,
);
updateStepResultStatus(stepId, StepResultStatus.FAILURE, typeTracker);
}
}
async function forceFlushEverything() {
/** Instead of flushing after each step, flush only when we finish all steps OR when we reach the threshold limit
* Because the 'createStepGraphObjectDataUploader' needs a step I'm using the last step as it
*/
let uploader: StepGraphObjectDataUploader | undefined;
const lastStep = Array.from(stepResultsMap.keys()).pop() as string;
if (createStepGraphObjectDataUploader) {
uploader = createStepGraphObjectDataUploader(
Array.from(stepResultsMap.keys()).pop() as string,
);
uploader = createStepGraphObjectDataUploader(lastStep);
}
const stepsInvolvedInUpload = graphObjectStore.getStepsStored
? graphObjectStore.getStepsStored()
: [];
await graphObjectStore.flush(
async (entities) =>
async (entities, stepsInvolved) =>
entities.length
? uploader?.enqueue({
entities,
relationships: [],
})
? uploader?.enqueue(
{
entities,
relationships: [],
},
stepsInvolved,
)
: undefined,
async (relationships) =>
async (relationships, stepsInvolved) =>
relationships.length
? uploader?.enqueue({
entities: [],
relationships,
})
? uploader?.enqueue(
{
entities: [],
relationships,
},
stepsInvolved,
)
: undefined,
);
try {
await uploader?.waitUntilUploadsComplete();
} catch (err) {
for (const stepId of stepsInvolvedInUpload) {
if (err instanceof UploadError) {
failAllStepsWithError(err.stepsInvolved, err);
} else {
executionContext.logger.stepFailure(
workingGraph.getNodeData(stepId),
workingGraph.getNodeData(lastStep),
err,
);
updateStepResultStatus(stepId, StepResultStatus.FAILURE, typeTracker);
updateStepResultStatus(
lastStep,
StepResultStatus.FAILURE,
typeTracker,
);
}
}
}
Expand Down
52 changes: 32 additions & 20 deletions packages/integration-sdk-runtime/src/execution/jobState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,14 @@ export function createStepJobState({
await graphObjectStore.addEntities(
stepId,
entities,
async (entities) =>
uploader?.enqueue({
entities,
relationships: [],
}),
async (entities, stepsInvolved) =>
uploader?.enqueue(
{
entities,
relationships: [],
},
stepsInvolved,
),
);

if (afterAddEntity) {
Expand Down Expand Up @@ -224,11 +227,14 @@ export function createStepJobState({
await graphObjectStore.addRelationships(
stepId,
relationships,
async (relationships) =>
uploader?.enqueue({
entities: [],
relationships,
}),
async (relationships, stepsInvolved) =>
uploader?.enqueue(
{
entities: [],
relationships,
},
stepsInvolved,
),
);

if (afterAddRelationship) {
Expand Down Expand Up @@ -286,16 +292,22 @@ export function createStepJobState({

flush: () =>
graphObjectStore.flush(
async (entities) =>
uploader?.enqueue({
entities,
relationships: [],
}),
async (relationships) =>
uploader?.enqueue({
entities: [],
relationships,
}),
async (entities, stepsInvolved) =>
uploader?.enqueue(
{
entities,
relationships: [],
},
stepsInvolved,
),
async (relationships, stepsInvolved) =>
uploader?.enqueue(
{
entities: [],
relationships,
},
stepsInvolved,
),
),

async waitUntilUploadsComplete() {
Expand Down
28 changes: 17 additions & 11 deletions packages/integration-sdk-runtime/src/execution/uploader.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IntegrationError } from '@jupiterone/integration-sdk-core';
import { UploadError } from '@jupiterone/integration-sdk-core';
import PQueue from 'p-queue/dist';
import { FlushedGraphObjectData } from '../storage/types';
import {
Expand All @@ -10,7 +10,10 @@ import { randomUUID as uuid } from 'crypto';

export interface StepGraphObjectDataUploader {
stepId: string;
enqueue: (graphObjectData: FlushedGraphObjectData) => Promise<void>;
enqueue: (
graphObjectData: FlushedGraphObjectData,
stepsInvolved?: string[],
) => Promise<void>;
waitUntilUploadsComplete: () => Promise<void>;
}

Expand All @@ -37,10 +40,10 @@ export function createQueuedStepGraphObjectDataUploader({

let completed = false;
const uploadErrors: Error[] = [];

const stepsInvolvedInFailures = new Set<string>();
return {
stepId,
async enqueue(graphObjectData) {
async enqueue(graphObjectData, stepsInvolved) {
if (completed) {
// This step has already called ran `waitUntilUploadsComplete`, so we
// do not want to allow any additional enqueuing.
Expand Down Expand Up @@ -75,6 +78,12 @@ export function createQueuedStepGraphObjectDataUploader({
// The JupiterOne synchronization should be resilient enough to handle
// cases where this could cause an issue (e.g. a relationship getting
// uploaded that references an entity that failed to upload).
if (stepsInvolved) {
stepsInvolved.forEach(
stepsInvolvedInFailures.add,
stepsInvolvedInFailures,
);
}
uploadErrors.push(err);
});
},
Expand All @@ -93,15 +102,12 @@ export function createQueuedStepGraphObjectDataUploader({
}

if (uploadErrors.length) {
throw new IntegrationError({
code: 'UPLOAD_ERROR',
message: `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join(
throw new UploadError(
`Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join(
',',
)})`,
// Just include the first error cause. We should be able to gather
// additional information from the joined error messages.
cause: uploadErrors[0],
});
Array.from(stepsInvolvedInFailures.values()),
);
}
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
async addEntities(
stepId: string,
newEntities: Entity[],
onEntitiesFlushed?: (entities: Entity[]) => Promise<void>,
onEntitiesFlushed?: (
entities: Entity[],
stepsInvolved?: string[],
) => Promise<void>,
) {
await this.localGraphObjectStore.addEntities(stepId, newEntities);

Expand All @@ -174,7 +177,10 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
async addRelationships(
stepId: string,
newRelationships: Relationship[],
onRelationshipsFlushed?: (relationships: Relationship[]) => Promise<void>,
onRelationshipsFlushed?: (
relationships: Relationship[],
stepsInvolved?: string[],
) => Promise<void>,
) {
await this.localGraphObjectStore.addRelationships(stepId, newRelationships);

Expand Down Expand Up @@ -235,8 +241,14 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
}

async flush(
onEntitiesFlushed?: (entities: Entity[]) => Promise<void>,
onRelationshipsFlushed?: (relationships: Relationship[]) => Promise<void>,
onEntitiesFlushed?: (
entities: Entity[],
stepsInvolved?: string[],
) => Promise<void>,
onRelationshipsFlushed?: (
relationships: Relationship[],
stepsInvolved?: string[],
) => Promise<void>,
) {
await Promise.all([
this.flushEntitiesToDisk(onEntitiesFlushed),
Expand All @@ -245,7 +257,10 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
}

async flushEntitiesToDisk(
onEntitiesFlushed?: (entities: Entity[]) => Promise<void>,
onEntitiesFlushed?: (
entities: Entity[],
stepsInvolved?: string[],
) => Promise<void>,
) {
await this.lockOperation(async () => {
const entitiesByStep = this.localGraphObjectStore.collectEntitiesByStep();
Expand Down Expand Up @@ -295,13 +310,19 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
}

if (onEntitiesFlushed) {
await onEntitiesFlushed(entitiesToUpload);
await onEntitiesFlushed(
entitiesToUpload,
Array.from(entitiesByStep.keys()),
);
}
});
}

async flushRelationshipsToDisk(
onRelationshipsFlushed?: (relationships: Relationship[]) => Promise<void>,
onRelationshipsFlushed?: (
relationships: Relationship[],
stepsInvolved?: string[],
) => Promise<void>,
) {
await this.lockOperation(async () => {
const relationshipsByStep =
Expand Down Expand Up @@ -340,7 +361,10 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
}

if (onRelationshipsFlushed) {
await onRelationshipsFlushed(relationshipsToUpload);
await onRelationshipsFlushed(
relationshipsToUpload,
Array.from(relationshipsByStep.keys()),
);
}
});
}
Expand Down

0 comments on commit 9c0a861

Please sign in to comment.