Skip to content

Commit

Permalink
Support delete replication flow in bucket_diff
Browse files Browse the repository at this point in the history
Signed-off-by: Aayush Chouhan <[email protected]>
  • Loading branch information
achouhan09 committed Jan 5, 2024
1 parent d18db43 commit 2c555e6
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 43 deletions.
15 changes: 7 additions & 8 deletions src/server/bg_services/log_replication_scanner.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ class LogReplicationScanner {
});

let copy_keys = {};
// TODO: support delete flow in object versions replication
// const delete_keys = [];
let delete_keys = [];

for (const candidate of Object.values(candidates)) {
const action = candidate.action;
Expand All @@ -158,22 +157,22 @@ class LogReplicationScanner {
// The action should not matter to get_buckets_diff, we will evaluate the action inside.
// This is because the order of the versions matters; hence we just need the hint from the logs
// regarding which key was touched
const { keys_diff_map } = await bucketDiff.get_buckets_diff({
prefix: candidates.key,
const { keys_diff_map, keys_del_arr } = await bucketDiff.get_buckets_diff({
prefix: candidate.key,
max_keys: Number(process.env.REPLICATION_MAX_KEYS) || 1000, //max_keys refers here to the max number of versions (+deletes)
current_first_bucket_cont_token: '',
current_second_bucket_cont_token: '',
});
// Currently, as get_buckets_diff is not supporting deletions, we will just pass the keys_diff_map as copy
// This needs to be reevaluated once the delete is supported.

delete_keys = [...delete_keys, ...keys_del_arr];
copy_keys = { ...copy_keys, ...keys_diff_map };
}
}

dbg.log1('process_candidates_sync_version:: copy_keys', copy_keys);
//TODO support delete flow and return also the delete
dbg.log1('process_candidates_sync_version:: delete_keys', delete_keys);
// returning copy_keys and delete_keys after processing candidates
// return { copy_keys, delete_keys };
// delete_keys returning empty array for now, will update it once the delete flow is completed
return { copy_keys, delete_keys: [] };
}

Expand Down
106 changes: 71 additions & 35 deletions src/server/utils/bucket_diff.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,38 +69,47 @@ class BucketDiff {
// ]
// }
keys_diff_map: {},
keys_del_arr: [],
first_bucket_cont_token: '',
second_bucket_cont_token: '',
};

let first_bucket_contents_left;
let first_bucket_cont_token;
let first_bucket_delete_markers
//list the objects in the first bucket
({
bucket_contents_left: first_bucket_contents_left,
bucket_cont_token: first_bucket_cont_token
bucket_cont_token: first_bucket_cont_token,
delete_markers: first_bucket_delete_markers
} = await this.get_objects(this.first_bucket, prefix, max_keys, current_first_bucket_cont_token));

if (first_bucket_cont_token) diff.first_bucket_cont_token = first_bucket_cont_token;
if (Object.keys(first_bucket_contents_left).length === 0) return diff;

let second_bucket_contents_left;
let second_bucket_cont_token;
let second_bucket_delete_markers;
let new_second_bucket_cont_token = current_second_bucket_cont_token;
let keep_listing_second_bucket = true;

while (keep_listing_second_bucket) {
({
bucket_contents_left: second_bucket_contents_left,
bucket_cont_token: new_second_bucket_cont_token
bucket_cont_token: new_second_bucket_cont_token,
delete_markers: second_bucket_delete_markers
} = await this.get_objects(this.second_bucket, prefix, max_keys, new_second_bucket_cont_token));

const keys_diff_response = await this.get_keys_diff(
first_bucket_contents_left, second_bucket_contents_left, new_second_bucket_cont_token);

const delete_keys = await this.get_keys_del(
first_bucket_contents_left, first_bucket_delete_markers, second_bucket_delete_markers);

first_bucket_contents_left = keys_diff_response.keys_contents_left;
keep_listing_second_bucket = keys_diff_response.keep_listing_second_bucket;
diff.keys_diff_map = { ...diff.keys_diff_map, ...keys_diff_response.keys_diff_map };
diff.keys_del_arr = [...diff.keys_del_arr, ...delete_keys];

const first_bucket_key_array = Object.keys(first_bucket_contents_left);
const second_bucket_key_array = Object.keys(second_bucket_contents_left);
Expand Down Expand Up @@ -149,36 +158,36 @@ class BucketDiff {

/**
* @param {import("aws-sdk/lib/request").PromiseResult<AWS.S3.ListObjectVersionsOutput, AWS.AWSError> |
* import("aws-sdk/lib/request").PromiseResult<AWS.S3.ListObjectsV2Output, AWS.AWSError>} list
*
* _object_grouped_by_key_and_omitted will return the objects grouped by key.
* When we have versioning enabled, if there is more than one key, it omits
* the last key from the object, in order to avoid processing incomplete list of object + version
*
* the results of this will be {{ [key: string]: Array<object> }} where the Array consist of metadata objects.
* for example: {
* "1": [
* { ETag: 'etag1.1', Size: 24599, Key: '1', VersionId: 'v1.1', IsLatest: true, },
* { ETag: 'etag1.2', Size: 89317, Key: '1', VersionId: 'v1.2', IsLatest: false, }
* ]
* }
* @returns {nb.BucketDiffKeysDiff}
*/
_object_grouped_by_key_and_omitted(list) {
if (!list) return {};
dbg.log1('_object_grouped_by_key_and_omitted list:', list);
const field = this.version ? "Versions" : "Contents";
let grouped_by_key = _.groupBy(list[field], "Key");
// We should not omit if this is a list object and not list versions
// and the use of continuation token later on the road will lead us to skip the last key if omitted.
if (list.IsTruncated && this.version) {
const last_key_pos = list[field].length - 1;
if (Object.keys(grouped_by_key).length > 1) {
grouped_by_key = _.omit(grouped_by_key, list[field][last_key_pos].Key);
}
}
return grouped_by_key;
}
* import("aws-sdk/lib/request").PromiseResult<AWS.S3.ListObjectsV2Output, AWS.AWSError>} list
*
* _object_grouped_by_key_and_omitted will return the objects grouped by key.
* When we have versioning enabled, if there is more than one key, it omits
* the last key from the object, in order to avoid processing incomplete list of object + version
*
* the results of this will be {{ [key: string]: Array<object> }} where the Array consist of metadata objects.
* for example: {
* "1": [
* { ETag: 'etag1.1', Size: 24599, Key: '1', VersionId: 'v1.1', IsLatest: true, },
* { ETag: 'etag1.2', Size: 89317, Key: '1', VersionId: 'v1.2', IsLatest: false, }
* ]
* }
* @returns {nb.BucketDiffKeysDiff}
*/
_object_grouped_by_key_and_omitted(list) {
if (!list) return {};
dbg.log1('_object_grouped_by_key_and_omitted list:', list);
const field = this.version ? "Versions" : "Contents";
let grouped_by_key = _.groupBy(list[field], "Key");
// We should not omit if this is a list object and not list versions
// and the use of continuation token later on the road will lead us to skip the last key if omitted.
if (list.IsTruncated && this.version) {
const last_key_pos = list[field].length - 1;
if (Object.keys(grouped_by_key).length > 1) {
grouped_by_key = _.omit(grouped_by_key, list[field][last_key_pos].Key);
}
}
return grouped_by_key;
}

/**
* @param {_.Dictionary<any[]>} list
Expand All @@ -199,7 +208,7 @@ class BucketDiff {
* @param {number} max_keys
* @param {string} curr_bucket_cont_token
*
* get_objects will get a bucket and parameters and return the object we want to work on and the continuation token
* get_objects will get a bucket and parameters and return the object and delete markers we want to work on and the continuation token
*
*/
async get_objects(bucket, prefix, max_keys, curr_bucket_cont_token) {
Expand All @@ -209,8 +218,10 @@ class BucketDiff {
dbg.log2('BucketDiff get_objects:: bucket_response', list_objects_response);
const bucket_contents_left = this._object_grouped_by_key_and_omitted(list_objects_response);
const bucket_cont_token = this._get_next_key_marker(list_objects_response, bucket_contents_left);
dbg.log2('BucketDiff get_objects:: bucket', bucket, 'bucket_contents_left', bucket_contents_left);
return { bucket_contents_left, bucket_cont_token };
const field = 'DeleteMarker';
const delete_markers = _.groupBy(list_objects_response[field], "Key");
dbg.log2('BucketDiff get_objects:: bucket', bucket, 'bucket_contents_left', bucket_contents_left, 'delete_markers', delete_markers);
return { bucket_contents_left, delete_markers, bucket_cont_token };
}

/**
Expand All @@ -235,6 +246,31 @@ class BucketDiff {
return ans;
}

async get_keys_del(first_bucket_contents_left, first_bucket_delete_markers, second_bucket_delete_markers) {
const delete_keys = [];
const field = 'IsLatest';
// Checking if there are any keys with latest delete marker in first bucket
Object.keys(first_bucket_contents_left).forEach(key => {
const objects = first_bucket_contents_left[key];
const not_any_obj_islatest = objects.every(obj => !obj.IsLatest);
if (first_bucket_delete_markers[key] && second_bucket_delete_markers[key]){
let delete_marker_islatest = first_bucket_delete_markers[key][field];
// If none of the object version is latest for a key and delete marker is latest in first bucket
if (not_any_obj_islatest && delete_marker_islatest) {
// If the latest delete marker for the key is not already present in second bucket
// then this key should be deleted in second bucket
delete_marker_islatest = second_bucket_delete_markers[key][field];
if (!delete_marker_islatest) {
delete_keys.push(key);
delete first_bucket_contents_left[key];
}
}
}
});

return delete_keys;
}

/**
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any; keep_listing_second_bucket?: boolean; }} ans
* @param {{}} second_bucket_keys
Expand Down

0 comments on commit 2c555e6

Please sign in to comment.