From 99a4db92ebc24c59af91b46f54caeabb24efe933 Mon Sep 17 00:00:00 2001 From: Aayush Chouhan Date: Tue, 2 Jan 2024 19:07:15 +0530 Subject: [PATCH] Support delete replication flow in bucket_diff Signed-off-by: Aayush Chouhan --- .../bg_services/log_replication_scanner.js | 15 ++- src/server/utils/bucket_diff.js | 112 ++++++++++++------ 2 files changed, 84 insertions(+), 43 deletions(-) diff --git a/src/server/bg_services/log_replication_scanner.js b/src/server/bg_services/log_replication_scanner.js index b16fb52254..6f2a5c7096 100644 --- a/src/server/bg_services/log_replication_scanner.js +++ b/src/server/bg_services/log_replication_scanner.js @@ -146,8 +146,7 @@ class LogReplicationScanner { }); let copy_keys = {}; - // TODO: support delete flow in object versions replication - // const delete_keys = []; + let delete_keys = {}; for (const candidate of Object.values(candidates)) { const action = candidate.action; @@ -158,22 +157,22 @@ class LogReplicationScanner { // The action should not matter to get_buckets_diff, we will evaluate the action inside. // This is because the order of the versions matter. hance we just need the hint from the logs // regarding which key was touched - const { keys_diff_map } = await bucketDiff.get_buckets_diff({ - prefix: candidates.key, + const { keys_diff_map, keys_del_arr } = await bucketDiff.get_buckets_diff({ + prefix: candidate.key, max_keys: Number(process.env.REPLICATION_MAX_KEYS) || 1000, //max_keys refers her to the max number of versions (+deletes) current_first_bucket_cont_token: '', current_second_bucket_cont_token: '', }); - // Currently, as get_buckets_diff is not supporting deletions, we will just pass the keys_diff_map as copy - // This needs to be reevaluated once the delete is supported. 
+ + delete_keys = {...delete_keys, ...keys_del_arr}; copy_keys = { ...copy_keys, ...keys_diff_map }; } } dbg.log1('process_candidates_sync_version:: copy_keys', copy_keys); - //TODO support delete flow and return also the delete + dbg.log1('process_candidates_sync_version:: delete_keys', delete_keys); // returning copy_keys and delete_keys after processing candidates - // return { copy_keys, delete_keys }; + // delete_keys returning empty array for now, will update it once the delete flow is completed return { copy_keys, delete_keys: [] }; } diff --git a/src/server/utils/bucket_diff.js b/src/server/utils/bucket_diff.js index 7e5c3ed467..59343e5012 100644 --- a/src/server/utils/bucket_diff.js +++ b/src/server/utils/bucket_diff.js @@ -44,6 +44,7 @@ class BucketDiff { this.for_replication = for_replication; } + /** * @param {{ * prefix: string; @@ -69,22 +70,26 @@ class BucketDiff { // ] // } keys_diff_map: {}, + keys_del_arr: [], first_bucket_cont_token: '', second_bucket_cont_token: '', }; let first_bucket_contents_left; let first_bucket_cont_token; + let first_bucket_delete_markers //list the objects in the first bucket ({ bucket_contents_left: first_bucket_contents_left, - bucket_cont_token: first_bucket_cont_token + bucket_cont_token: first_bucket_cont_token, + delete_markers: first_bucket_delete_markers } = await this.get_objects(this.first_bucket, prefix, max_keys, current_first_bucket_cont_token)); if (first_bucket_cont_token) diff.first_bucket_cont_token = first_bucket_cont_token; if (Object.keys(first_bucket_contents_left).length === 0) return diff; let second_bucket_contents_left; + let second_bucket_delete_markers; let second_bucket_cont_token; let new_second_bucket_cont_token = current_second_bucket_cont_token; let keep_listing_second_bucket = true; @@ -92,15 +97,20 @@ class BucketDiff { while (keep_listing_second_bucket) { ({ bucket_contents_left: second_bucket_contents_left, + delete_markers: second_bucket_delete_markers, bucket_cont_token: 
new_second_bucket_cont_token } = await this.get_objects(this.second_bucket, prefix, max_keys, new_second_bucket_cont_token)); const keys_diff_response = await this.get_keys_diff( first_bucket_contents_left, second_bucket_contents_left, new_second_bucket_cont_token); + const delete_keys = await this.get_keys_del( + first_bucket_contents_left, first_bucket_delete_markers, second_bucket_delete_markers); + first_bucket_contents_left = keys_diff_response.keys_contents_left; keep_listing_second_bucket = keys_diff_response.keep_listing_second_bucket; diff.keys_diff_map = { ...diff.keys_diff_map, ...keys_diff_response.keys_diff_map }; + diff.keys_del_arr = [...diff.keys_del_arr, ...delete_keys]; const first_bucket_key_array = Object.keys(first_bucket_contents_left); const second_bucket_key_array = Object.keys(second_bucket_contents_left); @@ -149,36 +159,46 @@ class BucketDiff { /** * @param {import("aws-sdk/lib/request").PromiseResult | - * import("aws-sdk/lib/request").PromiseResult} list - * - * _object_grouped_by_key_and_omitted will return the objects grouped by key. - * When we have versioning enabled, if there is more than one key, it omits - * the last key from the object, in order to avoid processing incomplete list of object + version - * - * the results of this will be {{ [key: string]: Array }} where the Array consist of metadata objects. - * for example: { - * "1": [ - * { ETag: 'etag1.1', Size: 24599, Key: '1', VersionId: 'v1.1', IsLatest: true, }, - * { ETag: 'etag1.2', Size: 89317, Key: '1', VersionId: 'v1.2', IsLatest: false, } - * ] - * } - * @returns {nb.BucketDiffKeysDiff} - */ - _object_grouped_by_key_and_omitted(list) { - if (!list) return {}; - dbg.log1('_object_grouped_by_key_and_omitted list:', list); - const field = this.version ? 
"Versions" : "Contents"; - let grouped_by_key = _.groupBy(list[field], "Key"); - // We should not omit if this is a list object and not list versions - // and the use of continuation token later on the road will lead us to skip the last key if omitted. - if (list.IsTruncated && this.version) { - const last_key_pos = list[field].length - 1; - if (Object.keys(grouped_by_key).length > 1) { - grouped_by_key = _.omit(grouped_by_key, list[field][last_key_pos].Key); - } - } - return grouped_by_key; - } + * import("aws-sdk/lib/request").PromiseResult} list + * + * _object_grouped_by_key_and_omitted will return the objects grouped by key for versions and delete markers. + * When we have versioning enabled, if there is more than one key, it omits + * the last key from both objects, in order to avoid processing incomplete list of object + version + * + * the result of this will be a single object containing two objects of type {{ [key: string]: Array }} where the Array + * consist of metadata objects. + * for example: { + * "1": [ + * { ETag: 'etag1.1', Size: 24599, Key: '1', VersionId: 'v1.1', IsLatest: true, }, + * { ETag: 'etag1.2', Size: 89317, Key: '1', VersionId: 'v1.2', IsLatest: false, } + * ] + * } + * @returns {Object} + */ + _object_grouped_by_key_and_omitted(list) { + if (!list) return {}; + dbg.log1('_object_grouped_by_key_and_omitted list:', list); + const field = this.version ? "Versions" : "Contents"; + const delete_marker = "DeleteMarkers"; + + // Grouping bucket_contents_left and delete_markers + let bucket_contents_left = _.groupBy(list[field], "Key"); + let delete_markers = _.groupBy(list[delete_marker], "Key") + + // We should not omit if this is a list object and not list versions + // and the use of continuation token later on the road will lead us to skip the last key if omitted.
+ if (list.IsTruncated && this.version) { + const last_key_pos = list[field].length - 1; + const last_key_pos_del = list[delete_marker].length -1; + if (Object.keys(bucket_contents_left).length > 1) { + bucket_contents_left = _.omit(bucket_contents_left, list[field][last_key_pos].Key); + } + if (Object.keys(delete_markers).length > 1) { + delete_markers = _.omit(delete_markers, list[delete_marker][last_key_pos_del].Key) + } + } + return { bucket_contents_left, delete_markers }; + } /** * @param {_.Dictionary} list @@ -199,7 +219,7 @@ class BucketDiff { * @param {number} max_keys * @param {string} curr_bucket_cont_token * - * get_objects will get a bucket and parameters and return the object we want to work on and the continuation token + * get_objects will get a bucket and parameters and return the object and delete markers we want to work on and the continuation token * */ async get_objects(bucket, prefix, max_keys, curr_bucket_cont_token) { @@ -207,10 +227,10 @@ class BucketDiff { const list_objects_response = await this._list_objects(bucket, prefix, max_keys, curr_bucket_cont_token); if (!list_objects_response) return { bucket_contents_left: {}, bucket_cont_token: '' }; dbg.log2('BucketDiff get_objects:: bucket_response', list_objects_response); - const bucket_contents_left = this._object_grouped_by_key_and_omitted(list_objects_response); + const { bucket_contents_left, delete_markers } = this._object_grouped_by_key_and_omitted(list_objects_response); const bucket_cont_token = this._get_next_key_marker(list_objects_response, bucket_contents_left); - dbg.log2('BucketDiff get_objects:: bucket', bucket, 'bucket_contents_left', bucket_contents_left); - return { bucket_contents_left, bucket_cont_token }; + dbg.log2('BucketDiff get_objects:: bucket', bucket, 'bucket_contents_left', bucket_contents_left, 'delete_markers', delete_markers); + return { bucket_contents_left, delete_markers, bucket_cont_token }; } /** @@ -235,6 +255,28 @@ class BucketDiff { return ans; } + 
async get_keys_del(first_bucket_contents_left, first_bucket_delete_markers, second_bucket_delete_markers) { + const delete_keys = []; + + // Checking if there are any keys with latest delete marker in first bucket + Object.keys(first_bucket_contents_left).forEach(key => { + const objects = first_bucket_contents_left[key]; + const not_any_obj_islatest = objects.every(obj => !obj.IsLatest); + let delete_marker_islatest = first_bucket_delete_markers[key].some(obj => obj.IsLatest); + // If none of the object version is latest for a key and delete marker is latest + if (not_any_obj_islatest && delete_marker_islatest) { + // If the latest delete marker for the key is not already present in second bucket + // then this key should be deleted in second bucket + delete_marker_islatest = second_bucket_delete_markers[key].some(obj => obj.IsLatest); + if (!delete_marker_islatest) { + delete_keys.push(key); + } + } + }); + + return delete_keys; + } + /** * @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any; keep_listing_second_bucket?: boolean; }} ans * @param {{}} second_bucket_keys