Skip to content

Commit

Permalink
Support delete replication flow in bucket_diff
Browse files Browse the repository at this point in the history
Signed-off-by: Aayush Chouhan <[email protected]>
  • Loading branch information
achouhan09 committed Jan 2, 2024
1 parent ba50da7 commit 9432b25
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 41 deletions.
20 changes: 13 additions & 7 deletions src/server/bg_services/log_replication_scanner.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ class LogReplicationScanner {
});

let copy_keys = {};
// TODO: support delete flow in object versions replication
// const delete_keys = [];
let delete_keys = {};

for (const candidate of Object.values(candidates)) {
const action = candidate.action;
Expand All @@ -159,21 +158,28 @@ class LogReplicationScanner {
// This is because the order of the versions matter. hance we just need the hint from the logs
// regarding which key was touched
const { keys_diff_map } = await bucketDiff.get_buckets_diff({
prefix: candidates.key,
prefix: candidate.key,
max_keys: Number(process.env.REPLICATION_MAX_KEYS) || 1000, //max_keys refers her to the max number of versions (+deletes)
current_first_bucket_cont_token: '',
current_second_bucket_cont_token: '',
});
// Currently, as get_buckets_diff is not supporting deletions, we will just pass the keys_diff_map as copy
// This needs to be reevaluated once the delete is supported.

const { keys_del_map } = await bucketDiff.get_buckets_del({
prefix: candidate.key,
max_keys: Number(process.env.REPLICATION_MAX_KEYS) || 1000, //max_keys refers her to the max number of versions (+deletes)
current_first_bucket_cont_token: '',
current_second_bucket_cont_token: '',
});

delete_keys = {...delete_keys, ...keys_del_map};
copy_keys = { ...copy_keys, ...keys_diff_map };
}
}

dbg.log1('process_candidates_sync_version:: copy_keys', copy_keys);
//TODO support delete flow and return also the delete
dbg.log1('process_candidates_sync_version:: delete_keys', delete_keys);
// returning copy_keys and delete_keys after processing candidates
// return { copy_keys, delete_keys };
// delete_keys returning empty array for now, will update it once the delete flow is completed
return { copy_keys, delete_keys: [] };
}

Expand Down
120 changes: 86 additions & 34 deletions src/server/utils/bucket_diff.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,50 @@ class BucketDiff {
this.for_replication = for_replication;
}

/**
* @param {{
* prefix: string;
* max_keys: number;
* current_first_bucket_cont_token: string;
* current_second_bucket_cont_token: string;
* }} params
*/
async get_buckets_del(params) {
const {
prefix,
max_keys,
current_first_bucket_cont_token,
current_second_bucket_cont_token,
} = params;

const diff = {
// keys_del_map is {{ [key: string]: Array<object> }} where the Array consist of metadata objects.
// for example: {
// "1": [
// { ETag: 'etag1.1', Size: 24599, Key: '1', VersionId: 'v1.1', IsLatest: true, },
// { ETag: 'etag1.2', Size: 89317, Key: '1', VersionId: 'v1.2', IsLatest: false, }
// ]
// }
keys_del_map: {},
first_bucket_cont_token: '',
second_bucket_cont_token: '',
};

let first_bucket_contents_left;
let first_bucket_delete_markers;
let first_bucket_cont_token;
//list the objects in the first bucket
({
bucket_contents_left: first_bucket_contents_left,
delete_markers: first_bucket_delete_markers,
bucket_cont_token: first_bucket_cont_token
} = await this.get_objects(this.first_bucket, prefix, max_keys, current_first_bucket_cont_token));

// TODO

return diff;
}

/**
* @param {{
* prefix: string;
Expand Down Expand Up @@ -149,36 +193,44 @@ class BucketDiff {

/**
* @param {import("aws-sdk/lib/request").PromiseResult<AWS.S3.ListObjectVersionsOutput, AWS.AWSError> |
* import("aws-sdk/lib/request").PromiseResult<AWS.S3.ListObjectsV2Output, AWS.AWSError>} list
*
* _object_grouped_by_key_and_omitted will return the objects grouped by key.
* When we have versioning enabled, if there is more than one key, it omits
* the last key from the object, in order to avoid processing incomplete list of object + version
*
* the results of this will be {{ [key: string]: Array<object> }} where the Array consist of metadata objects.
* for example: {
* "1": [
* { ETag: 'etag1.1', Size: 24599, Key: '1', VersionId: 'v1.1', IsLatest: true, },
* { ETag: 'etag1.2', Size: 89317, Key: '1', VersionId: 'v1.2', IsLatest: false, }
* ]
* }
* @returns {nb.BucketDiffKeysDiff}
*/
_object_grouped_by_key_and_omitted(list) {
if (!list) return {};
dbg.log1('_object_grouped_by_key_and_omitted list:', list);
const field = this.version ? "Versions" : "Contents";
let grouped_by_key = _.groupBy(list[field], "Key");
// We should not omit if this is a list object and not list versions
// and the use of continuation token later on the road will lead us to skip the last key if omitted.
if (list.IsTruncated && this.version) {
const last_key_pos = list[field].length - 1;
if (Object.keys(grouped_by_key).length > 1) {
grouped_by_key = _.omit(grouped_by_key, list[field][last_key_pos].Key);
}
}
return grouped_by_key;
}
* import("aws-sdk/lib/request").PromiseResult<AWS.S3.ListObjectsV2Output, AWS.AWSError>} list
*
* _object_grouped_by_key_and_omitted will return the objects grouped by key.
* When we have versioning enabled, if there is more than one key, it omits
* the last key from the object, in order to avoid processing incomplete list of object + version
*
* the results of this will be {{ [key: string]: Array<object> }} where the Array consist of metadata objects.
* for example: {
* "1": [
* { ETag: 'etag1.1', Size: 24599, Key: '1', VersionId: 'v1.1', IsLatest: true, },
* { ETag: 'etag1.2', Size: 89317, Key: '1', VersionId: 'v1.2', IsLatest: false, }
* ]
* }
* @returns {Object}
*/
_object_grouped_by_key_and_omitted(list) {
if (!list) return {};
dbg.log1('_object_grouped_by_key_and_omitted list:', list);
const field = this.version ? "Versions" : "Contents";

// Grouping bucket_contents_left and delete_markers
let bucket_contents_left = _.groupBy(list[field], "Key");
let delete_markers = _.groupBy(list['DeleteMarkers'], "Key")

// We should not omit if this is a list object and not list versions
// and the use of continuation token later on the road will lead us to skip the last key if omitted.
if (list.IsTruncated && this.version) {
const last_key_pos = list[field].length - 1;
const last_key_pos_del = list['DeleteMarkers'].length -1;
if (Object.keys(bucket_contents_left).length > 1) {
bucket_contents_left = _.omit(bucket_contents_left, list[field][last_key_pos].Key);
}
if (Object.keys(delete_markers).length > 1) {
delete_markers = _.omit(delete_markers, list['DeleteMarkers'][last_key_pos_del].Key)
}
}
return { bucket_contents_left, delete_markers };
}

/**
* @param {_.Dictionary<any[]>} list
Expand All @@ -199,18 +251,18 @@ class BucketDiff {
* @param {number} max_keys
* @param {string} curr_bucket_cont_token
*
* get_objects will get a bucket and parameters and return the object we want to work on and the continuation token
* get_objects will get a bucket and parameters and return the object and delete markers we want to work on and the continuation token
*
*/
async get_objects(bucket, prefix, max_keys, curr_bucket_cont_token) {
dbg.log2('BucketDiff get_objects::', bucket, prefix, max_keys, this.version, curr_bucket_cont_token);
const list_objects_response = await this._list_objects(bucket, prefix, max_keys, curr_bucket_cont_token);
if (!list_objects_response) return { bucket_contents_left: {}, bucket_cont_token: '' };
dbg.log2('BucketDiff get_objects:: bucket_response', list_objects_response);
const bucket_contents_left = this._object_grouped_by_key_and_omitted(list_objects_response);
const { bucket_contents_left, delete_markers } = this._object_grouped_by_key_and_omitted(list_objects_response);
const bucket_cont_token = this._get_next_key_marker(list_objects_response, bucket_contents_left);
dbg.log2('BucketDiff get_objects:: bucket', bucket, 'bucket_contents_left', bucket_contents_left);
return { bucket_contents_left, bucket_cont_token };
dbg.log2('BucketDiff get_objects:: bucket', bucket, 'bucket_contents_left', bucket_contents_left, 'delete_markers', delete_markers);
return { bucket_contents_left, delete_markers, bucket_cont_token };
}

/**
Expand Down

0 comments on commit 9432b25

Please sign in to comment.