From afe23dfacba5940743d10041460a7ddec9c640c5 Mon Sep 17 00:00:00 2001 From: nadav mizrahi Date: Tue, 26 Mar 2024 09:10:03 +0200 Subject: [PATCH 01/14] Fix s3_get_bucket_policy_status Signed-off-by: nadav mizrahi (cherry picked from commit 45b34f4db5bfe91546444d8f581bd011389136d1) --- .../s3/ops/s3_get_bucket_policy_status.js | 29 ++--- src/test/unit_tests/test_s3_bucket_policy.js | 117 +++++++++++++++++- 2 files changed, 127 insertions(+), 19 deletions(-) diff --git a/src/endpoint/s3/ops/s3_get_bucket_policy_status.js b/src/endpoint/s3/ops/s3_get_bucket_policy_status.js index 59428159b4..9d0a04ddf3 100644 --- a/src/endpoint/s3/ops/s3_get_bucket_policy_status.js +++ b/src/endpoint/s3/ops/s3_get_bucket_policy_status.js @@ -2,6 +2,7 @@ 'use strict'; const S3Error = require('../s3_errors').S3Error; +const _ = require('lodash'); /** * https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketPolicyStatus.html @@ -16,27 +17,19 @@ async function get_bucket_policy_status(req) { // TODO: implemented according to current implementation of authorize_request_policy. should update when authorize_request_policy changed // full public policy defintion: https://docs.aws.amazon.com/AmazonS3/latest/dev/access-control-block-public-access.html#access-control-block-public-access-policy-status function _is_policy_public(policy) { - for (const statement of policy.statement) { - let principal_wildcard = false; - let resource_wildcard = false; - if (statement.effect === 'deny') { - return false; - } - for (const principal of statement.principal) { - if (principal.unwrap() === '*') { - principal_wildcard = true; - } - } - for (const resource of statement.resource) { - if ((/[?*]/).test(resource)) { - resource_wildcard = true; + for (const statement of policy.Statement) { + if (statement.Effect === 'Allow' && statement.Principal) { + const statement_principal = statement.Principal.AWS ? statement.Principal.AWS : statement.Principal; + //although redundant, its technicly possible to have both wildcard and specific principal. + //in this case the wildcard principal override the specific one + for (const principal of _.flatten([statement_principal])) { + if (principal.unwrap() === '*') { + return true; + } } } - if (!principal_wildcard || !resource_wildcard) { - return false; - } } - return true; + return false; } module.exports = { diff --git a/src/test/unit_tests/test_s3_bucket_policy.js b/src/test/unit_tests/test_s3_bucket_policy.js index 8ed456929a..c08ed76621 100644 --- a/src/test/unit_tests/test_s3_bucket_policy.js +++ b/src/test/unit_tests/test_s3_bucket_policy.js @@ -106,7 +106,7 @@ async function setup() { }); } -/*eslint max-lines-per-function: ["error", 1100]*/ +/*eslint max-lines-per-function: ["error", 1300]*/ mocha.describe('s3_bucket_policy', function() { mocha.before(setup); mocha.it('should fail setting bucket policy when user doesn\'t exist', async function() { @@ -1130,4 +1130,119 @@ mocha.describe('s3_bucket_policy', function() { } }); }); + + mocha.describe('get-bucket-policy status should work', async function() { + + mocha.it('get-bucket-policy status should return true for public policy', async function() { + const self = this; // eslint-disable-line no-invalid-this + self.timeout(15000); + const s3_policy = { + Version: '2012-10-17', + //Effect allow and principal equals '*'. 
public policy + Statement: [ + { + Action: ['s3:PutObject'], + Effect: 'Allow', + Principal: { AWS: "*" }, + Resource: [`arn:aws:s3:::${BKT}/*`], + } + ]}; + await s3_owner.putBucketPolicy({ + Bucket: BKT, + Policy: JSON.stringify(s3_policy) + }); + const res = await s3_owner.getBucketPolicyStatus({Bucket: BKT}); + assert.strictEqual(res.PolicyStatus.IsPublic, true); + }); + + mocha.it('get-bucket-policy status should return false for non "*" principal', async function() { + const self = this; // eslint-disable-line no-invalid-this + self.timeout(15000); + const s3_policy = { + Version: '2012-10-17', + Statement: [ + { + Action: ['s3:PutObject'], + Effect: 'Allow', + Principal: { AWS: user_a }, //principal is user + Resource: [`arn:aws:s3:::${BKT}/*`], + } + ]}; + await s3_owner.putBucketPolicy({ + Bucket: BKT, + Policy: JSON.stringify(s3_policy) + }); + const res = await s3_owner.getBucketPolicyStatus({Bucket: BKT}); + assert.strictEqual(res.PolicyStatus.IsPublic, false); + }); + + mocha.it('get-bucket-policy status should return false for Deny actions', async function() { + const self = this; // eslint-disable-line no-invalid-this + self.timeout(15000); + const s3_policy = { + Version: '2012-10-17', + Statement: [ + { + Action: ['s3:PutObject'], + Effect: 'Deny', + Principal: { AWS: "*" }, + Resource: [`arn:aws:s3:::${BKT}/*`], + } + ]}; + await s3_owner.putBucketPolicy({ + Bucket: BKT, + Policy: JSON.stringify(s3_policy) + }); + const res = await s3_owner.getBucketPolicyStatus({Bucket: BKT}); + assert.strictEqual(res.PolicyStatus.IsPublic, false); + }); + + mocha.it('get-bucket-policy status should work for pricipal as a string', async function() { + const self = this; // eslint-disable-line no-invalid-this + self.timeout(15000); + const s3_policy = { + Version: '2012-10-17', + Statement: [ + { + Action: ['s3:PutObject'], + Effect: 'Allow', + Principal: "*", + Resource: [`arn:aws:s3:::${BKT}/*`], + } + ]}; + await s3_owner.putBucketPolicy({ + Bucket: BKT, + Policy: JSON.stringify(s3_policy) + }); + const res = await s3_owner.getBucketPolicyStatus({Bucket: BKT}); + assert.strictEqual(res.PolicyStatus.IsPublic, true); + }); + + mocha.it('get-bucket-policy principal should return true if at least one statement is public with principal * ', async function() { + const self = this; // eslint-disable-line no-invalid-this + self.timeout(15000); + const s3_policy = { + Version: '2012-10-17', + Statement: [ + { + Action: ['s3:PutObject'], + Effect: 'Allow', + Principal: "*", + Resource: [`arn:aws:s3:::${BKT}/*`], + }, + { + Action: ['s3:GetObject'], + Effect: 'Deny', + Principal: "*", + Resource: [`arn:aws:s3:::${BKT}/*`], + } + ]}; + await s3_owner.putBucketPolicy({ + Bucket: BKT, + Policy: JSON.stringify(s3_policy) + }); + const res = await s3_owner.getBucketPolicyStatus({Bucket: BKT}); + assert.strictEqual(res.PolicyStatus.IsPublic, true); + }); + }); }); From 5bcb8eda98cf2fc603e7e99c0db699674cc98133 Mon Sep 17 00:00:00 2001 From: Romy <35330373+romayalon@users.noreply.github.com> Date: Tue, 26 Mar 2024 20:24:32 +0200 Subject: [PATCH 02/14] add missing await Signed-off-by: Romy <35330373+romayalon@users.noreply.github.com> (cherry picked from commit 04fecc152610fc5fbb9a17586288ef55d555e155) --- src/endpoint/s3/s3_rest.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/endpoint/s3/s3_rest.js b/src/endpoint/s3/s3_rest.js index 3125ed1d07..cdea3b95b6 100755 --- a/src/endpoint/s3/s3_rest.js +++ b/src/endpoint/s3/s3_rest.js @@ -221,7 +221,7 @@ async function 
authorize_request_policy(req) { const is_anon = !(auth_token && auth_token.access_key); if (is_anon) { - authorize_anonymous_access(s3_policy, method, arn_path, req); + await authorize_anonymous_access(s3_policy, method, arn_path, req); return; } From d50e3c64323fbf5ef5a1af44d8c654033750a899 Mon Sep 17 00:00:00 2001 From: shirady <57721533+shirady@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:58:56 +0200 Subject: [PATCH 03/14] NSFS | NS | nc_coretest use rm instead of rmdir (to avoid Node DeprecationWarning) Signed-off-by: shirady <57721533+shirady@users.noreply.github.com> (cherry picked from commit dc64a192f6719e4fa83e709827b80a61e70bd4ac) --- src/test/unit_tests/nc_coretest.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/unit_tests/nc_coretest.js b/src/test/unit_tests/nc_coretest.js index f85af3e8f0..7ca2c5b5ec 100644 --- a/src/test/unit_tests/nc_coretest.js +++ b/src/test/unit_tests/nc_coretest.js @@ -148,9 +148,9 @@ async function config_dir_setup() { */ async function config_dir_teardown() { await announce('config_dir_teardown'); - await fs.promises.rmdir(NC_CORETEST_STORAGE_PATH, { recursive: true }); + await fs.promises.rm(NC_CORETEST_STORAGE_PATH, { recursive: true }); await fs.promises.rm(NC_CORETEST_REDIRECT_FILE_PATH); - await fs.promises.rmdir(NC_CORETEST_CONFIG_DIR_PATH, { recursive: true, force: true }); + await fs.promises.rm(NC_CORETEST_CONFIG_DIR_PATH, { recursive: true, force: true }); } /** From f83acb90e993c56ce322733154491a3371c7f013 Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Mon, 1 Apr 2024 13:15:53 +0530 Subject: [PATCH 04/14] Fix glacier exipiry bug. When the date is for some reason set to (for example when a month changes) then the date becomes 0 and that completely throws off the expected expiry calculation. This is fixed by reusing the expiry function from the glacier backend. 
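For illustration, a minimal sketch of the day-of-month wraparound (the dates here are hypothetical; the affected check is the assertion removed in the test diff below):

    // Illustrative only: comparing day-of-month wraps across a month boundary.
    // If "today" is Mar 31 and the restore expires one day later on Apr 1,
    // then expiry.getDate() - days === 0, and setDate(0) rolls the date back
    // to the last day of the previous month, so the old assertion fails even
    // though the expiry itself is correct.
    const today = new Date(2024, 2, 31);  // Mar 31, 2024 (local time)
    const expiry = new Date(2024, 3, 1);  // Apr 1, 2024
    const days = 1;

    const rolled = new Date(new Date(today.getTime()).setDate(expiry.getDate() - days));
    console.log(rolled.getDate(), today.getDate()); // 29 !== 31

Comparing the full expected expiry produced by GlacierBackend.generate_expiry, as the updated test below does, avoids this wraparound.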
Signed-off-by: Utkarsh Srivastava (cherry picked from commit 069fcc444290359bb1e416ba878203d314fe37c3) --- src/test/unit_tests/test_nsfs_glacier_backend.js | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/test/unit_tests/test_nsfs_glacier_backend.js b/src/test/unit_tests/test_nsfs_glacier_backend.js index 670bd74b2e..7db51deb5f 100644 --- a/src/test/unit_tests/test_nsfs_glacier_backend.js +++ b/src/test/unit_tests/test_nsfs_glacier_backend.js @@ -15,6 +15,7 @@ const buffer_utils = require('../../util/buffer_utils'); const endpoint_stats_collector = require('../../sdk/endpoint_stats_collector'); const { NewlineReader } = require('../../util/file_reader'); const { TapeCloudGlacierBackend } = require('../../sdk/nsfs_glacier_backend/tapecloud'); +const { GlacierBackend } = require('../../sdk/nsfs_glacier_backend/backend'); const mkdtemp = util.promisify(fs.mkdtemp); const inspect = (x, max_arr = 5) => util.inspect(x, { colors: true, depth: null, maxArrayLength: max_arr }); @@ -148,8 +149,14 @@ mocha.describe('nsfs_glacier', async () => { // Ensure object is restored const md = await glacier_ns.read_object_md(params, dummy_object_sdk); + assert(!md.restore_status.ongoing); - assert(new Date(new Date().setDate(md.restore_status.expiry_time.getDate() - params.days)).getDate() === new Date().getDate()); + + const expected_expiry = GlacierBackend.generate_expiry(new Date(), params.days, '', config.NSFS_GLACIER_EXPIRY_TZ); + assert(expected_expiry.getTime() === md.restore_status.expiry_time.getTime()); + assert(expected_expiry.getDate() === md.restore_status.expiry_time.getDate()); + assert(expected_expiry.getMonth() === md.restore_status.expiry_time.getMonth()); + assert(expected_expiry.getFullYear() === md.restore_status.expiry_time.getFullYear()); }); }); }); From fd81260ad569afbe6c015c57eaf2b41aa4bdce17 Mon Sep 17 00:00:00 2001 From: liranmauda Date: Sun, 31 Mar 2024 11:20:29 +0300 Subject: [PATCH 05/14] Bumping node from 20.9.0 to 20.11.0 Bumping node from 20.9.0 to 20.11.0 Signed-off-by: liranmauda (cherry picked from commit 77f00218152e0d5a9f7c451c5bf819580fe43801) --- .nvmrc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.nvmrc b/.nvmrc index f3f52b42d3..8b0beab16a 100644 --- a/.nvmrc +++ b/.nvmrc @@ -1 +1 @@ -20.9.0 +20.11.0 From ab182e24da4f2230c9d11784c03bab3e3aa8e2d8 Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Tue, 2 Apr 2024 14:44:43 +0530 Subject: [PATCH 06/14] remove buffer pool logging and fix error missing storage class error Signed-off-by: Utkarsh Srivastava fix: correct storage class on list-parts Signed-off-by: Utkarsh Srivastava fix broken restore object test Signed-off-by: Utkarsh Srivastava fix broken restore object test Signed-off-by: Utkarsh Srivastava address PR comments Signed-off-by: Utkarsh Srivastava (cherry picked from commit 24adb0481c6f736f705e22cb0aaa7faa9dc45eaa) --- src/endpoint/s3/ops/s3_get_object_uploadId.js | 2 +- src/endpoint/s3/s3_errors.js | 5 ----- src/sdk/namespace_fs.js | 8 +++++++- src/test/unit_tests/test_namespace_fs.js | 6 +++--- src/util/buffer_utils.js | 5 +---- 5 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/endpoint/s3/ops/s3_get_object_uploadId.js b/src/endpoint/s3/ops/s3_get_object_uploadId.js index 54326bba51..119b5b7fc6 100644 --- a/src/endpoint/s3/ops/s3_get_object_uploadId.js +++ b/src/endpoint/s3/ops/s3_get_object_uploadId.js @@ -37,7 +37,7 @@ async function get_object_uploadId(req) { UploadId: req.query.uploadId, Initiator: s3_utils.DEFAULT_S3_USER, Owner: 
s3_utils.DEFAULT_S3_USER, - StorageClass: s3_utils.STORAGE_CLASS_STANDARD, + StorageClass: s3_utils.parse_storage_class(reply.storage_class), MaxParts: max, PartNumberMarker: num_marker, IsTruncated: reply.is_truncated, diff --git a/src/endpoint/s3/s3_errors.js b/src/endpoint/s3/s3_errors.js index bedb2b93d7..c30fbb751d 100644 --- a/src/endpoint/s3/s3_errors.js +++ b/src/endpoint/s3/s3_errors.js @@ -546,11 +546,6 @@ S3Error.InvalidObjectStorageClass = Object.freeze({ message: 'Restore is not allowed for the object\'s current storage class.', http_code: 403, }); -S3Error.StorageClassNotImplemented = Object.freeze({ - code: 'NotImplemented', - message: 'This storage class is not implemented.', - http_code: 501, -}); S3Error.RPC_ERRORS_TO_S3 = Object.freeze({ UNAUTHORIZED: S3Error.AccessDenied, diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index 9dc85d4d67..deac4e3ffe 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -1575,6 +1575,11 @@ class NamespaceFS { const fs_context = this.prepare_fs_context(object_sdk); await this._load_multipart(params, fs_context); await this._check_path_in_bucket_boundaries(fs_context, params.mpu_path); + const { data } = await nb_native().fs.readFile( + fs_context, + path.join(params.mpu_path, 'create_object_upload') + ); + const create_multipart_upload_params = JSON.parse(data.toString()); const entries = await nb_native().fs.readdir(fs_context, params.mpu_path); const multiparts = await Promise.all( entries @@ -1595,6 +1600,7 @@ class NamespaceFS { is_truncated: false, next_num_marker: undefined, multiparts, + storage_class: create_multipart_upload_params.storage_class }; } catch (err) { throw this._translate_object_error_codes(err); @@ -2968,7 +2974,7 @@ class NamespaceFS { async _throw_if_storage_class_not_supported(storage_class) { if (!await this._is_storage_class_supported(storage_class)) { - throw new S3Error(S3Error.StorageClassNotImplemented); + throw new S3Error(S3Error.InvalidStorageClass); } } diff --git a/src/test/unit_tests/test_namespace_fs.js b/src/test/unit_tests/test_namespace_fs.js index 9bade9e9cf..4e25a26fe1 100644 --- a/src/test/unit_tests/test_namespace_fs.js +++ b/src/test/unit_tests/test_namespace_fs.js @@ -483,9 +483,9 @@ mocha.describe('namespace_fs', function() { assert.fail('restore_object should fail'); } catch (err) { assert.strictEqual(err instanceof S3Error, true); - assert.strictEqual(err.code, S3Error.StorageClassNotImplemented.code); - assert.strictEqual(err.message, S3Error.StorageClassNotImplemented.message); - assert.strictEqual(err.http_code, S3Error.StorageClassNotImplemented.http_code); + assert.strictEqual(err.code, S3Error.InvalidStorageClass.code); + assert.strictEqual(err.message, S3Error.InvalidStorageClass.message); + assert.strictEqual(err.http_code, S3Error.InvalidStorageClass.http_code); } }); diff --git a/src/util/buffer_utils.js b/src/util/buffer_utils.js index dea98671b0..69d9e5d4ce 100644 --- a/src/util/buffer_utils.js +++ b/src/util/buffer_utils.js @@ -264,21 +264,18 @@ class MultiSizeBuffersPool { } /** - * @returns BuffersPool + * @returns {BuffersPool} */ get_buffers_pool(size) { const largest = this.pools[this.pools.length - 1]; if (typeof size !== 'number' || size < 0) { - dbg.log1('MultiSizeBuffersPool.get_buffers_pool: sem value', largest.sem._value, 'waiting_value', largest.sem._waiting_value, 'buffers length', largest.buffers.length); return largest; } for (const bp of this.pools) { if (size <= bp.buf_size) { - dbg.log1('MultiSizeBuffersPool.get_buffers_pool: sem 
value', bp.sem._value, 'waiting_value', bp.sem._waiting_value, 'buffers length', bp.buffers.length); return bp; } } - dbg.log1('MultiSizeBuffersPool.get_buffers_pool: sem value', largest.sem._value, 'waiting_value', largest.sem._waiting_value, 'buffers length', largest.buffers.length); return largest; } } From d7a34cf76623df883fa69c644eb47252a9b33eff Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Mon, 18 Mar 2024 17:18:14 +0530 Subject: [PATCH 07/14] improve time bounds for glacier operations Signed-off-by: Utkarsh Srivastava fix broken test Signed-off-by: Utkarsh Srivastava improve failure logging Signed-off-by: Utkarsh Srivastava improving command output parsing and refactor Signed-off-by: Utkarsh Srivastava replace catch with finally Signed-off-by: Utkarsh Srivastava (cherry picked from commit 57af8763952f06898b30af2e136cea6184a1cd01) --- config.js | 2 +- src/manage_nsfs/manage_nsfs_glacier.js | 68 +++++-- src/sdk/namespace_fs.js | 4 +- src/sdk/nsfs_glacier_backend/backend.js | 6 +- src/sdk/nsfs_glacier_backend/tapecloud.js | 183 +++++++++++------- .../unit_tests/test_nsfs_glacier_backend.js | 34 ++-- src/util/file_reader.js | 4 +- src/util/persistent_logger.js | 129 ++++++------ 8 files changed, 255 insertions(+), 175 deletions(-) diff --git a/config.js b/config.js index 8475266af4..6e25f36255 100644 --- a/config.js +++ b/config.js @@ -727,7 +727,7 @@ config.NSFS_EXIT_EVENTS_TIME_FRAME_MIN = 24 * 60; // per day config.NSFS_MAX_EXIT_EVENTS_PER_TIME_FRAME = 10; // allow max 10 failed forks per day config.NSFS_GLACIER_LOGS_DIR = '/var/run/noobaa-nsfs/wal'; -config.NSFS_GLACIER_LOGS_MAX_INTERVAL = 15 * 60 * 1000; +config.NSFS_GLACIER_LOGS_POLL_INTERVAL = 10 * 1000; // NSFS_GLACIER_ENABLED can override internal autodetection and will force // the use of restore for all objects. 
diff --git a/src/manage_nsfs/manage_nsfs_glacier.js b/src/manage_nsfs/manage_nsfs_glacier.js index 5ccca7a01d..624a234124 100644 --- a/src/manage_nsfs/manage_nsfs_glacier.js +++ b/src/manage_nsfs/manage_nsfs_glacier.js @@ -35,14 +35,7 @@ async function process_migrations() { * @param {import('../sdk/nsfs_glacier_backend/backend').GlacierBackend} backend */ async function run_glacier_migrations(fs_context, backend) { - // This WAL is getting opened only so that we can process all the prcess WAL entries - const wal = new PersistentLogger( - config.NSFS_GLACIER_LOGS_DIR, - GlacierBackend.MIGRATE_WAL_NAME, - { disable_rotate: true, locking: 'EXCLUSIVE' }, - ); - - await wal.process_inactive(async file => backend.migrate(fs_context, file)); + await run_glacier_operation(fs_context, GlacierBackend.MIGRATE_WAL_NAME, backend.migrate.bind(backend)); } async function process_restores() { @@ -69,14 +62,7 @@ async function process_restores() { * @param {import('../sdk/nsfs_glacier_backend/backend').GlacierBackend} backend */ async function run_glacier_restore(fs_context, backend) { - // This WAL is getting opened only so that we can process all the prcess WAL entries - const wal = new PersistentLogger( - config.NSFS_GLACIER_LOGS_DIR, - GlacierBackend.RESTORE_WAL_NAME, - { disable_rotate: true, locking: 'EXCLUSIVE' }, - ); - - await wal.process_inactive(async file => backend.restore(fs_context, file)); + await run_glacier_operation(fs_context, GlacierBackend.RESTORE_WAL_NAME, backend.restore.bind(backend)); } async function process_expiry() { @@ -106,7 +92,7 @@ async function time_exceeded(fs_context, interval, timestamp_file) { if (lastrun.getTime() + interval < Date.now()) return true; } catch (error) { - console.error('failed to read last run timestamp:', error); + console.error('failed to read last run timestamp:', error, 'timestamp_file:', timestamp_file); if (error.code === 'ENOENT') return true; throw error; @@ -129,6 +115,54 @@ async function record_current_time(fs_context, timestamp_file) { ); } +/** + * run_glacier_operations takes a log_namespace and a callback and executes the + * callback on each log file in that namespace. It will also generate a failure + * log file and persist the failures in that log file. 
+ * @param {nb.NativeFSContext} fs_context + * @param {string} log_namespace + * @param {Function} cb + */ +async function run_glacier_operation(fs_context, log_namespace, cb) { + let log = null; + let failure_log = null; + + try { + // This logger is getting opened only so that we can process all the process the entries + log = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, log_namespace, { locking: 'EXCLUSIVE' }); + failure_log = new PersistentLogger( + config.NSFS_GLACIER_LOGS_DIR, + `${log_namespace}.failure`, + { locking: 'EXCLUSIVE' }, + ); + + try { + // Process all the inactive and currently active log + await log.process_inactive(async file => cb(fs_context, file, failure_log.append.bind(failure_log))); + } catch (error) { + console.error('failed to process logs, error:', error, 'log_namespace:', log_namespace); + } + + try { + // Process the inactive failure logs (don't process the current though) + // This will REMOVE the previous failure logs and will merge them with the current failures + await failure_log.process_inactive(async file => cb(fs_context, file, failure_log.append.bind(failure_log)), false); + } catch (error) { + console.error('failed to process failure logs:', error, 'log_namespace:', log_namespace); + } + + try { + // Finally replace the current active so as to consume them in the next iteration + await failure_log._replace_active(); + } catch (error) { + console.error('failed to replace active failure log:', error, 'log_namespace:', log_namespace); + } + } finally { + if (log) await log.close(); + if (failure_log) await failure_log.close(); + } +} + /** * lock_and_run acquires a flock and calls the given callback after * acquiring the lock diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index deac4e3ffe..13b19d17df 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -3005,7 +3005,7 @@ class NamespaceFS { static get migrate_wal() { if (!NamespaceFS._migrate_wal) { NamespaceFS._migrate_wal = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, GlacierBackend.MIGRATE_WAL_NAME, { - max_interval: config.NSFS_GLACIER_LOGS_MAX_INTERVAL, + poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL, locking: 'SHARED', }); } @@ -3016,7 +3016,7 @@ class NamespaceFS { static get restore_wal() { if (!NamespaceFS._restore_wal) { NamespaceFS._restore_wal = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, GlacierBackend.RESTORE_WAL_NAME, { - max_interval: config.NSFS_GLACIER_LOGS_MAX_INTERVAL, + poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL, locking: 'SHARED', }); } diff --git a/src/sdk/nsfs_glacier_backend/backend.js b/src/sdk/nsfs_glacier_backend/backend.js index 833ec9e62f..14bc8f3455 100644 --- a/src/sdk/nsfs_glacier_backend/backend.js +++ b/src/sdk/nsfs_glacier_backend/backend.js @@ -60,9 +60,10 @@ class GlacierBackend { * NOTE: This needs to be implemented by each backend. * @param {nb.NativeFSContext} fs_context * @param {string} log_file log filename + * @param {(entry: string) => Promise} failure_recorder * @returns {Promise} */ - async migrate(fs_context, log_file) { + async migrate(fs_context, log_file, failure_recorder) { throw new Error('Unimplementented'); } @@ -77,9 +78,10 @@ class GlacierBackend { * NOTE: This needs to be implemented by each backend. 
* @param {nb.NativeFSContext} fs_context * @param {string} log_file log filename + * @param {(entry: string) => Promise} failure_recorder * @returns {Promise} */ - async restore(fs_context, log_file) { + async restore(fs_context, log_file, failure_recorder) { throw new Error('Unimplementented'); } diff --git a/src/sdk/nsfs_glacier_backend/tapecloud.js b/src/sdk/nsfs_glacier_backend/tapecloud.js index 6492318837..05d3df0c43 100644 --- a/src/sdk/nsfs_glacier_backend/tapecloud.js +++ b/src/sdk/nsfs_glacier_backend/tapecloud.js @@ -1,12 +1,17 @@ /* Copyright (C) 2024 NooBaa */ 'use strict'; +const { spawn } = require("child_process"); +const events = require('events'); +const os = require("os"); +const path = require("path"); const { PersistentLogger } = require("../../util/persistent_logger"); const { NewlineReader } = require('../../util/file_reader'); const { GlacierBackend } = require("./backend"); const config = require('../../../config'); -const path = require("path"); const { exec } = require('../../util/os_utils'); +const nb_native = require("../../util/nb_native"); +const { get_process_fs_context } = require("../../util/native_fs_utils"); const dbg = require('../../util/debug_module')(__filename); const ERROR_DUPLICATE_TASK = "GLESM431E"; @@ -21,11 +26,63 @@ function get_bin_path(bin_name) { return path.join(config.NSFS_GLACIER_TAPECLOUD_BIN_DIR, bin_name); } -async function get_task(task_id) { - return await exec(`${get_bin_path(TASK_SHOW_SCRIPT)} ${task_id}`, { return_stdout: true }); +/** + * @param {*} task_id + * @param {(entry: string) => Promise} recorder + */ +async function record_failed_tasks(task_id, recorder) { + const fs_context = get_process_fs_context(); + const tmp = path.join(os.tmpdir(), `eeadm_task_out_${Date.now()}`); + + let temp_fh = null; + let reader = null; + try { + temp_fh = await nb_native().fs.open(fs_context, tmp, 'rw'); + + const proc = spawn(get_bin_path(TASK_SHOW_SCRIPT), [task_id], { + stdio: ['pipe', temp_fh.fd, temp_fh.fd], + }); + + const [errcode] = await events.once(proc, 'exit'); + if (errcode) { + throw new Error('process exited with non-zero exit code:', errcode); + } + + reader = new NewlineReader(fs_context, tmp); + await reader.forEach(async line => { + if (!line.startsWith("Fail")) return; + + const parsed = line.split(/\s+/); + if (parsed.length !== 6) { + throw new Error('failed to parse task show'); + } + + if (parsed[1] !== ERROR_DUPLICATE_TASK) { + // Column 5 is the filename (refer tapecloud [eeadm] manual) + await recorder(parsed[5]); + } + + return true; + }); + } finally { + if (temp_fh) { + await temp_fh.close(fs_context); + await nb_native().fs.unlink(fs_context, tmp); + } + + if (reader) { + await reader.close(); + } + } } -async function tapecloud_failure_handler(error) { +/** + * tapecloud_failure_handler takes the error and runs task_show on the task + * ID to identify the failed entries and record them to the recorder + * @param {*} error + * @param {(entry: string) => Promise} recorder + */ +async function tapecloud_failure_handler(error, recorder) { const { stdout } = error; // Find the line in the stdout which has the line 'task ID is, ' and extract id @@ -37,24 +94,7 @@ async function tapecloud_failure_handler(error) { const task_id = match[1]; // Fetch task status and see what failed - const taskshowstdout = await get_task(task_id); - return taskshowstdout - .split('\n') - .filter(line => line.startsWith("Fail")) - .map(line => { - const parsed = line.split(/\s+/); - if (parsed.length !== 6) { - throw Error('failed to 
parse task show'); - } - - if (parsed[1] === ERROR_DUPLICATE_TASK) { - return null; - } - - // Column 5 is the filename (refer tapecloud [eeadm] manual) - return parsed[5]; - }) - .filter(Boolean); + await record_failed_tasks(task_id, recorder); } /** @@ -66,17 +106,16 @@ async function tapecloud_failure_handler(error) { * The function returns the names of the files which failed * to migrate. * @param {string} file filename - * @returns {Promise} failedfiles + * @param {(entry: string) => Promise} recorder */ -async function migrate(file) { +async function migrate(file, recorder) { try { dbg.log1("Starting migration for file", file); const out = await exec(`${get_bin_path(MIGRATE_SCRIPT)} ${file}`, { return_stdout: true }); dbg.log4("migrate finished with:", out); dbg.log1("Finished migration for file", file); - return []; } catch (error) { - return tapecloud_failure_handler(error); + await tapecloud_failure_handler(error, recorder); } } @@ -89,17 +128,16 @@ async function migrate(file) { * The function returns the names of the files which failed * to recall. * @param {string} file filename - * @returns {Promise} failed files + * @param {(entry: string) => Promise} recorder */ -async function recall(file) { +async function recall(file, recorder) { try { dbg.log1("Starting recall for file", file); const out = await exec(`${get_bin_path(RECALL_SCRIPT)} ${file}`, { return_stdout: true }); dbg.log4("recall finished with:", out); dbg.log1("Finished recall for file", file); - return []; } catch (error) { - return tapecloud_failure_handler(error); + await tapecloud_failure_handler(error, recorder); } } @@ -111,7 +149,7 @@ async function process_expired() { } class TapeCloudGlacierBackend extends GlacierBackend { - async migrate(fs_context, log_file) { + async migrate(fs_context, log_file, failure_recorder) { dbg.log2('TapeCloudGlacierBackend.migrate starting for', log_file); let filtered_log = null; @@ -120,7 +158,7 @@ class TapeCloudGlacierBackend extends GlacierBackend { filtered_log = new PersistentLogger( config.NSFS_GLACIER_LOGS_DIR, `tapecloud_migrate_run_${Date.now().toString()}`, - { disable_rotate: true, locking: 'EXCLUSIVE' }, + { locking: 'EXCLUSIVE' }, ); walreader = new NewlineReader(fs_context, log_file, 'EXCLUSIVE'); @@ -135,10 +173,13 @@ class TapeCloudGlacierBackend extends GlacierBackend { return true; } - // Something else is wrong with this entry of this file - // should skip processing this WAL for now - dbg.log1('skipping log entry', entry.path, 'due to error:', err); - return false; + dbg.log0( + 'adding log entry', entry.path, + 'to failure recorder due to error', err, + ); + await failure_recorder(entry.path); + + return true; } // Skip the file if it shouldn't be migrated @@ -150,6 +191,8 @@ class TapeCloudGlacierBackend extends GlacierBackend { // If the result of the above is false then it indicates that we concluded // to exit early hence the file shouldn't be processed further, exit + // + // NOTE: Should not hit this case anymore if (!result) return false; // If we didn't read even one line then it most likely indicates that the WAL is @@ -164,17 +207,18 @@ class TapeCloudGlacierBackend extends GlacierBackend { if (filtered_log.local_size === 0) return true; await filtered_log.close(); - const failed = await this._migrate(filtered_log.active_path); + await this._migrate(filtered_log.active_path, failure_recorder); - // Do not delete the WAL if migration failed - This allows easy retries - return failed.length === 0; + // Delete the log if the above write succeeds or 
else keep it + return true; } catch (error) { dbg.error('unexpected error occured while processing migrate WAL:', error); // Preserve the WAL if we encounter exception here, possible failures - // 1.eaedm command failure + // 1. eeadm command failure // 2. tempwal failure // 3. newline reader failure + // 4. failure log failure return false; } finally { if (filtered_log) { @@ -186,18 +230,18 @@ class TapeCloudGlacierBackend extends GlacierBackend { } } - async restore(fs_context, log_file) { + async restore(fs_context, log_file, failure_recorder) { dbg.log2('TapeCloudGlacierBackend.restore starting for', log_file); - let tempwal = null; + let filtered_log = null; let walreader = null; - let tempwalreader = null; + let filtered_log_reader = null; try { // tempwal will store all the files of interest and will be handed over to tapecloud script - tempwal = new PersistentLogger( + filtered_log = new PersistentLogger( config.NSFS_GLACIER_LOGS_DIR, `tapecloud_restore_run_${Date.now().toString()}`, - { disable_rotate: true, locking: 'EXCLUSIVE' }, + { locking: 'EXCLUSIVE' }, ); walreader = new NewlineReader(fs_context, log_file, 'EXCLUSIVE'); @@ -211,7 +255,7 @@ class TapeCloudGlacierBackend extends GlacierBackend { } // Add entry to the tempwal - await tempwal.append(entry.path); + await filtered_log.append(entry.path); return true; } catch (error) { @@ -220,8 +264,13 @@ class TapeCloudGlacierBackend extends GlacierBackend { return true; } - // Something else is wrong so skip processing the file for now - return false; + dbg.log0( + 'adding log entry', entry.path, + 'to failure recorder due to error', error, + ); + await failure_recorder(entry.path); + + return true; } }); @@ -239,15 +288,14 @@ class TapeCloudGlacierBackend extends GlacierBackend { } // If we didn't find any candidates despite complete read, exit and delete this WAL - if (tempwal.local_size === 0) return true; + if (filtered_log.local_size === 0) return true; - await tempwal.close(); - const failed = await this._recall(tempwal.active_path); + await filtered_log.close(); + await this._recall(filtered_log.active_path, failure_recorder); - tempwalreader = new NewlineReader(fs_context, tempwal.active_path, "EXCLUSIVE"); + filtered_log_reader = new NewlineReader(fs_context, filtered_log.active_path, "EXCLUSIVE"); - // Start iteration over the WAL again - [processed, result] = await tempwalreader.forEachFilePathEntry(async entry => { + [processed, result] = await filtered_log_reader.forEachFilePathEntry(async entry => { let fh = null; try { fh = await entry.open(); @@ -258,12 +306,6 @@ class TapeCloudGlacierBackend extends GlacierBackend { ] }); - // We noticed that the file has failed earlier - // so mustn't have been part of the WAL, ignore - if (failed.includes(entry.path)) { - return true; - } - const days = Number(stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST]); const expires_on = GlacierBackend.generate_expiry( new Date(), @@ -295,8 +337,7 @@ class TapeCloudGlacierBackend extends GlacierBackend { if (!result) return false; - // Even if we failed to process one entry in log, preserve the WAL - return failed.length === 0; + return true; } catch (error) { dbg.error('unexpected error occured while processing restore WAL:', error); @@ -307,11 +348,11 @@ class TapeCloudGlacierBackend extends GlacierBackend { return false; } finally { if (walreader) await walreader.close(); - if (tempwalreader) await tempwalreader.close(); + if (filtered_log_reader) await filtered_log_reader.close(); - if (tempwal) { - await tempwal.close(); - await 
tempwal.remove(); + if (filtered_log) { + await filtered_log.close(); + await filtered_log.remove(); } } } @@ -336,9 +377,10 @@ class TapeCloudGlacierBackend extends GlacierBackend { * * NOTE: Must be overwritten for tests * @param {string} file + * @param {(entry: string) => Promise} recorder */ - async _migrate(file) { - return migrate(file); + async _migrate(file, recorder) { + return migrate(file, recorder); } /** @@ -346,9 +388,10 @@ class TapeCloudGlacierBackend extends GlacierBackend { * * NOTE: Must be overwritten for tests * @param {string} file + * @param {(entry: string) => Promise} recorder */ - async _recall(file) { - return recall(file); + async _recall(file, recorder) { + return recall(file, recorder); } /** diff --git a/src/test/unit_tests/test_nsfs_glacier_backend.js b/src/test/unit_tests/test_nsfs_glacier_backend.js index 7db51deb5f..f620ecd5f5 100644 --- a/src/test/unit_tests/test_nsfs_glacier_backend.js +++ b/src/test/unit_tests/test_nsfs_glacier_backend.js @@ -15,6 +15,7 @@ const buffer_utils = require('../../util/buffer_utils'); const endpoint_stats_collector = require('../../sdk/endpoint_stats_collector'); const { NewlineReader } = require('../../util/file_reader'); const { TapeCloudGlacierBackend } = require('../../sdk/nsfs_glacier_backend/tapecloud'); +const { PersistentLogger } = require('../../util/persistent_logger'); const { GlacierBackend } = require('../../sdk/nsfs_glacier_backend/backend'); const mkdtemp = util.promisify(fs.mkdtemp); @@ -53,10 +54,31 @@ mocha.describe('nsfs_glacier', async () => { stats: endpoint_stats_collector.instance(), }); + glacier_ns._is_storage_class_supported = async () => true; mocha.before(async () => { config.NSFS_GLACIER_LOGS_DIR = await mkdtemp(path.join(os.tmpdir(), 'nsfs-wal-')); + + // Replace the logger by custom one + + const migrate_wal = NamespaceFS._migrate_wal; + NamespaceFS._migrate_wal = new PersistentLogger( + config.NSFS_GLACIER_LOGS_DIR, + GlacierBackend.MIGRATE_WAL_NAME, + { locking: 'EXCLUSIVE', poll_interval: 10 } + ); + + if (migrate_wal) await migrate_wal.close(); + + const restore_wal = NamespaceFS._restore_wal; + NamespaceFS._restore_wal = new PersistentLogger( + config.NSFS_GLACIER_LOGS_DIR, + GlacierBackend.RESTORE_WAL_NAME, + { locking: 'EXCLUSIVE', poll_interval: 10 } + ); + + if (restore_wal) await restore_wal.close(); }); mocha.describe('nsfs_glacier_tapecloud', async () => { @@ -84,12 +106,6 @@ mocha.describe('nsfs_glacier', async () => { console.log('upload_object response', inspect(upload_res)); - // Force a swap, 3 cases are possible: - // 1. The file was already swapped - Unlikely but whatever - // 2. The file was empty (bug) - swap returns without doing anything - // 3. The file is swapped successfully - await NamespaceFS.migrate_wal._swap(); - // Check if the log contains the entry let found = false; await NamespaceFS.migrate_wal.process_inactive(async file => { @@ -132,12 +148,6 @@ mocha.describe('nsfs_glacier', async () => { const restore_res = await glacier_ns.restore_object(params, dummy_object_sdk); assert(restore_res); - // Force a swap, 3 cases are possible: - // 1. The file was already swapped - Unlikely but whatever - // 2. The file was empty (bug) - swap returns without doing anything - // 3. 
The file is swapped successfully - await NamespaceFS.restore_wal._swap(); - // Issue restore await NamespaceFS.restore_wal.process_inactive(async file => { const fs_context = glacier_ns.prepare_fs_context(dummy_object_sdk); diff --git a/src/util/file_reader.js b/src/util/file_reader.js index 747ff04db0..5938bbc075 100644 --- a/src/util/file_reader.js +++ b/src/util/file_reader.js @@ -21,7 +21,7 @@ class NewlineReader { * in memory. * @param {nb.NativeFSContext} fs_context * @param {string} filepath - * @param {'EXCLUSIVE' | 'SHARED' | undefined} lock + * @param {'EXCLUSIVE' | 'SHARED'} [lock] */ constructor(fs_context, filepath, lock) { this.path = filepath; @@ -83,7 +83,7 @@ class NewlineReader { let count = 0; while (entry !== null) { count += 1; - if (!await cb(entry)) return [count, false]; + if ((await cb(entry)) === false) return [count, false]; entry = await this.nextline(); } diff --git a/src/util/persistent_logger.js b/src/util/persistent_logger.js index 1f41366a66..fd327cf243 100644 --- a/src/util/persistent_logger.js +++ b/src/util/persistent_logger.js @@ -21,9 +21,8 @@ class PersistentLogger { * @param {string} dir parent directory * @param {string} namespace file prefix * @param {{ - * max_interval?: Number, + * poll_interval?: Number, * locking?: "SHARED" | "EXCLUSIVE", - * disable_rotate?: boolean, * }} cfg */ constructor(dir, namespace, cfg) { @@ -43,7 +42,7 @@ class PersistentLogger { this.init_lock = new Semaphore(1); - if (!cfg.disable_rotate) this._auto_rotate(); + if (cfg.poll_interval) this._poll_active_file_change(cfg.poll_interval); } async init() { @@ -100,74 +99,18 @@ class PersistentLogger { }); } + /** + * appends the given data to the log file + * @param {string} data + */ async append(data) { const fh = await this.init(); - const buf = Buffer.from(data + "\n", 'utf8'); + const buf = Buffer.from(data + '\n', 'utf8'); await fh.write(this.fs_context, buf, buf.length); this.local_size += buf.length; } - _auto_rotate() { - this.swap_lock_file = path.join(this.dir, `${this.namespace}.swaplock`); - - setInterval(async () => { - await this._swap(); - }, this.cfg.max_interval).unref(); - } - - async _swap() { - if (!this.fh || !this.local_size) return; - - let slfh = null; - try { - // Taking this lock ensure that when the file isn't moved between us checking the inode - // and performing the rename - slfh = await nb_native().fs.open(this.fs_context, this.swap_lock_file, 'rw'); - await slfh.flock(this.fs_context, 'EXCLUSIVE'); - - let path_stat = null; - try { - // Ensure that the inode of the `this.active_path` is the same as the one we opened - path_stat = await nb_native().fs.stat(this.fs_context, this.active_path, {}); - } catch (error) { - if (error.code === 'ENOENT') { - // Some other process must have renamed the file - dbg.log1('got ENOENT for the active file'); - } else { - // TODO: Unexpected case, handle better - dbg.error('failed to stat current file:', error); - } - } - - if (path_stat && path_stat.ino === this.fh_stat.ino) { - // Yes, time can drift. It can go in past or future. This at times might produce - // duplicate names or might produce names which ideally would have produced in the past. - // - // Hence, the order of files in the directory is not guaranteed to be in order of "time". 
- const inactive_file = `${this.namespace}.${Date.now()}.log`; - try { - await nb_native().fs.rename(this.fs_context, this.active_path, path.join(this.dir, inactive_file)); - } catch (error) { - // It isn't really expected that this will fail assuming all the processes respect the locking - // semantics - // TODO: Unexpected case, handle better - dbg.error('failed to rename file', error); - } - } - - await this.close(); - } catch (error) { - dbg.log0( - 'failed to get swap lock:', error, - 'dir:', this.dir, - 'file:', this.file, - ); - } finally { - if (slfh) await slfh.close(this.fs_context); - } - } - async close() { const fh = this.fh; @@ -188,14 +131,28 @@ class PersistentLogger { /** * process_inactive takes a callback and runs it on all past WAL files. - * It does not do so in any particular order. + * It does so in lexographically sorted order. * @param {(file: string) => Promise} cb callback + * @param {boolean} replace_active */ - async process_inactive(cb) { - const files = await nb_native().fs.readdir(this.fs_context, this.dir); - const filtered = files.filter(f => this.inactive_regex.test(f.name) && f.name !== this.file && !native_fs_utils.isDirectory(f)); + async process_inactive(cb, replace_active = true) { + if (replace_active) { + await this._replace_active(); + } + + let filtered_files = []; + try { + const files = await nb_native().fs.readdir(this.fs_context, this.dir); + filtered_files = files + .sort((a, b) => a.name.localeCompare(b.name)) + .filter(f => this.inactive_regex.test(f.name) && f.name !== this.file && !native_fs_utils.isDirectory(f)); + } catch (error) { + dbg.error('failed reading dir:', this.dir, 'with error:', error); + return; + } - for (const file of filtered) { + for (const file of filtered_files) { + dbg.log1('Processing', this.dir, file); const delete_processed = await cb(path.join(this.dir, file.name)); if (delete_processed) { await nb_native().fs.unlink(this.fs_context, path.join(this.dir, file.name)); @@ -203,9 +160,43 @@ class PersistentLogger { } } + async _replace_active() { + const inactive_file = `${this.namespace}.${Date.now()}.log`; + const inactive_file_path = path.join(this.dir, inactive_file); + + try { + await nb_native().fs.rename(this.fs_context, this.active_path, inactive_file_path); + } catch (error) { + dbg.warn('failed to rename active file:', error); + } + } + async _open() { return nb_native().fs.open(this.fs_context, this.active_path, 'as'); } + + _poll_active_file_change(poll_interval) { + setInterval(async () => { + try { + const stat = await nb_native().fs.stat(this.fs_context, this.active_path); + + // Don't race with init process - Can happen if arogue/misconfigured + // process is continuously moving the active file + this.init_lock.surround(async () => { + // If the file has changed, re-init + if (stat.ino !== this.fh_stat.ino) { + dbg.log1('active file changed, closing for namespace:', this.namespace); + await this.close(); + } + }); + } catch (error) { + if (error.code === 'ENOENT') { + dbg.log1('active file removed, closing for namespace:', this.namespace); + await this.close(); + } + } + }, poll_interval).unref(); + } } From 37043a5d4da1379b8c4d52a6ec3cd9800053d18f Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Wed, 27 Mar 2024 01:15:36 +0530 Subject: [PATCH 08/14] provide better utilities to operate over log files Signed-off-by: Utkarsh Srivastava (cherry picked from commit a1a6cce4b8a6cd2885c68e21d088874d0b9657b8) --- src/manage_nsfs/manage_nsfs_glacier.js | 39 +-- src/sdk/nsfs_glacier_backend/tapecloud.js 
| 225 ++++++------------ .../unit_tests/test_nsfs_glacier_backend.js | 4 +- src/util/persistent_logger.js | 106 ++++++++- 4 files changed, 180 insertions(+), 194 deletions(-) diff --git a/src/manage_nsfs/manage_nsfs_glacier.js b/src/manage_nsfs/manage_nsfs_glacier.js index 624a234124..9352259d15 100644 --- a/src/manage_nsfs/manage_nsfs_glacier.js +++ b/src/manage_nsfs/manage_nsfs_glacier.js @@ -124,42 +124,13 @@ async function record_current_time(fs_context, timestamp_file) { * @param {Function} cb */ async function run_glacier_operation(fs_context, log_namespace, cb) { - let log = null; - let failure_log = null; - + const log = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, log_namespace, { locking: 'EXCLUSIVE' }); try { - // This logger is getting opened only so that we can process all the process the entries - log = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, log_namespace, { locking: 'EXCLUSIVE' }); - failure_log = new PersistentLogger( - config.NSFS_GLACIER_LOGS_DIR, - `${log_namespace}.failure`, - { locking: 'EXCLUSIVE' }, - ); - - try { - // Process all the inactive and currently active log - await log.process_inactive(async file => cb(fs_context, file, failure_log.append.bind(failure_log))); - } catch (error) { - console.error('failed to process logs, error:', error, 'log_namespace:', log_namespace); - } - - try { - // Process the inactive failure logs (don't process the current though) - // This will REMOVE the previous failure logs and will merge them with the current failures - await failure_log.process_inactive(async file => cb(fs_context, file, failure_log.append.bind(failure_log)), false); - } catch (error) { - console.error('failed to process failure logs:', error, 'log_namespace:', log_namespace); - } - - try { - // Finally replace the current active so as to consume them in the next iteration - await failure_log._replace_active(); - } catch (error) { - console.error('failed to replace active failure log:', error, 'log_namespace:', log_namespace); - } + await log.process(async (entry, failure_recorder) => cb(fs_context, entry, failure_recorder)); + } catch (error) { + console.error('failed to process log in namespace:', log_namespace); } finally { - if (log) await log.close(); - if (failure_log) await failure_log.close(); + await log.close(); } } diff --git a/src/sdk/nsfs_glacier_backend/tapecloud.js b/src/sdk/nsfs_glacier_backend/tapecloud.js index 05d3df0c43..8ff3553fe7 100644 --- a/src/sdk/nsfs_glacier_backend/tapecloud.js +++ b/src/sdk/nsfs_glacier_backend/tapecloud.js @@ -5,8 +5,8 @@ const { spawn } = require("child_process"); const events = require('events'); const os = require("os"); const path = require("path"); -const { PersistentLogger } = require("../../util/persistent_logger"); -const { NewlineReader } = require('../../util/file_reader'); +const { LogFile } = require("../../util/persistent_logger"); +const { NewlineReader, NewlineReaderEntry } = require('../../util/file_reader'); const { GlacierBackend } = require("./backend"); const config = require('../../../config'); const { exec } = require('../../util/os_utils'); @@ -152,208 +152,119 @@ class TapeCloudGlacierBackend extends GlacierBackend { async migrate(fs_context, log_file, failure_recorder) { dbg.log2('TapeCloudGlacierBackend.migrate starting for', log_file); - let filtered_log = null; - let walreader = null; - try { - filtered_log = new PersistentLogger( - config.NSFS_GLACIER_LOGS_DIR, - `tapecloud_migrate_run_${Date.now().toString()}`, - { locking: 'EXCLUSIVE' }, - ); - - walreader = new 
NewlineReader(fs_context, log_file, 'EXCLUSIVE'); + const file = new LogFile(fs_context, log_file); - const [processed, result] = await walreader.forEachFilePathEntry(async entry => { + try { + await file.collect_and_process(async (entry, batch_recorder) => { let should_migrate = true; try { - should_migrate = await this.should_migrate(fs_context, entry.path); + should_migrate = await this.should_migrate(fs_context, entry); } catch (err) { if (err.code === 'ENOENT') { // Skip this file - return true; + return; } dbg.log0( - 'adding log entry', entry.path, + 'adding log entry', entry, 'to failure recorder due to error', err, ); - await failure_recorder(entry.path); - return true; + // Can't really do anything if this fails - provider + // needs to make sure that appropriate error handling + // is being done there + await failure_recorder(entry); + return; } // Skip the file if it shouldn't be migrated - if (!should_migrate) return true; - - await filtered_log.append(entry.path); - return true; + if (!should_migrate) return; + + // Can't really do anything if this fails - provider + // needs to make sure that appropriate error handling + // is being done there + await batch_recorder(entry); + }, + async batch => { + // This will throw error only if our eeadm error handler + // panics as well and at that point it's okay to + // not handle the error and rather keep the log file around + await this._migrate(batch, failure_recorder); }); - // If the result of the above is false then it indicates that we concluded - // to exit early hence the file shouldn't be processed further, exit - // - // NOTE: Should not hit this case anymore - if (!result) return false; - - // If we didn't read even one line then it most likely indicates that the WAL is - // empty - this case is unlikely given the mechanism of WAL but still needs to be - // handled. - // Return `true` to mark it for deletion. - if (processed === 0) { - dbg.warn('unexpected empty persistent log found:', log_file); - return true; - } - // If we didn't find any candidates despite complete read, exit and delete this WAL - if (filtered_log.local_size === 0) return true; - - await filtered_log.close(); - await this._migrate(filtered_log.active_path, failure_recorder); - - // Delete the log if the above write succeeds or else keep it return true; } catch (error) { - dbg.error('unexpected error occured while processing migrate WAL:', error); - - // Preserve the WAL if we encounter exception here, possible failures - // 1. eeadm command failure - // 2. tempwal failure - // 3. newline reader failure - // 4. 
failure log failure + dbg.error('unexpected error in processing migrate:', error, 'for:', log_file); return false; - } finally { - if (filtered_log) { - await filtered_log.close(); - await filtered_log.remove(); - } - - if (walreader) await walreader.close(); } } async restore(fs_context, log_file, failure_recorder) { dbg.log2('TapeCloudGlacierBackend.restore starting for', log_file); - let filtered_log = null; - let walreader = null; - let filtered_log_reader = null; + const file = new LogFile(fs_context, log_file); try { - // tempwal will store all the files of interest and will be handed over to tapecloud script - filtered_log = new PersistentLogger( - config.NSFS_GLACIER_LOGS_DIR, - `tapecloud_restore_run_${Date.now().toString()}`, - { locking: 'EXCLUSIVE' }, - ); - - walreader = new NewlineReader(fs_context, log_file, 'EXCLUSIVE'); - - let [processed, result] = await walreader.forEachFilePathEntry(async entry => { + await file.collect_and_process(async (entry, batch_recorder) => { try { - const should_restore = await this.should_restore(fs_context, entry.path); + const should_restore = await this.should_restore(fs_context, entry); if (!should_restore) { // Skip this file - return true; + return; } // Add entry to the tempwal - await filtered_log.append(entry.path); - - return true; + await batch_recorder(entry); } catch (error) { if (error.code === 'ENOENT') { // Skip this file - return true; + return; } dbg.log0( - 'adding log entry', entry.path, + 'adding log entry', entry, 'to failure recorder due to error', error, ); - await failure_recorder(entry.path); - - return true; + await failure_recorder(entry); } - }); - - // If the result of the above iteration was negative it indicates - // an early exit hence no need to process further for now - if (!result) return false; - - // If we didn't read even one line then it most likely indicates that the WAL is - // empty - this case is unlikely given the mechanism of WAL but still needs to be - // handled. 
- // Return `true` so as clear this file - if (processed === 0) { - dbg.warn('unexpected empty persistent log found:', log_file); - return true; - } - - // If we didn't find any candidates despite complete read, exit and delete this WAL - if (filtered_log.local_size === 0) return true; - - await filtered_log.close(); - await this._recall(filtered_log.active_path, failure_recorder); - - filtered_log_reader = new NewlineReader(fs_context, filtered_log.active_path, "EXCLUSIVE"); - - [processed, result] = await filtered_log_reader.forEachFilePathEntry(async entry => { - let fh = null; - try { - fh = await entry.open(); - - const stat = await fh.stat(fs_context, { - xattr_get_keys: [ - GlacierBackend.XATTR_RESTORE_REQUEST, - ] - }); - - const days = Number(stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST]); - const expires_on = GlacierBackend.generate_expiry( - new Date(), - days, - config.NSFS_GLACIER_EXPIRY_TIME_OF_DAY, - config.NSFS_GLACIER_EXPIRY_TZ, - ); - - await fh.replacexattr(fs_context, { - [GlacierBackend.XATTR_RESTORE_EXPIRY]: expires_on.toISOString(), - }, GlacierBackend.XATTR_RESTORE_REQUEST); - - return true; - } catch (error) { - dbg.error(`failed to process ${entry.path}`, error); - // It's OK if the file got deleted between the last check and this check - // but if there is any other error, retry restore - // - // It could be that the error is transient and the actual - // restore did successfully take place, in that case, rely on tapecloud script to - // handle dups - if (error.code !== 'ENOENT') { - return false; + }, + async batch => { + await this._recall(batch, failure_recorder); + + const batch_file = new LogFile(fs_context, batch); + await batch_file.collect_and_process(async (entry_path, batch_recorder) => { + const entry = new NewlineReaderEntry(fs_context, entry_path); + let fh = null; + try { + fh = await entry.open(); + + const stat = await fh.stat(fs_context, { + xattr_get_keys: [ + GlacierBackend.XATTR_RESTORE_REQUEST, + ] + }); + + const days = Number(stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST]); + const expires_on = GlacierBackend.generate_expiry( + new Date(), + days, + config.NSFS_GLACIER_EXPIRY_TIME_OF_DAY, + config.NSFS_GLACIER_EXPIRY_TZ, + ); + + await fh.replacexattr(fs_context, { + [GlacierBackend.XATTR_RESTORE_EXPIRY]: expires_on.toISOString(), + }, GlacierBackend.XATTR_RESTORE_REQUEST); + } catch (error) { + dbg.error(`failed to process ${entry.path}`, error); + } finally { + if (fh) await fh.close(fs_context); } - } finally { - if (fh) await fh.close(fs_context); - } + }); }); - - if (!result) return false; - return true; } catch (error) { - dbg.error('unexpected error occured while processing restore WAL:', error); - - // Preserve the WAL, failure cases: - // 1. tapecloud command exception - // 2. WAL open failure - // 3. 
Newline reader failure + dbg.error('unexpected error in processing restore:', error, 'for:', log_file); return false; - } finally { - if (walreader) await walreader.close(); - if (filtered_log_reader) await filtered_log_reader.close(); - - if (filtered_log) { - await filtered_log.close(); - await filtered_log.remove(); - } } } diff --git a/src/test/unit_tests/test_nsfs_glacier_backend.js b/src/test/unit_tests/test_nsfs_glacier_backend.js index f620ecd5f5..16a8760026 100644 --- a/src/test/unit_tests/test_nsfs_glacier_backend.js +++ b/src/test/unit_tests/test_nsfs_glacier_backend.js @@ -108,7 +108,7 @@ mocha.describe('nsfs_glacier', async () => { // Check if the log contains the entry let found = false; - await NamespaceFS.migrate_wal.process_inactive(async file => { + await NamespaceFS.migrate_wal._process(async file => { const fs_context = glacier_ns.prepare_fs_context(dummy_object_sdk); const reader = new NewlineReader(fs_context, file, 'EXCLUSIVE'); @@ -149,7 +149,7 @@ mocha.describe('nsfs_glacier', async () => { assert(restore_res); // Issue restore - await NamespaceFS.restore_wal.process_inactive(async file => { + await NamespaceFS.restore_wal._process(async file => { const fs_context = glacier_ns.prepare_fs_context(dummy_object_sdk); await backend.restore(fs_context, file); diff --git a/src/util/persistent_logger.js b/src/util/persistent_logger.js index fd327cf243..cf16812876 100644 --- a/src/util/persistent_logger.js +++ b/src/util/persistent_logger.js @@ -6,6 +6,7 @@ const nb_native = require('./nb_native'); const native_fs_utils = require('./native_fs_utils'); const P = require('./promise'); const Semaphore = require('./semaphore'); +const { NewlineReader } = require('./file_reader'); const dbg = require('./debug_module')(__filename); /** @@ -135,7 +136,7 @@ class PersistentLogger { * @param {(file: string) => Promise} cb callback * @param {boolean} replace_active */ - async process_inactive(cb, replace_active = true) { + async _process(cb, replace_active = true) { if (replace_active) { await this._replace_active(); } @@ -160,6 +161,48 @@ class PersistentLogger { } } + /** + * process is a safe wrapper around _process function which creates a failure logger for the + * callback function which allows persisting failures to disk + * @param {(file: string, failure_recorder: (entry: string) => Promise) => Promise} cb callback + */ + async process(cb) { + let failure_log = null; + + try { + // This logger is getting opened only so that we can process all the process the entries + failure_log = new PersistentLogger( + this.dir, + `${this.namespace}.failure`, + { locking: 'EXCLUSIVE' }, + ); + + try { + // Process all the inactive and currently active log + await this._process(async file => cb(file, failure_log.append.bind(failure_log))); + } catch (error) { + dbg.error('failed to process logs, error:', error, 'log_namespace:', this.namespace); + } + + try { + // Process the inactive failure logs (don't process the current though) + // This will REMOVE the previous failure logs and will merge them with the current failures + await failure_log._process(async file => cb(file, failure_log.append.bind(failure_log)), false); + } catch (error) { + dbg.error('failed to process failure logs:', error, 'log_namespace:', this.namespace); + } + + try { + // Finally replace the current active so as to consume them in the next iteration + await failure_log._replace_active(); + } catch (error) { + dbg.error('failed to replace active failure log:', error, 'log_namespace:', this.namespace); + } + } 
finally {
+            if (failure_log) await failure_log.close();
+        }
+    }
+
     async _replace_active() {
         const inactive_file = `${this.namespace}.${Date.now()}.log`;
         const inactive_file_path = path.join(this.dir, inactive_file);
@@ -199,5 +242,66 @@
     }
 }
 
+class LogFile {
+    /**
+     * @param {nb.NativeFSContext} fs_context
+     * @param {string} log_path
+     */
+    constructor(fs_context, log_path) {
+        this.fs_context = fs_context;
+        this.log_path = log_path;
+    }
+
+    /**
+     * collect_and_process takes 2 functions: the first iterates over the log file
+     * line by line and can choose to add some entries to a batch, and then the second
+     * function is invoked with a path to the persistent log that holds the batch.
+     *
+     *
+     * Because the iteration and the later, optional consumption of that batch are kept
+     * separate, this function can be invoked recursively and composed in whatever
+     * order is required.
+     * @param {(entry: string, batch_recorder: (entry: string) => Promise) => Promise} collect
+     * @param {(batch: string) => Promise} [process]
+     * @returns {Promise}
+     */
+    async collect_and_process(collect, process) {
+        let log_reader = null;
+        let filtered_log = null;
+        try {
+            filtered_log = new PersistentLogger(
+                path.dirname(this.log_path),
+                `tmp_consume_${Date.now().toString()}`,
+                { locking: 'EXCLUSIVE'}
+            );
+
+            log_reader = new NewlineReader(this.fs_context, this.log_path, 'EXCLUSIVE');
+            await log_reader.forEach(async entry => {
+                await collect(entry, filtered_log.append.bind(filtered_log));
+                return true;
+            });
+
+            if (filtered_log.local_size === 0) return;
+
+            await filtered_log.close();
+            await process?.(filtered_log.active_path);
+        } catch (error) {
+            dbg.error('unexpected error in consuming log file:', this.log_path);
+
+            // bubble the error to the caller
+            throw error;
+        } finally {
+            if (log_reader) {
+                await log_reader.close();
+            }
+
+            if (filtered_log) {
+                await filtered_log.close();
+                await filtered_log.remove();
+            }
+        }
+    }
+}
 exports.PersistentLogger = PersistentLogger;
+exports.LogFile = LogFile;

From d6f6cfb266f04ef90dfbe7a996f5e7893a2d3b5b Mon Sep 17 00:00:00 2001
From: shirady <57721533+shirady@users.noreply.github.com>
Date: Sun, 31 Mar 2024 15:57:34 +0300
Subject: [PATCH 09/14] Fix Bug BZ 2271067 | NSFS | Change mapping in pool server

1. In pool_server, inside calc_namespace_resource_mode, remove the mapping of ENOENT to storage_not_exist.
2. In the namespace_monitor, change the error code (whether it comes from a thrown error or from an error we created) to a code that we choose, and add log printing (which can be seen in the endpoint logs).
3. Add the config NS_MAX_ALLOWED_IO_ERRORS, which we will use in calc_namespace_resource_mode.
Signed-off-by: shirady <57721533+shirady@users.noreply.github.com>
(cherry picked from commit 07b195cbd8243a133040fd6074343bf985e8fd73)
---
 config.js                                   |  5 +++++
 src/server/bg_services/namespace_monitor.js | 19 +++++++++++++++++--
 src/server/system_services/pool_server.js   |  3 +--
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/config.js b/config.js
index 6e25f36255..0cc9cce980 100644
--- a/config.js
+++ b/config.js
@@ -607,6 +607,11 @@ config.WORM_ENABLED = false;
 config.NAMESPACE_MONITOR_ENABLED = true;
 config.NAMESPACE_MONITOR_DELAY = 3 * 60 * 1000;
 
+//////////////////////////////////
+//     NAMESPACE MODE CALC      //
+//////////////////////////////////
+
+config.NS_MAX_ALLOWED_IO_ERRORS = 9;
 
 ////////////////////////////////
 //      BUCKET REPLICATOR     //
diff --git a/src/server/bg_services/namespace_monitor.js b/src/server/bg_services/namespace_monitor.js
index 440f4d33df..a852d7e562 100644
--- a/src/server/bg_services/namespace_monitor.js
+++ b/src/server/bg_services/namespace_monitor.js
@@ -13,6 +13,7 @@ const nb_native = require('../../util/nb_native');
 const config = require('../../../config');
 const P = require('../../util/promise');
 const noobaa_s3_client = require('../../sdk/noobaa_s3_client/noobaa_s3_client');
+const S3Error = require('../../endpoint/s3/s3_errors').S3Error;
 
 class NamespaceMonitor {
 
@@ -205,7 +206,12 @@
         }
     }
 
+    // In test_nsfs_resource we check readdir and stat -
+    // readdir checks read permissions and in the past stat didn't check read permissions.
+    // Nowadays stat also checks read permissions, so readdir is now redundant - we decided to keep it.
     async test_nsfs_resource(nsr) {
+        dbg.log1('test_nsfs_resource: (name, namespace_store, nsfs_config):',
+            nsr.name, nsr.namespace_store, nsr.nsfs_config);
         try {
             const fs_context = {
                 backend: nsr.nsfs_config.fs_backend || '',
@@ -214,12 +220,21 @@
             await nb_native().fs.readdir(fs_context, nsr.nsfs_config.fs_root_path);
             const stat = await nb_native().fs.stat(fs_context, nsr.nsfs_config.fs_root_path);
             //In the event of deleting the nsr.nsfs_config.fs_root_path in the FS side,
-            // The number of link will be 0, then we will throw ENOENT which translate to STORAGE_NOT_EXIST
+            // The number of links will be 0, and then we will throw an error which translates to STORAGE_NOT_EXIST
             if (stat.nlink === 0) {
+                dbg.log1('test_nsfs_resource: the number of links is 0', nsr.nsfs_config);
                 throw Object.assign(new Error('FS root path has no links'), { code: 'ENOENT' });
             }
         } catch (err) {
-            dbg.log1('test_nsfs_resource: got error:', err, nsr.nsfs_config);
+            dbg.error('test_nsfs_resource: got error:', err, nsr.nsfs_config);
+            // we change the code to control the mapping in pool server when calculating the namespace mode
+            if (err.code === 'ENOENT') {
+                // it can happen if (1) stat/readdir threw ENOENT or (2) the number of links is 0
+                throw Object.assign(err, { code: S3Error.NoSuchBucket.code });
+            }
+            if (err.code === `EPERM` || err.code === `EACCES`) {
+                throw Object.assign(err, { code: S3Error.AccessDenied.code });
+            }
             throw err;
         }
     }
diff --git a/src/server/system_services/pool_server.js b/src/server/system_services/pool_server.js
index 5dfa92a5fc..9112b20d7a 100644
--- a/src/server/system_services/pool_server.js
+++ b/src/server/system_services/pool_server.js
@@ -1097,7 +1097,6 @@ function calc_namespace_resource_mode(namespace_resource) {
     const map_err_to_type_count = {
         ContainerNotFound: 'storage_not_exist',
         NoSuchBucket: 'storage_not_exist',
-        ENOENT: 'storage_not_exist',
         AccessDenied: 'auth_failed',
AuthenticationFailed: 'auth_failed',
     };
 
@@ -1114,7 +1113,7 @@
 
     const mode = (errors_count.storage_not_exist && 'STORAGE_NOT_EXIST') ||
         (errors_count.auth_failed && 'AUTH_FAILED') ||
-        (errors_count.io_errors > config.CLOUD_MAX_ALLOWED_IO_TEST_ERRORS && 'IO_ERRORS') ||
+        (errors_count.io_errors > config.NS_MAX_ALLOWED_IO_ERRORS && 'IO_ERRORS') ||
         'OPTIMAL';
 
     return mode;

From 5ba72674e4e4b81518eb3d53a8833ae6a9bf81d4 Mon Sep 17 00:00:00 2001
From: Utkarsh Srivastava
Date: Tue, 2 Apr 2024 20:17:14 +0530
Subject: [PATCH 10/14] add basic support for write thresholds

Signed-off-by: Utkarsh Srivastava

use LRU Cache and cache state per bucket path

Signed-off-by: Utkarsh Srivastava

minimize space cache to be per fsid

Signed-off-by: Utkarsh Srivastava

address PR comments

Signed-off-by: Utkarsh Srivastava

address PR comments

Signed-off-by: Utkarsh Srivastava
(cherry picked from commit 4967d3c260de2afd4d313e1eb36419fbceb3ece4)
---
 config.js               | 28 +++++++++++++++
 src/sdk/namespace_fs.js | 78 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/config.js b/config.js
index 0cc9cce980..a365795b83 100644
--- a/config.js
+++ b/config.js
@@ -761,6 +761,34 @@ config.NSFS_GLACIER_EXPIRY_TZ = 'LOCAL';
 // Format must be HH:MM:SS
 config.NSFS_GLACIER_EXPIRY_TIME_OF_DAY = '00:00:00';
 
+config.NSFS_STATFS_CACHE_SIZE = config.NSFS_DIR_CACHE_MAX_TOTAL_SIZE;
+config.NSFS_STATFS_CACHE_EXPIRY_MS = 1 * 1000;
+
+// NSFS_LOW_FREE_SPACE_CHECK_ENABLED if set to true will use the below mentioned
+// thresholds to determine if the writes should be denied even
+// before we hit ENOSPC from the filesystem.
+config.NSFS_LOW_FREE_SPACE_CHECK_ENABLED = false;
+
+// NSFS_LOW_FREE_SPACE_MB controls how much free space, in
+// MB, NooBaa considers to be too low to perform `PUT` operations
+// safely.
+config.NSFS_LOW_FREE_SPACE_MB = 8 * 1024;
+
+// NSFS_LOW_FREE_SPACE_PERCENT controls how much free space, as a
+// fraction of the total space, NooBaa considers to be too low to perform `PUT`
+// operations safely.
+config.NSFS_LOW_FREE_SPACE_PERCENT = 0.08;
+
+// NSFS_LOW_FREE_SPACE_MB_UNLEASH controls how much free space, in MB,
+// NooBaa considers to be enough to resume `PUT` operations
+// safely.
+config.NSFS_LOW_FREE_SPACE_MB_UNLEASH = 10 * 1024;
+
+// NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH controls how much free space, as a
+// fraction of the total space, NooBaa considers to be enough to resume `PUT`
+// operations safely. 
+config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10; + //////////////////////////// // NSFS NON CONTAINERIZED // //////////////////////////// diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index 13b19d17df..26fce45cf2 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -415,6 +415,28 @@ const versions_dir_cache = new LRUCache({ max_usage: config.NSFS_DIR_CACHE_MAX_TOTAL_SIZE, }); +/** + * @typedef {{ + * statfs: Record + * }} NsfsBucketStatFsCache + * @type {LRUCache} + */ +const nsfs_bucket_statfs_cache = new LRUCache({ + name: 'nsfs-bucket-statfs', + make_key: ({ bucket_path }) => bucket_path, + load: async ({ bucket_path, fs_context }) => { + const statfs = await nb_native().fs.statfs(fs_context, bucket_path); + return { statfs }; + }, + // validate - no need, validation will be as costly as `load`, + // instead let the item expire + expiry_ms: config.NSFS_STATFS_CACHE_EXPIRY_MS, + max_usage: config.NSFS_STATFS_CACHE_SIZE, +}); + + +const nsfs_low_space_fsids = new Set(); + /** * NamespaceFS map objets to files in a filesystem. * @implements {nb.Namespace} @@ -1066,6 +1088,7 @@ class NamespaceFS { async upload_object(params, object_sdk) { const fs_context = this.prepare_fs_context(object_sdk); await this._load_bucket(params, fs_context); + await this._throw_if_low_space(fs_context, params.size); const open_mode = native_fs_utils._is_gpfs(fs_context) ? 'wt' : 'w'; const file_path = this._get_file_path(params); let upload_params; @@ -1476,6 +1499,7 @@ class NamespaceFS { try { const fs_context = this.prepare_fs_context(object_sdk); await this._load_bucket(params, fs_context); + await this._throw_if_low_space(fs_context); params.obj_id = uuidv4(); params.mpu_path = this._mpu_path(params); await native_fs_utils._create_path(params.mpu_path, fs_context); @@ -1522,6 +1546,7 @@ class NamespaceFS { let part_md_file; try { await this._load_multipart(params, fs_context); + await this._throw_if_low_space(fs_context, params.size); const md_upload_path = this._get_part_md_path(params); part_md_file = await native_fs_utils.open_file(fs_context, this.bucket_path, md_upload_path, md_open_mode); @@ -1621,6 +1646,7 @@ class NamespaceFS { let read_file; let target_file; const fs_context = this.prepare_fs_context(object_sdk); + await this._throw_if_low_space(fs_context); const open_mode = 'w*'; try { const md5_enabled = config.NSFS_CALCULATE_MD5 || (this.force_md5_etag || @@ -2990,6 +3016,58 @@ class NamespaceFS { return false; } + /** + * @param {nb.NativeFSContext} fs_context + * @param {number} [size_hint] + * @returns {Promise} + */ + async _throw_if_low_space(fs_context, size_hint = 0) { + if (!config.NSFS_LOW_FREE_SPACE_CHECK_ENABLED) return; + + const MB = 1024 ** 2; + const { statfs } = await nsfs_bucket_statfs_cache.get_with_cache({ bucket_path: this.bucket_path, fs_context }); + const block_size_mb = statfs.bsize / MB; + const free_space_mb = Math.floor(statfs.bfree * block_size_mb) - Math.floor(size_hint / MB); + const total_space_mb = Math.floor(statfs.blocks * block_size_mb); + + const low_space_threshold = this._get_free_space_threshold( + config.NSFS_LOW_FREE_SPACE_MB, + config.NSFS_LOW_FREE_SPACE_PERCENT, + total_space_mb, + ); + const ok_space_threshold = this._get_free_space_threshold( + config.NSFS_LOW_FREE_SPACE_MB_UNLEASH, + config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH, + total_space_mb, + ); + + dbg.log1('_throw_if_low_space:', { free_space_mb, total_space_mb, low_space_threshold, ok_space_threshold }); + + if (nsfs_low_space_fsids.has(statfs.fsid)) { + 
if (free_space_mb < ok_space_threshold) {
+                throw new S3Error(S3Error.SlowDown);
+            } else {
+                nsfs_low_space_fsids.delete(statfs.fsid);
+            }
+        } else if (free_space_mb < low_space_threshold) {
+            nsfs_low_space_fsids.add(statfs.fsid);
+            throw new S3Error(S3Error.SlowDown);
+        }
+    }
+
+    /**
+     * _get_free_space_threshold takes the free space threshold as an absolute
+     * value (MB) and as a fraction of the total space, and returns the higher of the two
+     * @param {number} in_bytes absolute free space threshold in MB
+     * @param {number} in_percentage free space threshold as a fraction of the total space
+     * @param {number} total_space total space in MB
+     * @returns {number}
+     */
+    _get_free_space_threshold(in_bytes, in_percentage, total_space) {
+        const free_from_percentage = in_percentage * total_space;
+        return Math.max(in_bytes, free_from_percentage);
+    }
+
     async append_to_migrate_wal(entry) {
         if (!config.NSFS_GLACIER_LOGS_ENABLED) return;
 

From 69ffacce5da5cb6b1afb150dd4abb6aa9bb03b2e Mon Sep 17 00:00:00 2001
From: Utkarsh Srivastava
Date: Thu, 4 Apr 2024 13:31:08 +0530
Subject: [PATCH 11/14] fix statfs cache size

Signed-off-by: Utkarsh Srivastava
(cherry picked from commit 14d4c653596343a7ae468039ddbd0b56eda3d6fe)
---
 config.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/config.js b/config.js
index a365795b83..62e9c56c60 100644
--- a/config.js
+++ b/config.js
@@ -761,7 +761,7 @@ config.NSFS_GLACIER_EXPIRY_TZ = 'LOCAL';
 // Format must be HH:MM:SS
 config.NSFS_GLACIER_EXPIRY_TIME_OF_DAY = '00:00:00';
 
-config.NSFS_STATFS_CACHE_SIZE = config.NSFS_DIR_CACHE_MAX_TOTAL_SIZE;
+config.NSFS_STATFS_CACHE_SIZE = 10000;
 config.NSFS_STATFS_CACHE_EXPIRY_MS = 1 * 1000;
 
 // NSFS_LOW_FREE_SPACE_CHECK_ENABLED if set to true will use the below mentioned

From 6c2c9512f9941188ed512b09edd3d83909db2609 Mon Sep 17 00:00:00 2001
From: nadav mizrahi
Date: Wed, 3 Apr 2024 12:24:45 +0300
Subject: [PATCH 12/14] change bucket policy validation order in bucketspace-fs

Signed-off-by: nadav mizrahi
(cherry picked from commit 14452977d344c98e3abb50470edb3f7368a9d533)
---
 src/sdk/bucketspace_fs.js | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/sdk/bucketspace_fs.js b/src/sdk/bucketspace_fs.js
index 0460ed026a..ad8e876830 100644
--- a/src/sdk/bucketspace_fs.js
+++ b/src/sdk/bucketspace_fs.js
@@ -568,12 +568,13 @@ class BucketSpaceFS extends BucketSpaceSimpleFS {
             const bucket_config_path = this._get_bucket_config_path(name);
             const { data } = await nb_native().fs.readFile(this.fs_context, bucket_config_path);
             const bucket = JSON.parse(data.toString());
-            await bucket_policy_utils.validate_s3_policy(policy, bucket.name, async principal => this._get_account_by_name(principal));
             bucket.s3_policy = policy;
 
             const bucket_to_validate = _.omitBy(bucket, _.isUndefined);
             dbg.log2("put_bucket_policy: bucket properties before validate_bucket_schema", bucket_to_validate);
             nsfs_schema_utils.validate_bucket_schema(bucket_to_validate);
+            await bucket_policy_utils.validate_s3_policy(bucket.s3_policy, bucket.name, async principal =>
+                this._get_account_by_name(principal));
             const update_bucket = JSON.stringify(bucket);
             await nb_native().fs.writeFile(
                 this.fs_context,

From d6f7c55f5219d4e8c4c7e0411e460554ee207f3b Mon Sep 17 00:00:00 2001
From: naveenpaul1
Date: Wed, 3 Apr 2024 15:37:58 +0530
Subject: [PATCH 13/14] NSFS | Empty 'logs' directory is getting created under 's3-config' directory

Signed-off-by: naveenpaul1
(cherry picked from commit bc7f44368c5bcabb581673118176302b1aad0c5b)
---
 src/util/debug_module.js | 20 --------------------
 1 file changed, 20 deletions(-)
diff --git a/src/util/debug_module.js b/src/util/debug_module.js
index eefb3c1493..919712c3a5 100644
--- a/src/util/debug_module.js
+++ b/src/util/debug_module.js
@@ -35,13 +35,6 @@ if (process.env.NOOBAA_LOG_LEVEL) {
     }
 }
 
-function _should_log_to_file() {
-    if (process.env.container === 'docker') return false;
-    if (process.env.CONTAINER_PLATFORM === 'KUBERNETES') return false;
-    if (global.document) return false;
-    return true;
-}
-
 // override the default inspect options
 if (!util.inspect.defaultOptions) util.inspect.defaultOptions = {};
 util.inspect.defaultOptions.depth = 10;
@@ -205,19 +198,6 @@ class InternalDebugLogger {
         this._log_console_silent = false;
         this._log_file = null;
 
-        if (!_should_log_to_file()) {
-            return;
-        }
-
-        //if logs directory doesn't exist, create it
-        try {
-            fs.mkdirSync('./logs');
-        } catch (e) {
-            if (e.code !== 'EEXIST') {
-                throw e;
-            }
-        }
-
     }
 
     static instance() {

From 7b93ca218298bd3d2033c8b08a1a848a60ce3070 Mon Sep 17 00:00:00 2001
From: shirady <57721533+shirady@users.noreply.github.com>
Date: Sun, 7 Apr 2024 08:51:29 +0300
Subject: [PATCH 14/14] Fix Bug BZ 2272900 | NSFS | Remove issues updates with system store

1. Remove the issues-report updates that go through the system store.
2. Avoid updating the issues report on a 'no such key' error when heading an object.
3. Move the last-monitoring update to memory instead of the system store.

Signed-off-by: shirady <57721533+shirady@users.noreply.github.com>
(cherry picked from commit 88f6b676ed8595143bb3eb15329f7d444c918e8b)
---
 src/api/pool_api.js                         | 20 ++++++++++
 src/sdk/namespace_fs.js                     | 25 +++++++++++-
 src/server/bg_services/namespace_monitor.js | 28 +++++++-------
 src/server/system_services/pool_server.js   | 43 +++++++++++++++------
 4 files changed, 88 insertions(+), 28 deletions(-)

diff --git a/src/api/pool_api.js b/src/api/pool_api.js
index 14652b5b52..c0cbf08f11 100644
--- a/src/api/pool_api.js
+++ b/src/api/pool_api.js
@@ -377,6 +377,26 @@ module.exports = {
             }
         },
 
+        update_last_monitoring: {
+            doc: 'Update last namespace monitoring',
+            method: 'POST',
+            params: {
+                type: 'object',
+                required: ['namespace_resource_id', 'last_monitoring'],
+                properties: {
+                    last_monitoring: {
+                        idate: true,
+                    },
+                    namespace_resource_id: {
+                        objectid: true
+                    },
+                }
+            },
+            auth: {
+                system: 'admin'
+            }
+        },
+
         scale_hosts_pool: {
             doc: 'Change the pool\'s underlaying host count',
             method: 'POST',
diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js
index 26fce45cf2..191040eab2 100644
--- a/src/sdk/namespace_fs.js
+++ b/src/sdk/namespace_fs.js
@@ -537,6 +537,24 @@ class NamespaceFS {
         }
     }
 
+    /**
+     * _should_update_issues_report is intended to avoid updating the namespace issues report in case:
+     * 1. The key doesn't exist and the path is not internal -
+     *    an internal path is created for specific cases, for example for versions.
+     *    Note: it also covers the delete marker case (since it is in a versioned path)
+     * IMPORTANT: This function is correct only for read_object_md! 
+ * @param {object} params + * @param {string} file_path + * @param {object} err + */ + _should_update_issues_report(params, file_path, err) { + const { key } = params; + const md_file_path = this._get_file_md_path({ key }); + const non_internal_path = file_path === md_file_path; + const no_such_key_condition = err.code === `ENOENT` && non_internal_path; + return !no_such_key_condition; + } + is_readonly_namespace() { return this.access_mode === 'READ_ONLY'; } @@ -872,8 +890,9 @@ class NamespaceFS { async read_object_md(params, object_sdk) { const fs_context = this.prepare_fs_context(object_sdk); + let file_path; try { - const file_path = await this._find_version_path(fs_context, params, true); + file_path = await this._find_version_path(fs_context, params, true); await this._check_path_in_bucket_boundaries(fs_context, file_path); await this._load_bucket(params, fs_context); let stat = await nb_native().fs.stat(fs_context, file_path); @@ -894,7 +913,9 @@ class NamespaceFS { this._throw_if_delete_marker(stat); return this._get_object_info(params.bucket, params.key, stat, params.version_id || 'null', isDir); } catch (err) { - this.run_update_issues_report(object_sdk, err); + if (this._should_update_issues_report(params, file_path, err)) { + this.run_update_issues_report(object_sdk, err); + } throw this._translate_object_error_codes(err); } } diff --git a/src/server/bg_services/namespace_monitor.js b/src/server/bg_services/namespace_monitor.js index a852d7e562..6c1ef81105 100644 --- a/src/server/bg_services/namespace_monitor.js +++ b/src/server/bg_services/namespace_monitor.js @@ -73,18 +73,18 @@ class NamespaceMonitor { } else { dbg.error('namespace_monitor: invalid endpoint type', endpoint_type); } - await this.update_last_monitoring(nsr._id, nsr.name, endpoint_type); + this.update_last_monitoring(nsr._id, nsr.name, endpoint_type); } catch (err) { - await this.run_update_issues_report(err, nsr); + this.run_update_issues_report(err, nsr); dbg.log1(`test_namespace_resource_validity: namespace resource ${nsr.name} has error as expected`); } }); dbg.log1(`test_namespace_resource_validity finished successfully..`); } - async run_update_issues_report(err, nsr) { + run_update_issues_report(err, nsr) { if (!err.code) return; - await this.client.pool.update_issues_report({ + this.client.pool.update_issues_report({ namespace_resource_id: nsr._id, error_code: String(err.code), time: Date.now(), @@ -98,17 +98,17 @@ class NamespaceMonitor { }); } - async update_last_monitoring(nsr_id, nsr_name, endpoint_type) { + update_last_monitoring(nsr_id, nsr_name, endpoint_type) { dbg.log0(`update_last_monitoring: monitoring namespace ${nsr_name} type ${endpoint_type}, ${nsr_id} finished successfully..`); - await system_store.make_changes({ - update: { - namespace_resources: [{ - _id: nsr_id, - $set: { - last_monitoring: Date.now() - } - }] - } + this.client.pool.update_last_monitoring({ + namespace_resource_id: nsr_id, + last_monitoring: Date.now(), + }, { + auth_token: auth_server.make_auth_token({ + system_id: system_store.data.systems[0]._id, + account_id: system_store.data.systems[0].owner._id, + role: 'admin' + }) }); } diff --git a/src/server/system_services/pool_server.js b/src/server/system_services/pool_server.js index 9112b20d7a..67b63b8082 100644 --- a/src/server/system_services/pool_server.js +++ b/src/server/system_services/pool_server.js @@ -48,6 +48,11 @@ const POOL_HOSTS_INFO_DEFAULTS = Object.freeze({ by_service: {}, }); + +// key: namespace_resource_id, value: { last_monitoring: date, issues: 
array of issues } +// (see namespace_resource_schema) +const map_issues_and_monitoring_report = new Map(); + const NO_CAPAITY_LIMIT = 1024 ** 2; // 1MB const LOW_CAPACITY_HARD_LIMIT = 30 * (1024 ** 3); // 30GB @@ -1101,7 +1106,12 @@ function calc_namespace_resource_mode(namespace_resource) { AuthenticationFailed: 'auth_failed', }; - const errors_count = _.reduce(namespace_resource.issues_report, (acc, issue) => { + const namespace_resource_id = namespace_resource._id.toString(); + if (!map_issues_and_monitoring_report.has(namespace_resource_id)) { + map_issues_and_monitoring_report.set(namespace_resource_id, { last_monitoring: undefined, issues: [] }); + } + const issues_report = map_issues_and_monitoring_report.get(namespace_resource_id).issues; + const errors_count = _.reduce(issues_report, (acc, issue) => { // skip if error timestamp is before of the latest monitoring if (issue.time < namespace_resource.last_monitoring) { return acc; @@ -1406,24 +1416,32 @@ function update_issues_report(req) { dbg.log0('update_issues_report: can not find namespace_resource, ignoring update of issues report'); return; } - const cur_issues_report = ns_resource.issues_report || []; + + if (!map_issues_and_monitoring_report.has(namespace_resource_id)) { + map_issues_and_monitoring_report.set(namespace_resource_id, { last_monitoring: undefined, issues: [] }); + } + const cur_issues_report = map_issues_and_monitoring_report.get(namespace_resource_id).issues; // save the last 10 errors if (cur_issues_report.length === 10) { cur_issues_report.shift(); } cur_issues_report.push({ error_code, time }); - const updates = { issues_report: cur_issues_report }; - if (monitoring) updates.last_monitoring = time; + if (monitoring) { + map_issues_and_monitoring_report.get(namespace_resource_id).last_monitoring = time; + } + dbg.log3('update_issues_report:', namespace_resource_id, cur_issues_report); +} - return system_store.make_changes({ - update: { - namespace_resources: [{ - _id: ns_resource._id, - $set: updates - }] - } - }); +function update_last_monitoring(req) { + const { namespace_resource_id, last_monitoring } = req.rpc_params; + + if (!map_issues_and_monitoring_report.has(namespace_resource_id)) { + map_issues_and_monitoring_report.set(namespace_resource_id, { last_monitoring: undefined, issues: [] }); + } + + map_issues_and_monitoring_report.get(namespace_resource_id).last_monitoring = last_monitoring; + dbg.log3('update_last_monitoring:', namespace_resource_id, last_monitoring); } // EXPORTS @@ -1454,4 +1472,5 @@ exports.update_cloud_pool = update_cloud_pool; exports.get_optimal_non_mongo_pool_id = get_optimal_non_mongo_pool_id; exports.get_hosts_pool_agent_config = get_hosts_pool_agent_config; exports.update_issues_report = update_issues_report; +exports.update_last_monitoring = update_last_monitoring; exports.calc_namespace_resource_mode = calc_namespace_resource_mode;
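
For illustration only (this is not part of any patch above): the following standalone JavaScript sketch shows how the low free-space thresholds introduced in PATCH 10/14 resolve in practice. The helper mirrors _get_free_space_threshold from src/sdk/namespace_fs.js, while the 1 TiB filesystem size and the resulting numbers are hypothetical and not taken from any deployment.

'use strict';

// Pick the higher of the absolute threshold (MB) and the fraction-of-total threshold,
// mirroring _get_free_space_threshold above.
function get_free_space_threshold(threshold_mb, threshold_fraction, total_space_mb) {
    return Math.max(threshold_mb, threshold_fraction * total_space_mb);
}

const total_space_mb = 1024 * 1024; // hypothetical 1 TiB filesystem, expressed in MB

// Defaults from config.js: NSFS_LOW_FREE_SPACE_MB = 8 * 1024, NSFS_LOW_FREE_SPACE_PERCENT = 0.08
const low_space_threshold = get_free_space_threshold(8 * 1024, 0.08, total_space_mb); // 83886.08 MB

// "Unleash" defaults: NSFS_LOW_FREE_SPACE_MB_UNLEASH = 10 * 1024, NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10
const ok_space_threshold = get_free_space_threshold(10 * 1024, 0.10, total_space_mb); // 104857.6 MB

// _throw_if_low_space starts rejecting PUTs with SlowDown once free space drops below
// low_space_threshold, and allows them again only after free space climbs above ok_space_threshold.
console.log({ low_space_threshold, ok_space_threshold });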