Skip to content

Commit

Permalink
Comments and partial types
Browse files Browse the repository at this point in the history
  • Loading branch information
Gonzalo-Avalos-Ribas committed Jun 24, 2024
1 parent 25c6876 commit bfd67e2
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 17 deletions.
5 changes: 0 additions & 5 deletions packages/integration-sdk-core/src/types/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,6 @@ export type IntegrationStepResult = Omit<
* Duration of the step in milliseconds, calculated as endTime - startTime.
*/
duration?: number;
/**
* TODO: Decouple steps from uploads. This shouldn't be here.
* This are types that are involved in a failed upload. We should mark them as partial.
*/
partialTypesToForce?: string[];
};

export type IntegrationStep<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export function executeStepDependencyGraph<
startTime?: number;
endTime?: number;
duration?: number;
partialTypesToForce?: string[];
partialTypes?: string[];
}) {
const {
stepId,
Expand All @@ -155,7 +155,7 @@ export function executeStepDependencyGraph<
startTime,
endTime,
duration,
partialTypesToForce,
partialTypes,
} = params;
const existingResult = stepResultsMap.get(stepId);
if (existingResult) {
Expand All @@ -171,7 +171,7 @@ export function executeStepDependencyGraph<
startTime,
endTime,
duration,
partialTypesToForce: partialTypesToForce,
partialTypes: existingResult.partialTypes.concat(partialTypes ?? []),
});
}
}
Expand Down Expand Up @@ -450,7 +450,7 @@ export function executeStepDependencyGraph<
startTime,
endTime: Date.now(),
duration: Date.now() - startTime,
partialTypesToForce: possibleAdditionalPartialTypes,
partialTypes: possibleAdditionalPartialTypes,
});
enqueueLeafSteps();
}
Expand Down Expand Up @@ -536,7 +536,7 @@ export function executeStepDependencyGraph<
stepId: lastStep,
status: StepResultStatus.FAILURE,
typeTracker,
partialTypesToForce: err.typesInvolved, //We mark as partial all types related to the failed uploads
partialTypes: err.typesInvolved, //We mark as partial all types related to the failed uploads
});
} else {
updateStepResultStatus({
Expand Down
3 changes: 0 additions & 3 deletions packages/integration-sdk-runtime/src/execution/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ export function determinePartialDatasetsFromStepExecutionResults(
stepPartialDatasets.types.push(...stepResult.declaredTypes);
}

if (stepResult.partialTypesToForce) {
stepPartialDatasets.types.push(...stepResult.partialTypesToForce);
}
return {
types: uniq(partialDatasets.types.concat(stepPartialDatasets.types)),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,28 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
this.flushRelationshipsToDisk(onRelationshipsFlushed, true),
]);
}

/**
* Asynchronously flushes entity data to disk.
*
* This function ensures that entity data is saved to disk when necessary. It uses a locking mechanism
* to prevent concurrent modifications and checks if the data size exceeds a certain threshold before flushing.
*
* @param {function} [onEntitiesFlushed] - Optional. A callback function that is invoked after the entities
* have been flushed to disk. It receives an array of entities as
* an argument and returns a Promise.
* @param {Boolean} [force=false] - Optional. A boolean flag indicating whether to force the flushing process
* regardless of the data size threshold.
*
* This process ensures efficient and necessary data uploads, avoiding redundant or unnecessary disk operations.
*/
async flushEntitiesToDisk(
onEntitiesFlushed?: (entities: Entity[]) => Promise<void>,
force: Boolean = false,
) {
await this.lockOperation(async () => {
// Do not flush entities to disk if we haven't reached the `graphObjectBufferThresholdInBytes` of data in memory or if we don't force this process (when all the steps are completed)
// This code rechecks the condition that triggers the flushing process to avoid unnecessary uploads
// During concurrent steps, we might be deleting items from memory while a step is adding new items. This could cause the threshold
// to be triggered again. By rechecking the condition, we ensure that only necessary uploads occur.
if (
!force &&
this.localGraphObjectStore.getTotalEntitySizeInBytes() <
Expand Down Expand Up @@ -306,13 +321,28 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
}
});
}

/**
* Asynchronously flushes relationship data to disk.
*
* This function ensures that relationship data is saved to disk when necessary. It uses a locking mechanism
* to prevent concurrent modifications and checks if the data size exceeds a certain threshold before flushing.
*
* @param {function} [onRelationshipsFlushed] - Optional. A callback function that is invoked after the relationships
* have been flushed to disk. It receives an array of relationships as
* an argument and returns a Promise.
* @param {Boolean} [force=false] - Optional. A boolean flag indicating whether to force the flushing process
* regardless of the data size threshold.
*
* This process ensures efficient and necessary data uploads, avoiding redundant or unnecessary disk operations.
*/
async flushRelationshipsToDisk(
onRelationshipsFlushed?: (relationships: Relationship[]) => Promise<void>,
force: Boolean = false,
) {
await this.lockOperation(async () => {
// Do not flush relationships to disk if we haven't reached the `graphObjectBufferThresholdInBytes` of data in memory or if we don't force this process (when all the steps are completed)
// This code rechecks the condition that triggers the flushing process to avoid unnecessary uploads
// During concurrent steps, we might be deleting items from memory while a step is adding new items. This could cause the threshold
// to be triggered again. By rechecking the condition, we ensure that only necessary uploads occur.
if (
!force &&
this.localGraphObjectStore.getTotalRelationshipSizeInBytes() <
Expand Down

0 comments on commit bfd67e2

Please sign in to comment.