Skip to content

Commit

Permalink
Support delete replication flow in bucket_diff
Browse files Browse the repository at this point in the history
Signed-off-by: Aayush Chouhan <[email protected]>
  • Loading branch information
achouhan09 committed Feb 20, 2024
1 parent 0b6d436 commit f019d08
Showing 1 changed file with 100 additions and 29 deletions.
129 changes: 100 additions & 29 deletions src/server/utils/bucket_diff.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,47 +68,67 @@ class BucketDiff {
// { ETag: 'etag1.2', Size: 89317, Key: '1', VersionId: 'v1.2', IsLatest: false, }
// ]
// }
//
// keys_del_diff_map is {{ [key: string]: Array<object> }} where each Array consists of metadata objects.
// for example: {
// "1": [
// { Key: '1', VersionId: "v1.2", IsLatest: true, LastModified: '2022-02-27T10:45:00.000Z' },
// { Key: '1', VersionId: "v1.1", IsLatest: false, LastModified: '2022-01-27T10:45:00.000Z' }
// ]
// }
keys_diff_map: {},
keys_del_diff_map: {},
first_bucket_cont_token: '',
second_bucket_cont_token: '',
};

let first_bucket_contents_left;
let first_bucket_cont_token;
let first_bucket_delete_markers;
//list the objects in the first bucket
({
bucket_contents_left: first_bucket_contents_left,
bucket_cont_token: first_bucket_cont_token
bucket_cont_token: first_bucket_cont_token,
delete_markers: first_bucket_delete_markers
} = await this.get_objects(this.first_bucket, prefix, max_keys, current_first_bucket_cont_token));

if (first_bucket_cont_token) diff.first_bucket_cont_token = first_bucket_cont_token;
if (Object.keys(first_bucket_contents_left).length === 0) return diff;
if (Object.keys(first_bucket_contents_left).length === 0 &&
Object.keys(first_bucket_delete_markers).length === 0) return diff;

let second_bucket_contents_left;
let second_bucket_cont_token;
let second_bucket_delete_markers;
let new_second_bucket_cont_token = current_second_bucket_cont_token;
let keep_listing_second_bucket = true;

while (keep_listing_second_bucket) {
({
bucket_contents_left: second_bucket_contents_left,
bucket_cont_token: new_second_bucket_cont_token
bucket_cont_token: new_second_bucket_cont_token,
delete_markers: second_bucket_delete_markers
} = await this.get_objects(this.second_bucket, prefix, max_keys, new_second_bucket_cont_token));

const keys_diff_response = await this.get_keys_diff(
first_bucket_contents_left, second_bucket_contents_left, new_second_bucket_cont_token);
first_bucket_contents_left, second_bucket_contents_left, new_second_bucket_cont_token,
first_bucket_delete_markers, second_bucket_delete_markers);

first_bucket_contents_left = keys_diff_response.keys_contents_left;
first_bucket_delete_markers = keys_diff_response.delete_markers_left;
keep_listing_second_bucket = keys_diff_response.keep_listing_second_bucket;
diff.keys_diff_map = { ...diff.keys_diff_map, ...keys_diff_response.keys_diff_map };
diff.keys_del_diff_map = {...diff.keys_del_diff_map, ...keys_diff_response.keys_del_diff_map};

const first_bucket_key_array = Object.keys(first_bucket_contents_left);
const first_bucket_del_array = Object.keys(first_bucket_delete_markers);
const second_bucket_key_array = Object.keys(second_bucket_contents_left);

if (first_bucket_key_array.length !== 0 && second_bucket_key_array.length !== 0) {
if (first_bucket_key_array.length !== 0 && second_bucket_key_array.length !== 0 ||
first_bucket_del_array.length !== 0) {
const first_bucket_key_in_last_pos = first_bucket_key_array[first_bucket_key_array.length - 1];
const second_bucket_key_in_last_pos = second_bucket_key_array[second_bucket_key_array.length - 1];
if (first_bucket_key_in_last_pos >= second_bucket_key_in_last_pos) {
if (first_bucket_key_in_last_pos >= second_bucket_key_in_last_pos ||
first_bucket_del_array.length !== 0) {
second_bucket_cont_token = new_second_bucket_cont_token;
}
}
Expand Down Expand Up @@ -151,33 +171,42 @@ class BucketDiff {
* @param {import("aws-sdk/lib/request").PromiseResult<AWS.S3.ListObjectVersionsOutput, AWS.AWSError> |
* import("aws-sdk/lib/request").PromiseResult<AWS.S3.ListObjectsV2Output, AWS.AWSError>} list
*
* _object_grouped_by_key_and_omitted will return the objects grouped by key.
* _object_grouped_by_key_and_omitted will return the objects and delete markers grouped by key.
* When we have versioning enabled, if there is more than one key, it omits
* the last key from the object, in order to avoid processing incomplete list of object + version
* the last key from both, in order to avoid processing incomplete list of object + version
*
* the results of this will be {{ [key: string]: Array<object> }} where the Array consist of metadata objects.
* the result is two objects of type {{ [key: string]: Array<object> }}, where each Array consists of metadata objects.
* for example: {
* "1": [
* { ETag: 'etag1.1', Size: 24599, Key: '1', VersionId: 'v1.1', IsLatest: true, },
* { ETag: 'etag1.2', Size: 89317, Key: '1', VersionId: 'v1.2', IsLatest: false, }
* ]
* }
* @returns {nb.BucketDiffKeysDiff}
* @returns {Object}
*/
_object_grouped_by_key_and_omitted(list) {
if (!list) return {};
dbg.log1('_object_grouped_by_key_and_omitted list:', list);
const field = this.version ? "Versions" : "Contents";
let grouped_by_key = _.groupBy(list[field], "Key");
const delete_marker = 'DeleteMarkers';

// group objects and delete markers with the Key
let grouped_objects = _.groupBy(list[field], "Key");
let grouped_delete_markers = list[delete_marker] && _.groupBy(list[delete_marker], "Key");
// We should not omit if this is a list object and not list versions
// and the use of continuation token later on the road will lead us to skip the last key if omitted.
if (list.IsTruncated && this.version) {
const last_key_pos = list[field].length - 1;
if (Object.keys(grouped_by_key).length > 1) {
grouped_by_key = _.omit(grouped_by_key, list[field][last_key_pos].Key);
let last_key_pos = list[field].length - 1;
if (Object.keys(grouped_objects).length > 1) {
grouped_objects = _.omit(grouped_objects, list[field][last_key_pos].Key);
}

last_key_pos = list[delete_marker].length - 1;
if (Object.keys(grouped_delete_markers).length > 1) {
grouped_delete_markers = _.omit(grouped_delete_markers, list[delete_marker][last_key_pos].Key);
}
}
return grouped_by_key;
return { grouped_objects, grouped_delete_markers};
}

/**
Expand All @@ -199,34 +228,42 @@ class BucketDiff {
* @param {number} max_keys
* @param {string} curr_bucket_cont_token
*
* get_objects will get a bucket and parameters and return the object we want to work on and the continuation token
* get_objects will get a bucket and parameters and return the object and delete markers we want to work on and the continuation token
*
*/
async get_objects(bucket, prefix, max_keys, curr_bucket_cont_token) {
dbg.log2('BucketDiff get_objects::', bucket, prefix, max_keys, this.version, curr_bucket_cont_token);
const list_objects_response = await this._list_objects(bucket, prefix, max_keys, curr_bucket_cont_token);
if (!list_objects_response) return { bucket_contents_left: {}, bucket_cont_token: '' };
dbg.log2('BucketDiff get_objects:: bucket_response', list_objects_response);
const bucket_contents_left = this._object_grouped_by_key_and_omitted(list_objects_response);
const { bucket_contents_left, delete_markers } = this._object_grouped_by_key_and_omitted(list_objects_response);
const bucket_cont_token = this._get_next_key_marker(list_objects_response, bucket_contents_left);
dbg.log2('BucketDiff get_objects:: bucket', bucket, 'bucket_contents_left', bucket_contents_left);
return { bucket_contents_left, bucket_cont_token };
dbg.log2('BucketDiff get_objects:: bucket', bucket, 'bucket_contents_left', bucket_contents_left, 'delete_markers', delete_markers);
return { bucket_contents_left, delete_markers, bucket_cont_token };
}

/**
* @param {any} first_bucket_keys
* @param {nb.BucketDiffKeysDiff} second_bucket_keys
* @param {any} second_bucket_keys
* @param {string} second_bucket_cont_token
* @param {any} first_bucket_delete_markers
* @param {any} second_bucket_delete_markers
*
* get_keys_version_diff finds the object keys and versions that the first bucket contains but second bucket doesn't
* get_keys_diff finds the object keys and versions that the first bucket contains but the second bucket doesn't.
* It also processes the delete markers of both buckets to find the deleted keys.
*/
async get_keys_diff(first_bucket_keys, second_bucket_keys, second_bucket_cont_token) {
async get_keys_diff(first_bucket_keys, second_bucket_keys, second_bucket_cont_token,
first_bucket_delete_markers, second_bucket_delete_markers) {
const ans = {
keys_diff_map: {},
keys_del_diff_map: {},
keys_contents_left: first_bucket_keys,
delete_markers_left: first_bucket_delete_markers,
keep_listing_second_bucket: false,
};

await this._process_delete_keys(ans, first_bucket_keys, first_bucket_delete_markers, second_bucket_delete_markers);

const stop_compare = this._process_keys_out_of_range(ans, second_bucket_keys);
if (stop_compare) return ans;

Expand All @@ -236,7 +273,36 @@ class BucketDiff {
}

/**
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any; keep_listing_second_bucket?: boolean; }} ans
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_del_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any;
* delete_markers_left: any; keep_listing_second_bucket?: boolean; }} ans
* @param {any} first_bucket_contents_left
* @param {any} first_bucket_delete_markers
* @param {any} second_bucket_delete_markers
*
*/
async _process_delete_keys(ans, first_bucket_contents_left, first_bucket_delete_markers, second_bucket_delete_markers) {
const field = 'IsLatest';
if (first_bucket_delete_markers.length === 0) {
return ans;
}

ans.delete_markers_left = first_bucket_delete_markers;
Object.keys(first_bucket_delete_markers).forEach(key => {
const first_bucket_delete_marker = first_bucket_delete_markers[key][field];
const second_bucket_delete_marker = second_bucket_delete_markers[key] && second_bucket_delete_markers[key][field];
// If the delete marker for a key in the first bucket is latest but is either not present or not latest in the second bucket
if (first_bucket_delete_marker && !second_bucket_delete_marker) {
ans.keys_del_diff_map[key] = first_bucket_contents_left[key];
ans.delete_markers_left = _.omit(ans.delete_markers_left, key);
}
});

return ans;
}

/**
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_del_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any;
* delete_markers_left: any; keep_listing_second_bucket?: boolean; }} ans
* @param {{}} second_bucket_keys
*/
_process_keys_out_of_range(ans, second_bucket_keys) {
Expand All @@ -256,7 +322,8 @@ class BucketDiff {
}

/**
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any; keep_listing_second_bucket: boolean; }} ans
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_del_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any;
* delete_markers_left: any; keep_listing_second_bucket: boolean; }} ans
* @param {nb.BucketDiffKeysDiff} second_bucket_keys
* @param {string} second_bucket_cont_token
*/
Expand Down Expand Up @@ -338,7 +405,8 @@ class BucketDiff {
}

/**
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any; keep_listing_second_bucket: any; }} ans
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_del_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any;
* delete_markers_left: any; keep_listing_second_bucket: boolean; }} ans
* @param {string} second_bucket_cont_token
*/
_keep_listing_or_return_ans(ans, second_bucket_cont_token) {
Expand All @@ -352,7 +420,8 @@ class BucketDiff {
}

/**
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any; keep_listing_second_bucket: boolean; }} ans
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_del_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any;
* delete_markers_left: any; keep_listing_second_bucket: boolean; }} ans
* @param {string | any[]} etag_pos_on_first_bucket
* @param {string} cur_first_bucket_key
* @param {any[]} first_bucket_curr_obj
Expand Down Expand Up @@ -386,8 +455,9 @@ class BucketDiff {
}

/**
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any; keep_listing_second_bucket: boolean; }} ans
* @param {string} cur_bucket_key
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_del_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any;
* delete_markers_left: any; keep_listing_second_bucket: boolean; }} ans
* @param {string} cur_bucket_key
* @param {any} bucket_curr_obj
*/
_populate_diff_map_and_omit_contents_left(ans, cur_bucket_key, bucket_curr_obj) {
Expand All @@ -396,7 +466,8 @@ class BucketDiff {
}

/**
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any; keep_listing_second_bucket: boolean; }} ans
* @param {{ keys_diff_map: nb.BucketDiffKeysDiff; keys_del_diff_map: nb.BucketDiffKeysDiff; keys_contents_left: any;
* delete_markers_left: any; keep_listing_second_bucket: boolean; }} ans
* @param {string} cur_first_bucket_key
* @param {any[]} first_bucket_curr_obj
* @param {any[]} second_bucket_curr_obj
Expand Down

0 comments on commit f019d08

Please sign in to comment.