Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't flush after step completes #977

Merged
merged 25 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0147cf6
Dont flush after steps
Oct 5, 2023
7f5ce50
format
Oct 5, 2023
08d9dc3
Removed .only from test
Oct 5, 2023
bdd38f7
Remove logger.info
Oct 5, 2023
0267c15
Corrected tests - Added comments - format
Oct 6, 2023
4f9eaaf
Added the failure of the last steps in case upload fails
Oct 9, 2023
753c85c
Added logic to fail all steps involved in an upload in case the uploa…
Oct 9, 2023
530ae9c
add graphObjectStore optional function
Oct 9, 2023
4cb0e15
Added a bit more efficiency
Oct 9, 2023
52e7821
Merge pull request #979 from JupiterOne/INT-9336-dont-flush-after-ste…
Gonzalo-Avalos-Ribas Oct 10, 2023
c85dbad
Minor changes
Oct 23, 2023
3663cb3
Better the error handling
Oct 23, 2023
a09a47c
Merge main
Dec 14, 2023
9c0a861
Merge pull request #987 from JupiterOne/INT-9336-error-handling
Gonzalo-Avalos-Ribas Jun 13, 2024
d9955ef
Merge main. resolve conflicts
Gonzalo-Avalos-Ribas Jun 13, 2024
0fc355a
Corrections
Gonzalo-Avalos-Ribas Jun 13, 2024
971bfbb
Corrected locks
Gonzalo-Avalos-Ribas Jun 17, 2024
af84aef
Corrected the tests
Gonzalo-Avalos-Ribas Jun 17, 2024
7449aea
Removed unused code, add comments
Gonzalo-Avalos-Ribas Jun 18, 2024
c4184e4
Removed steps involved
Gonzalo-Avalos-Ribas Jun 21, 2024
1599384
Removed involved steps
Gonzalo-Avalos-Ribas Jun 21, 2024
cd160ff
Final changes
Gonzalo-Avalos-Ribas Jun 21, 2024
25c6876
Corrections
Gonzalo-Avalos-Ribas Jun 21, 2024
bfd67e2
Comments and partial types
Gonzalo-Avalos-Ribas Jun 24, 2024
8c0b69e
Check for duplicates in partialTypes
Gonzalo-Avalos-Ribas Jun 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion packages/integration-sdk-core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,16 @@ export class IntegrationLocalConfigFieldMissingError extends IntegrationError {
});
}
}

export class UploadError extends IntegrationError {
readonly typesInvolved: string[] | undefined;
constructor(message: string, typesInvolved?: string[]) {
super({
code: 'UPLOAD_ERROR',
message,
});
this.typesInvolved = typesInvolved;
}
}
export class IntegrationLocalConfigFieldTypeMismatchError extends IntegrationError {
constructor(message: string) {
super({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ describe('executeStepDependencyGraph', () => {
expect(spyB).toHaveBeenCalledBefore(spyC);
});

test('should mark steps with failed executionHandlers with status FAILURE and dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => {
test('should throw if upload fails', async () => {
const spyA = jest.fn();
const spyB = jest.fn();
const spyC = jest.fn();
Expand Down Expand Up @@ -715,18 +715,19 @@ describe('executeStepDependencyGraph', () => {

function createFailingUploader(
stepId: string,
collector: FlushedGraphObjectData[],
): StepGraphObjectDataUploader {
return {
stepId,
async enqueue() {
async enqueue(graphObjectData) {
collector.push(graphObjectData);
return Promise.resolve();
},
waitUntilUploadsComplete() {
return Promise.reject(new Error('expected upload wait failure'));
return Promise.reject(new Error('Expected error'));
},
};
}

const passingUploaderCollector: FlushedGraphObjectData[] = [];

/**
Expand All @@ -737,73 +738,23 @@ describe('executeStepDependencyGraph', () => {
* 'b' depends on 'a',
* 'c' depends on 'b'
*/
const result = await executeSteps(
steps,
stepStartStates,
graphObjectStore,
(stepId) => {
if (stepId === 'b') {
return createFailingUploader(stepId);
} else {
return createPassingUploader(stepId, passingUploaderCollector);
await expect(
executeSteps(steps, stepStartStates, graphObjectStore, (stepId) => {
if (stepId == 'c') {
return createFailingUploader(stepId, passingUploaderCollector);
}
},
);
return createPassingUploader(stepId, passingUploaderCollector);
}),
).rejects.toThrow();

const expectedCollected: FlushedGraphObjectData[] = [
{
entities: [eA],
relationships: [],
},
{
entities: [eC],
entities: [eA, eB, eC],
relationships: [],
},
];

expect(passingUploaderCollector).toEqual(expectedCollected);

expect(result).toEqual([
{
id: 'a',
name: 'a',
declaredTypes: [],
partialTypes: [],
encounteredTypes: [eA._type],
encounteredTypeCounts: expect.any(Object),
status: StepResultStatus.SUCCESS,
startTime: expect.any(Number),
endTime: expect.any(Number),
duration: expect.any(Number),
},
{
id: 'b',
name: 'b',
declaredTypes: [],
partialTypes: [],
encounteredTypes: [eB._type],
encounteredTypeCounts: expect.any(Object),
dependsOn: ['a'],
status: StepResultStatus.FAILURE,
startTime: expect.any(Number),
endTime: expect.any(Number),
duration: expect.any(Number),
},
{
id: 'c',
name: 'c',
declaredTypes: [],
partialTypes: [],
encounteredTypes: [eC._type],
encounteredTypeCounts: expect.any(Object),
dependsOn: ['b'],
status: StepResultStatus.PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE,
startTime: expect.any(Number),
endTime: expect.any(Number),
duration: expect.any(Number),
},
]);

expect(spyA).toHaveBeenCalledTimes(1);
expect(spyB).toHaveBeenCalledTimes(1);
expect(spyC).toHaveBeenCalledTimes(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
StepResultStatus,
StepStartStates,
StepExecutionHandlerWrapperFunction,
UploadError,
} from '@jupiterone/integration-sdk-core';

import { timeOperation } from '../metrics';
Expand Down Expand Up @@ -145,9 +146,17 @@ export function executeStepDependencyGraph<
startTime?: number;
endTime?: number;
duration?: number;
partialTypes?: string[];
}) {
const { stepId, status, typeTracker, startTime, endTime, duration } =
params;
const {
stepId,
status,
typeTracker,
startTime,
endTime,
duration,
partialTypes,
} = params;
const existingResult = stepResultsMap.get(stepId);
if (existingResult) {
stepResultsMap.set(stepId, {
Expand All @@ -162,6 +171,9 @@ export function executeStepDependencyGraph<
startTime,
endTime,
duration,
partialTypes: Array.from(
new Set(existingResult.partialTypes.concat(partialTypes ?? [])),
),
});
}
}
Expand Down Expand Up @@ -417,9 +429,7 @@ export function executeStepDependencyGraph<

status = StepResultStatus.FAILURE;
}

await context.jobState.flush();

let possibleAdditionalPartialTypes: string[] | undefined = undefined;
if (context.jobState.waitUntilUploadsComplete) {
try {
// Failing to upload all integration data should not be considered a
Expand All @@ -429,6 +439,9 @@ export function executeStepDependencyGraph<
} catch (err) {
context.logger.stepFailure(step, err);
status = StepResultStatus.FAILURE;
if (err instanceof UploadError) {
possibleAdditionalPartialTypes = err.typesInvolved;
}
}
}

Expand All @@ -439,6 +452,7 @@ export function executeStepDependencyGraph<
startTime,
endTime: Date.now(),
duration: Date.now() - startTime,
partialTypes: possibleAdditionalPartialTypes,
});
enqueueLeafSteps();
}
Expand Down Expand Up @@ -486,11 +500,62 @@ export function executeStepDependencyGraph<

return status;
}

async function forceFlushEverything() {
/** Instead of flushing after each step, flush only when we finish all steps OR when we reach the threshold limit
* Because the 'createStepGraphObjectDataUploader' needs a step I'm using the last step as it
*/
let uploader: StepGraphObjectDataUploader | undefined;
const lastStep = Array.from(stepResultsMap.keys()).pop() as string;
if (createStepGraphObjectDataUploader) {
uploader = createStepGraphObjectDataUploader(lastStep);
}
await graphObjectStore.flush(
async (entities) =>
entities.length
? uploader?.enqueue({
entities,
relationships: [],
})
: undefined,
async (relationships) =>
relationships.length
? uploader?.enqueue({
entities: [],
relationships,
})
: undefined,
);
try {
await uploader?.waitUntilUploadsComplete();
} catch (err) {
executionContext.logger.stepFailure(
workingGraph.getNodeData(lastStep),
err,
);
if (err instanceof UploadError) {
updateStepResultStatus({
stepId: lastStep,
status: StepResultStatus.FAILURE,
typeTracker,
partialTypes: err.typesInvolved, //We mark as partial all types related to the failed uploads
});
} else {
updateStepResultStatus({
stepId: lastStep,
status: StepResultStatus.FAILURE,
typeTracker,
});
}
}
}

// kick off work for all leaf nodes
enqueueLeafSteps();

void promiseQueue
.onIdle()
.then(forceFlushEverything)
.then(() => resolve([...stepResultsMap.values()]))
.catch(reject);
});
Expand Down
47 changes: 47 additions & 0 deletions packages/integration-sdk-runtime/src/execution/uploader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { createApiClient, getApiBaseUrl } from '../api';
import { generateSynchronizationJob } from '../synchronization/__tests__/util/generateSynchronizationJob';
import { createMockIntegrationLogger } from '../../test/util/fixtures';
import { getExpectedRequestHeaders } from '../../test/util/request';
import { UploadError } from '@jupiterone/integration-sdk-core';

function createFlushedGraphObjectData(): FlushedGraphObjectData {
return {
Expand Down Expand Up @@ -135,6 +136,52 @@ describe('#createQueuedStepGraphObjectDataUploader', () => {

expect(uploaded).toEqual([flushed[0], flushed[2], flushedAfterFailure]);
});

test('should throw UploadError with types involved', async () => {
const uploaded: FlushedGraphObjectData[] = [];
const stepId = uuid();

let numQueued = 0;

const uploader = createQueuedStepGraphObjectDataUploader({
stepId,
uploadConcurrency: 2,
async upload(d) {
numQueued++;

if (numQueued === 2) {
await sleep(100);
throw new Error('expected upload error');
} else {
await sleep(200);
uploaded.push(d);
}
},
});

const flushed = await createAndEnqueueUploads(uploader, 3);

// Ensure that the next enqueue happens _after_ a failure has occurred.
await sleep(300);
const flushedAfterFailure = createFlushedGraphObjectData();
await uploader.enqueue(flushedAfterFailure);

try {
await uploader.waitUntilUploadsComplete();
} catch (error) {
expect(error).toBeInstanceOf(UploadError);
flushed[1].entities.forEach((entity) =>
expect((error as UploadError).typesInvolved as string[]).toInclude(
entity._type,
),
);
flushed[1].relationships.forEach((relationship) =>
expect((error as UploadError).typesInvolved as string[]).toInclude(
relationship._type,
),
);
}
});
});

describe('#createPersisterApiStepGraphObjectDataUploader', () => {
Expand Down
25 changes: 16 additions & 9 deletions packages/integration-sdk-runtime/src/execution/uploader.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IntegrationError } from '@jupiterone/integration-sdk-core';
import { UploadError } from '@jupiterone/integration-sdk-core';
import PQueue from 'p-queue/dist';
import { FlushedGraphObjectData } from '../storage/types';
import {
Expand Down Expand Up @@ -37,7 +37,7 @@ export function createQueuedStepGraphObjectDataUploader({

let completed = false;
const uploadErrors: Error[] = [];

const typesInvolvedInFailures = new Set<string>();
return {
stepId,
async enqueue(graphObjectData) {
Expand Down Expand Up @@ -76,6 +76,16 @@ export function createQueuedStepGraphObjectDataUploader({
// cases where this could cause an issue (e.g. a relationship getting
// uploaded that references an entity that failed to upload).
uploadErrors.push(err);
if (graphObjectData) {
graphObjectData.entities.forEach(
(entity) => typesInvolvedInFailures.add(entity._type),
typesInvolvedInFailures,
);
graphObjectData.relationships.forEach(
(relationship) => typesInvolvedInFailures.add(relationship._type),
typesInvolvedInFailures,
);
}
});
},

Expand All @@ -93,15 +103,12 @@ export function createQueuedStepGraphObjectDataUploader({
}

if (uploadErrors.length) {
throw new IntegrationError({
code: 'UPLOAD_ERROR',
message: `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join(
throw new UploadError(
`Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join(
',',
)})`,
// Just include the first error cause. We should be able to gather
// additional information from the joined error messages.
cause: uploadErrors[0],
});
Array.from(typesInvolvedInFailures.values()),
);
}
},
};
Expand Down
Loading