From 4a4cac81dfd05e394f759d14f61bd8003f5a34d3 Mon Sep 17 00:00:00 2001
From: jackyalbo
Date: Wed, 17 Jul 2019 19:31:38 +0300
Subject: [PATCH] SystemStore to fetch data in deltas

---
 src/server/bg_services/md_aggregator.js       |  81 ++++++-
 src/server/system_services/bucket_server.js   |  17 +-
 .../system_services/schemas/system_schema.js  |   8 +-
 src/server/system_services/system_store.js    |  86 +++++--
 src/test/unit_tests/coretest.js               |   6 +
 .../unit_tests/test_md_aggregator_unit.js     | 225 ++++++++++--------
 src/test/unit_tests/test_system_store.js      |  93 +++++++-
 src/tools/mongodb_bucket_blow.js              |   8 +-
 8 files changed, 386 insertions(+), 138 deletions(-)

diff --git a/src/server/bg_services/md_aggregator.js b/src/server/bg_services/md_aggregator.js
index 9398ce8cb1..1167643db5 100644
--- a/src/server/bg_services/md_aggregator.js
+++ b/src/server/bg_services/md_aggregator.js
@@ -32,18 +32,49 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
     const system = system_store.data.systems[0];
     if (!system || system_utils.system_in_maintenance(system._id)) return;
 
+    const global_last_update = system.global_last_update;
+
     let has_more = true;
     let update_range = true;
     let range = {};
 
+    const md_local_store = {
+        data: {
+            buckets: _.clone(system_store.data.buckets),
+            pools: _.clone(system_store.data.pools),
+        }
+    };
+
     while (has_more) {
-        if (update_range) range = await find_next_range({ target_now, system_store });
-        const changes = range && await range_md_aggregator({ md_store, system_store, range });
+        if (update_range) {
+            range = await find_next_range({
+                target_now,
+                system_store: md_local_store,
+                global_last_update,
+                original_system_store: system_store,
+            });
+        }
+        const changes = range && await range_md_aggregator({
+            md_store,
+            system_store: md_local_store,
+            range,
+            global_last_update
+        });
         if (changes) {
             const update = _.omit(changes, 'more_updates');
             await system_store.make_changes({ update });
-            await P.delay(delay);
             update_range = !changes.more_updates;
+            if (update_range) {
+                await system_store.make_changes({
+                    update: {
+                        systems: [{
+                            _id: system._id,
+                            global_last_update: range.till_time,
+                        }]
+                    }
+                });
+            }
+            await P.delay(delay);
         } else {
             has_more = false;
         }
     }
@@ -56,13 +87,15 @@
 function find_minimal_range({
     target_now,
     system_store,
+    global_last_update
 }) {
     let from_time = target_now;
     let till_time = target_now;
     let should_reset_all = false;
 
     _.forEach(system_store.data.buckets, bucket => {
-        const last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+        const bucket_last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+        const last_update = global_last_update > bucket_last_update ? global_last_update : bucket_last_update;
         if (last_update > target_now) {
             dbg.error('find_next_range: time skew detected for bucket', bucket.name,
                 'last_update', last_update,
@@ -78,7 +111,8 @@ function find_minimal_range({
         }
     });
     _.forEach(system_store.data.pools, pool => {
-        const last_update = _.get(pool, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+        const pool_last_update = _.get(pool, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+        const last_update = global_last_update > pool_last_update ? global_last_update : pool_last_update;
         if (last_update > target_now) {
             dbg.error('find_next_range: time skew detected for pool', pool.name,
                 'last_update', last_update,
@@ -100,25 +134,32 @@
 function find_next_range({
     target_now,
     system_store,
+    global_last_update,
+    original_system_store,
 }) {
     let { from_time, till_time, should_reset_all } = find_minimal_range({
         target_now,
         system_store,
+        global_last_update
     });
+    // printing the range and the buckets/pools relative info
     dbg.log0('find_next_range:',
         'from_time', from_time,
         'till_time*', till_time - from_time,
-        'target_now*', target_now - from_time
+        'target_now*', target_now - from_time,
+        'global_last_update', global_last_update,
     );
 
     _.forEach(system_store.data.buckets, bucket => {
-        const last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+        const bucket_last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+        const last_update = bucket_last_update > global_last_update ? bucket_last_update : global_last_update;
         dbg.log1('find_next_range: bucket', bucket.name,
            'last_update*', last_update - from_time
        );
    });
    _.forEach(system_store.data.pools, pool => {
-        const last_update = _.get(pool, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+        const pool_last_update = _.get(pool, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+        const last_update = pool_last_update > global_last_update ? pool_last_update : global_last_update;
        dbg.log1('find_next_range: pool', pool.name,
            'last_update*', last_update - from_time
        );
@@ -132,8 +173,12 @@ function find_next_range({
        );
        // Assigning NOOBAA_EPOCH so we will gather all data again till the new time
        // This means that we will be eventually consistent
-        return system_store.make_changes({
+        return original_system_store.make_changes({
            update: {
+                systems: [{
+                    _id: original_system_store.data.systems[0]._id,
+                    global_last_update: config.NOOBAA_EPOCH,
+                }],
                buckets: _.map(system_store.data.buckets, bucket => ({
                    _id: bucket._id,
                    storage_stats: {
@@ -193,8 +238,8 @@ function range_md_aggregator({
    const till_time = range.till_time;
    let more_updates = false;

-    const filtered_buckets = _.filter(system_store.data.buckets, bucket => bucket.storage_stats.last_update === from_time);
-    const filtered_pools = _.filter(system_store.data.pools, pool => pool.storage_stats.last_update === from_time);
+    const filtered_buckets = _.filter(system_store.data.buckets, bucket => bucket.storage_stats.last_update <= from_time);
+    const filtered_pools = _.filter(system_store.data.pools, pool => pool.storage_stats.last_update <= from_time);
    if (filtered_buckets.length > config.MD_AGGREGATOR_BATCH || filtered_pools.length > config.MD_AGGREGATOR_BATCH) {
        more_updates = true;
    }
@@ -228,6 +273,7 @@ function range_md_aggregator({
            );

            const buckets_updates = _.map(buckets, bucket => {
+                let dont_change_last_update = false;
                const new_storage_stats = calculate_new_bucket({
                    bucket,
                    existing_chunks_aggregate,
@@ -237,30 +283,41 @@ function range_md_aggregator({
                    existing_blocks_aggregate,
                    deleted_blocks_aggregate
                });
+                if (_.isEqual(_.omit(bucket.storage_stats, 'last_update'), new_storage_stats)) {
+                    dont_change_last_update = true;
+                }
                new_storage_stats.last_update = till_time;
+                bucket.storage_stats = new_storage_stats;
                return {
                    _id: bucket._id,
                    storage_stats: new_storage_stats,
+                    dont_change_last_update
                };
            });

            const pools_updates = _.map(pools, pool => {
+                let dont_change_last_update = false;
                const new_storage_stats = calculate_new_pool({
                    pool,
                    existing_blocks_aggregate,
                    deleted_blocks_aggregate
                });
+                if (_.isEqual(_.omit(pool.storage_stats, 'last_update'), new_storage_stats)) {
+                    dont_change_last_update = true;
+                }
                new_storage_stats.last_update = till_time;
+                pool.storage_stats = new_storage_stats;
                return {
                    _id: pool._id,
                    storage_stats: new_storage_stats,
+                    dont_change_last_update,
                };
            });

            return {
                buckets: buckets_updates,
                pools: pools_updates,
-                more_updates
+                more_updates,
            };
        });
}
diff --git a/src/server/system_services/bucket_server.js b/src/server/system_services/bucket_server.js
index 9ceac18a57..1df9727e13 100644
--- a/src/server/system_services/bucket_server.js
+++ b/src/server/system_services/bucket_server.js
@@ -57,7 +57,9 @@ function new_bucket_defaults(name, system_id, tiering_policy_id, tag) {
            objects_size: 0,
            objects_count: 0,
            objects_hist: [],
-            last_update: now - (2 * config.MD_GRACE_IN_MILLISECONDS)
+            // new buckets creation date will be rounded down to config.MD_AGGREGATOR_INTERVAL (30 seconds)
+            last_update: (Math.floor(now / config.MD_AGGREGATOR_INTERVAL) * config.MD_AGGREGATOR_INTERVAL) -
+                (2 * config.MD_GRACE_IN_MILLISECONDS),
        },
        lambda_triggers: [],
        versioning: 'DISABLED'
@@ -1102,9 +1104,12 @@ function get_bucket_info({
    };
    const metrics = _calc_metrics({ bucket, nodes_aggregate_pool, hosts_aggregate_pool, tiering_pools_status, info });
+    const bucket_last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+    const system_last_update = _.get(bucket, 'system.global_last_update') || config.NOOBAA_EPOCH;
+    const last_update = Math.max(system_last_update, bucket_last_update);
    info.usage_by_pool = {
        pools: {},
-        last_update: _.get(bucket, 'storage_stats.last_update')
+        last_update,
    };

    info.usage_by_pool.pools = [];
@@ -1255,6 +1260,10 @@ function _calc_metrics({
    const bucket_total = bucket_free.plus(bucket_used)
        .plus(bucket_used_other);

+    const bucket_last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
+    const system_last_update = _.get(bucket, 'system.global_last_update') || config.NOOBAA_EPOCH;
+    const last_update = Math.max(system_last_update, bucket_last_update);
+
    info.storage = {
        values: size_utils.to_bigint_storage({
            used: bucket_used,
@@ -1262,7 +1271,7 @@ function _calc_metrics({
            total: bucket_total,
            free: bucket_free,
        }),
-        last_update: _.get(bucket, 'storage_stats.last_update')
+        last_update
    };

    const actual_free = size_utils.json_to_bigint(_.get(info, 'tiering.data.free') || 0);
@@ -1291,7 +1300,7 @@ function _calc_metrics({
        size_reduced: bucket_chunks_capacity,
        free: actual_free,
        available_for_upload,
-        last_update: _.get(bucket, 'storage_stats.last_update')
+        last_update,
    });

    return {
diff --git a/src/server/system_services/schemas/system_schema.js b/src/server/system_services/schemas/system_schema.js
index fc3d883381..85c9929741 100644
--- a/src/server/system_services/schemas/system_schema.js
+++ b/src/server/system_services/schemas/system_schema.js
@@ -212,7 +212,11 @@ module.exports = {
                    }
                }
            }
-        },
-    }
+        }
+    },
+
+    global_last_update: {
+        idate: true
+    },
    }
};
diff --git a/src/server/system_services/system_store.js b/src/server/system_services/system_store.js
index 6c84c43e1f..5f75d5200b 100644
--- a/src/server/system_services/system_store.js
+++ b/src/server/system_services/system_store.js
@@ -27,6 +27,7 @@
const size_utils = require('../../util/size_utils');
const os_utils = require('../../util/os_utils');
const mongo_utils = require('../../util/mongo_utils');
const mongo_client = require('../../util/mongo_client');
+const config = require('../../../config');
const { RpcError } = require('../../rpc');

const COLLECTIONS = [{
@@ -410,6 +411,7 @@ class SystemStore extends EventEmitter {
        super();
        // // TODO: This is currently used as a cache, maybe will be moved in the future
        // this.valid_for_alloc_by_tier = {};
+        this.last_update_time = config.NOOBAA_EPOCH;
        this.is_standalone = options.standalone;
        this.is_cluster_master = false;
        this.is_finished_initial_load = false;
@@ -438,6 +440,11 @@ class SystemStore extends EventEmitter {
            .catch(_.noop);
    }

+    clean_system_store() {
+        this.old_db_data = undefined;
+        this.last_update_time = config.NOOBAA_EPOCH;
+    }
+
    async refresh() {
        let load_time = 0;
        if (this.data) {
@@ -463,7 +470,7 @@ class SystemStore extends EventEmitter {
                let new_data = new SystemStoreData();
                let millistamp = time_utils.millistamp();
                await this._register_for_changes();
-                await this._read_data_from_db(new_data);
+                await this._read_new_data_from_db(new_data);
                const secret = await os_utils.read_server_secret();
                this._server_secret = secret;
                dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
@@ -471,10 +478,11 @@ class SystemStore extends EventEmitter {
                dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
                    depth: 4
                }));
+                this.old_db_data = this.old_db_data ? this._update_data_from_new(this.old_db_data, new_data) : new_data;
+                this.data = _.cloneDeep(this.old_db_data);
                millistamp = time_utils.millistamp();
-                new_data.rebuild();
+                this.data.rebuild();
                dbg.log1('SystemStore: rebuild took', time_utils.millitook(millistamp));
-                this.data = new_data;
                this.emit('load');
                this.is_finished_initial_load = true;
                return this.data;
@@ -485,6 +493,15 @@ class SystemStore extends EventEmitter {
        });
    }

+    _update_data_from_new(data, new_data) {
+        COLLECTIONS.forEach(col => {
+            const old_data = data[col.name];
+            const res = _.unionBy(new_data[col.name], old_data, doc => doc._id.toString());
+            new_data[col.name] = res.filter(doc => !doc.deleted);
+        });
+        return new_data;
+    }
+
    async _register_for_changes() {

        if (this.is_standalone) {
@@ -523,6 +540,32 @@ class SystemStore extends EventEmitter {
        ));
    }

+    _read_new_data_from_db(target) {
+        const now = Date.now();
+        let newly_updated_query = {
+            last_update: {
+                $gte: this.last_update_time,
+            }
+        };
+        return mongo_client.instance().connect()
+            .then(() => P.map(COLLECTIONS,
+                col => mongo_client.instance().collection(col.name)
+                .find(newly_updated_query, {
+                    projection: { last_update: 0 }
+                })
+                .toArray()
+                .then(res => {
+                    for (const item of res) {
+                        this._check_schema(col, item, 'warn');
+                    }
+                    target[col.name] = res;
+                })
+            ))
+            .then(() => {
+                this.last_update_time = now;
+            });
+    }
+
    _check_schema(col, item, warn) {
        return mongo_client.instance().validate(col.name, item, warn);
    }
@@ -610,7 +653,8 @@ class SystemStore extends EventEmitter {
     */
    async make_changes(changes) {
        const bulk_per_collection = {};
-        const now = new Date();
+        const now = Date.now();
+        let any_news = false;
        dbg.log0('SystemStore.make_changes:', util.inspect(changes, {
            depth: 5
        }));
@@ -636,6 +680,8 @@ class SystemStore extends EventEmitter {
            _.each(list, item => {
                this._check_schema(col, item);
                data.check_indexes(col, item);
+                item.last_update = now;
+                any_news = true;
                get_bulk(name).insert(item);
            });
        });
@@ -643,7 +689,8 @@ class SystemStore extends EventEmitter {
            const col = get_collection(name);
            _.each(list, item => {
                data.check_indexes(col, item);
-                let updates = _.omit(item, '_id', '$find');
+                let dont_change_last_update = Boolean(item.dont_change_last_update);
+                let updates = _.omit(item, '_id', '$find', 'dont_change_last_update');
                let finds = item.$find || _.pick(item, '_id');
                if (_.isEmpty(updates)) return;
                let keys = _.keys(updates);
@@ -671,6 +718,11 @@ class SystemStore extends EventEmitter {
                // if (updates.$set) {
                //     this._check_schema(col, updates.$set, 'warn');
                // }
+                if (!dont_change_last_update) {
+                    if (!updates.$set) updates.$set = {};
+                    updates.$set.last_update = now;
+                    any_news = true;
+                }
                get_bulk(name)
                    .find(finds)
                    .updateOne(updates);
@@ -679,13 +731,15 @@ class SystemStore extends EventEmitter {
        _.each(changes.remove, (list, name) => {
            get_collection(name);
            _.each(list, id => {
+                any_news = true;
                get_bulk(name)
                    .find({
                        _id: id
                    })
                    .updateOne({
                        $set: {
-                            deleted: now
+                            deleted: now,
+                            last_update: now,
                        }
                    });
            });
@@ -707,15 +761,17 @@ class SystemStore extends EventEmitter {
            bulk => bulk.length && bulk.execute({ j: true })
        ));

-        if (this.is_standalone) {
-            await this.load();
-        } else {
-            // notify all the cluster (including myself) to reload
-            await server_rpc.client.redirector.publish_to_cluster({
-                method_api: 'server_inter_process_api',
-                method_name: 'load_system_store',
-                target: ''
-            });
+        if (any_news) {
+            if (this.is_standalone) {
+                await this.load();
+            } else {
+                // notify all the cluster (including myself) to reload
+                await server_rpc.client.redirector.publish_to_cluster({
+                    method_api: 'server_inter_process_api',
+                    method_name: 'load_system_store',
+                    target: ''
+                });
+            }
        }
    }
diff --git a/src/test/unit_tests/coretest.js b/src/test/unit_tests/coretest.js
index 8496385990..264e5e7fd3 100644
--- a/src/test/unit_tests/coretest.js
+++ b/src/test/unit_tests/coretest.js
@@ -118,6 +118,12 @@ function setup({ incomplete_rpc_coverage } = {}) {
        await mongo_client.instance().db().dropDatabase();
        await announce('mongo_client reconnect()');
        await mongo_client.instance().reconnect();
+        system_store.clean_system_store();
+        await server_rpc.client.redirector.publish_to_cluster({
+            method_api: 'server_inter_process_api',
+            method_name: 'load_system_store',
+            target: ''
+        });
        await announce('ensure_support_account()');
        await account_server.ensure_support_account();
diff --git a/src/test/unit_tests/test_md_aggregator_unit.js b/src/test/unit_tests/test_md_aggregator_unit.js
index fd5a5715fa..0508e590da 100644
--- a/src/test/unit_tests/test_md_aggregator_unit.js
+++ b/src/test/unit_tests/test_md_aggregator_unit.js
@@ -17,10 +17,97 @@ const config = require('../../../config.js');
const MDStore = require('../../server/object_services/md_store').MDStore;
const md_aggregator = require('../../server/bg_services/md_aggregator.js');

+function make_test_system_store(last_update, md_store) {
+
+    const systems = _.times(1, i => ({
+        _id: md_store.make_md_id(),
+        name: `system${i}`,
+        owner: md_store.make_md_id(),
+    }));
+
+    const buckets = _.times(10, i => ({
+        _id: md_store.make_md_id(),
+        name: `bucket${i}`,
+        storage_stats: {
+            last_update,
+            chunks_capacity: 0,
+            blocks_size: 0,
+            pools: {},
+            objects_size: 0,
+            objects_count: 0,
+            objects_hist: [],
+        },
+    }));
+
+    const pools = _.times(10, i => ({
+        _id: md_store.make_md_id(),
+        name: `pool${i}`,
+        storage_stats: {
+            last_update,
+            blocks_size: 0,
+        }
+    }));
+
+    const system_store = {
+        is_finished_initial_load: true,
+        data: {
+            buckets,
+            pools,
+            systems,
+        },
+        changes_list: [],
+        debug: true,
+        find_system_by_id(id) {
+            return _.find(this.data.systems, system => String(system._id) === String(id));
+        },
+        find_bucket_by_id(id) {
+            return _.find(this.data.buckets, bucket => String(bucket._id) === String(id));
+        },
+        find_pool_by_id(id) {
+            return _.find(this.data.pools, pool => String(pool._id) === String(id));
+        },
+        make_changes(changes) {
+            this.changes_list.push(changes);
+            if (this.debug) {
+                coretest.log('system store changes #',
+                    this.changes_list.length,
+                    util.inspect(changes, true, null, true)
+                );
+            }
+            if (changes.update.systems) {
+                changes.update.systems.forEach(updates => {
+                    const system = this.find_system_by_id(updates._id);
+                    _.forEach(updates, (val, key) => {
+                        if (key !== '_id') _.set(system, key, val);
+                    });
+                });
+            }
+            if (changes.update.buckets) {
+                changes.update.buckets.forEach(updates => {
+                    const bucket = this.find_bucket_by_id(updates._id);
+                    _.forEach(updates, (val, key) => {
+                        if (key !== '_id') _.set(bucket, key, val);
+                    });
+                });
+            }
+            if (changes.update.pools) {
+                changes.update.pools.forEach(updates => {
+                    const pool = this.find_pool_by_id(updates._id);
+                    _.forEach(updates, (val, key) => {
+                        if (key !== '_id') _.set(pool, key, val);
+                    });
+                });
+            }
+            return P.resolve();
+        },
+    };
+
+    return system_store;
+}
+
mocha.describe('md_aggregator', function() {

    const md_store = new MDStore(`_test_md_store_${Date.now().toString(36)}`);
-    const system_id = md_store.make_md_id();

    mocha.describe('calculations', function() {
@@ -214,9 +301,10 @@ mocha.describe('md_aggregator', function() {
            self.timeout(30000);
            const last_update = Date.now();
            const target_now = last_update + CYCLE;
-            const system_store = make_test_system_store(last_update);
+            const system_store = make_test_system_store(last_update, md_store);
            const block_id1 = md_store.make_md_id_from_time(last_update + sub_cycle());
            coretest.log('block 1 addtion date', block_id1.getTimestamp().getTime());
+            const system_id = system_store.data.systems[0]._id;

            return P.resolve()
                .then(() => md_store.insert_blocks([{
@@ -243,7 +331,7 @@ mocha.describe('md_aggregator', function() {
                }))
                .then(() => md_aggregator.run_md_aggregator(md_store, system_store, target_now, 0))
                .then(() => {
-                    assert.strictEqual(system_store.changes_list.length, 1);
+                    assert.strictEqual(system_store.changes_list.length, 2);
                    const changes = system_store.changes_list[0];
                    assert.strictEqual(changes.update.buckets.length, system_store.data.buckets.length);
                    assert.strictEqual(changes.update.pools.length, system_store.data.pools.length);
@@ -263,18 +351,19 @@ mocha.describe('md_aggregator', function() {
            self.timeout(30000);
            const last_update = Date.now();
            const target_now = last_update + (2 * CYCLE);
-            const system_store = make_test_system_store(last_update);
+            const system_store = make_test_system_store(last_update, md_store);
            const block_id1 = md_store.make_md_id_from_time(last_update + sub_cycle());
            const block_id2 = md_store.make_md_id_from_time(last_update + sub_cycle());
            const bucket = system_store.data.buckets[0];
            const pool = system_store.data.pools[0];
            coretest.log('block 1 addtion date', block_id1.getTimestamp().getTime());
            coretest.log('block 2 addtion date', block_id2.getTimestamp().getTime());
+            const system_id = system_store.data.systems[0]._id;

            return P.resolve()
                .then(() => md_store.insert_blocks([
-                    make_block(block_id1, 120, bucket, pool),
-                    make_block(block_id2, 350, bucket, pool),
+                    make_block(block_id1, 120, bucket, pool, system_id),
+                    make_block(block_id2, 350, bucket, pool, system_id),
                ]))
                .then(() => md_store.insert_chunks([{
                    _id: md_store.make_md_id_from_time(last_update + sub_cycle()),
@@ -294,7 +383,7 @@ mocha.describe('md_aggregator', function() {
                }))
                .then(() => md_aggregator.run_md_aggregator(md_store, system_store, target_now, 0))
                .then(() => {
-                    assert.strictEqual(system_store.changes_list.length, 2);
+                    assert.strictEqual(system_store.changes_list.length, 4);
                    const changes0 = system_store.changes_list[0];
                    assert.strictEqual(changes0.update.buckets.length, system_store.data.buckets.length);
                    assert.strictEqual(changes0.update.pools.length, system_store.data.pools.length);
@@ -306,7 +395,7 @@ mocha.describe('md_aggregator', function() {
                    changes0.update.pools.forEach(item => {
                        assert.strictEqual(item.storage_stats.last_update, last_update + CYCLE);
                    });
-                    const changes1 = system_store.changes_list[1];
+                    const changes1 = system_store.changes_list[2];
                    assert.strictEqual(changes1.update.buckets[0].storage_stats.blocks_size, 120);
                    assert.strictEqual(changes1.update.pools[0].storage_stats.blocks_size, 120);
                    changes1.update.buckets.forEach(item => {
@@ -323,17 +412,18 @@ mocha.describe('md_aggregator', function() {
            self.timeout(30000);
            const last_update = Date.now();
            const target_now = last_update + (2 * CYCLE);
-            const system_store = make_test_system_store(last_update);
+            const system_store = make_test_system_store(last_update, md_store);
            const bucket = system_store.data.buckets[0];
            const pool = system_store.data.pools[0];
            const blocks_to_delete = [];
+            const system_id = system_store.data.systems[0]._id;

            return P.resolve()
                .then(() => {
                    const blocks = [];
                    for (let i = 0; i < 1024; ++i) { // 1 PB
                        const block_id = md_store.make_md_id_from_time(last_update + sub_cycle());
-                        blocks.push(make_block(block_id, 1024 * 1024 * 1024 * 1024, bucket, pool));
+                        blocks.push(make_block(block_id, 1024 * 1024 * 1024 * 1024, bucket, pool, system_id));
                        if (i % 2) blocks_to_delete.push(block_id);
                    }
                    return md_store.insert_blocks(blocks);
@@ -356,7 +446,7 @@ mocha.describe('md_aggregator', function() {
                }))
                .then(() => md_aggregator.run_md_aggregator(md_store, system_store, target_now, 0))
                .then(() => {
-                    assert.strictEqual(system_store.changes_list.length, 2);
+                    assert.strictEqual(system_store.changes_list.length, 4);
                    const changes0 = system_store.changes_list[0];
                    assert.strictEqual(changes0.update.buckets.length, system_store.data.buckets.length);
                    assert.strictEqual(changes0.update.pools.length, system_store.data.pools.length);
@@ -368,7 +458,7 @@ mocha.describe('md_aggregator', function() {
                    changes0.update.pools.forEach(item => {
                        assert.strictEqual(item.storage_stats.last_update, last_update + CYCLE);
                    });
-                    const changes1 = system_store.changes_list[1];
+                    const changes1 = system_store.changes_list[2];
                    assert.deepEqual(changes1.update.buckets[0].storage_stats.blocks_size, Math.pow(2, 49));
                    assert.deepEqual(changes1.update.pools[0].storage_stats.blocks_size, Math.pow(2, 49));
                    changes1.update.buckets.forEach(item => {
@@ -385,10 +475,11 @@ mocha.describe('md_aggregator', function() {
            const self = this; // eslint-disable-line no-invalid-this
            self.timeout(30000);
            const last_update = Date.now();
-            const system_store = make_test_system_store(last_update);
+            const system_store = make_test_system_store(last_update, md_store);
            const num_ranges = system_store.data.buckets.length;
            const range = CYCLE / 2;
            const target_now = last_update + (num_ranges * range);
+            const system_id = system_store.data.systems[0]._id;

            return P.resolve()
                .then(() => md_store.insert_blocks(_.times(num_ranges, i => {
@@ -398,7 +489,7 @@ mocha.describe('md_aggregator', function() {
                    bucket.storage_stats.last_update = current_cycle;
                    pool.storage_stats.last_update = current_cycle;
                    const block_id = md_store.make_md_id_from_time(current_cycle + (sub_cycle() / 2));
-                    return make_block(block_id, 666, bucket, pool);
+                    return make_block(block_id, 666, bucket, pool, system_id);
                })))
                .then(() => md_store.insert_chunks([{
                    _id: md_store.make_md_id_from_time(last_update + sub_cycle()),
@@ -415,18 +506,20 @@ mocha.describe('md_aggregator', function() {
                }))
                .then(() => md_aggregator.run_md_aggregator(md_store, system_store, target_now, 0))
                .then(() => {
-                    assert.strictEqual(system_store.changes_list.length, num_ranges);
+                    assert.strictEqual(system_store.changes_list.length, num_ranges * 2);
                    system_store.changes_list.forEach((changes, i) => {
-                        assert.strictEqual(changes.update.buckets.length, i + 1);
-                        assert.strictEqual(changes.update.pools.length, i + 1);
-                        assert.strictEqual(changes.update.buckets[0].storage_stats.blocks_size, 666);
-                        assert.strictEqual(changes.update.pools[0].storage_stats.blocks_size, 666);
-                        changes.update.buckets.forEach(item => {
-                            assert.strictEqual(item.storage_stats.last_update, last_update + ((i + 1) * range));
-                        });
-                        changes.update.pools.forEach(item => {
-                            assert.strictEqual(item.storage_stats.last_update, last_update + ((i + 1) * range));
-                        });
+                        if (!changes.update.systems) {
+                            assert.strictEqual(changes.update.buckets.length, (i / 2) + 1);
+                            assert.strictEqual(changes.update.pools.length, (i / 2) + 1);
+                            assert.strictEqual(changes.update.buckets[0].storage_stats.blocks_size, 666);
+                            assert.strictEqual(changes.update.pools[0].storage_stats.blocks_size, 666);
+                            changes.update.buckets.forEach(item => {
+                                assert.strictEqual(item.storage_stats.last_update, last_update + (((i / 2) + 1) * range));
+                            });
+                            changes.update.pools.forEach(item => {
+                                assert.strictEqual(item.storage_stats.last_update, last_update + (((i / 2) + 1) * range));
+                            });
+                        }
                    });
                });
        });
@@ -436,7 +529,7 @@ mocha.describe('md_aggregator', function() {
            self.timeout(30000);
            const last_update = Date.now();
            const target_now = last_update - 1;
-            const system_store = make_test_system_store(last_update);
+            const system_store = make_test_system_store(last_update, md_store);

            return P.resolve()
                .then(() => md_aggregator.run_md_aggregator(md_store, system_store, target_now, 0))
@@ -465,91 +558,25 @@ mocha.describe('md_aggregator', function() {
            const num_splits = 13;
            const last_update = Date.now();
            const target_now = last_update + (num_splits * CYCLE);
-            const system_store = make_test_system_store(last_update);
+            const system_store = make_test_system_store(last_update, md_store);
            system_store.debug = false;

            return P.resolve()
                .then(() => md_aggregator.run_md_aggregator(md_store, system_store, target_now, 0))
                .then(() => {
-                    assert.strictEqual(system_store.changes_list.length, num_splits);
+                    assert.strictEqual(system_store.changes_list.length, num_splits * 2);
                    system_store.changes_list.forEach((changes, i) => {
-                        assert.strictEqual(changes.update.buckets.length, system_store.data.buckets.length);
-                        assert.strictEqual(changes.update.pools.length, system_store.data.pools.length);
+                        if (!changes.update.systems) {
+                            assert.strictEqual(changes.update.buckets.length, system_store.data.buckets.length);
+                            assert.strictEqual(changes.update.pools.length, system_store.data.pools.length);
+                        }
                    });
                });
        });

    });
-
-    function make_test_system_store(last_update) {
-
-        const buckets = _.times(10, i => ({
-            _id: md_store.make_md_id(),
-            name: `bucket${i}`,
-            storage_stats: {
-                last_update,
-                chunks_capacity: 0,
-                blocks_size: 0,
-                pools: {},
-                objects_size: 0,
-                objects_count: 0,
-                objects_hist: [],
-            },
-        }));
-
-        const pools = _.times(10, i => ({
-            _id: md_store.make_md_id(),
-            name: `pool${i}`,
-            storage_stats: {
-                last_update,
-                blocks_size: 0,
-            }
-        }));
-
-        const system_store = {
-            is_finished_initial_load: true,
-            data: {
-                buckets,
-                pools,
-            },
-            changes_list: [],
-            debug: true,
-            find_bucket_by_id(id) {
-                return _.find(this.data.buckets, bucket => String(bucket._id) === String(id));
-            },
-            find_pool_by_id(id) {
-                return _.find(this.data.pools, pool => String(pool._id) === String(id));
-            },
-            make_changes(changes) {
-                this.changes_list.push(changes);
-                if (this.debug) {
-                    coretest.log('system store changes #',
-                        this.changes_list.length,
-                        util.inspect(changes, true, null, true)
-                    );
-                }
-                changes.update.buckets.forEach(updates => {
-                    const bucket = this.find_bucket_by_id(updates._id);
-                    _.forEach(updates, (val, key) => {
-                        if (key !== '_id') _.set(bucket, key, val);
-                    });
-                });
-                changes.update.pools.forEach(updates => {
-                    const pool = this.find_pool_by_id(updates._id);
-                    _.forEach(updates, (val, key) => {
-                        if (key !== '_id') _.set(pool, key, val);
-                    });
-                });
-                return P.resolve();
-            },
-        };
-
-        return system_store;
-    }
-
-
-    function make_block(block_id, size, bucket, pool) {
+    function make_block(block_id, size, bucket, pool, system_id) {
        return {
            _id: block_id,
            system: system_id,
diff --git a/src/test/unit_tests/test_system_store.js b/src/test/unit_tests/test_system_store.js
index b57b93125c..7ea772f0f4 100644
--- a/src/test/unit_tests/test_system_store.js
+++ b/src/test/unit_tests/test_system_store.js
@@ -9,9 +9,8 @@
const _ = require('lodash');
const mongo_client = require('../../util/mongo_client');
coretest.setup();

-// const _ = require('lodash');
const mocha = require('mocha');
-// const assert = require('assert');
+const assert = require('assert');

const system_store = require('../../server/system_services/system_store').get_instance();
@@ -29,14 +28,17 @@ function _get_wiredtiger_log_diff(a, b) {
mocha.describe('system_store', function() {

    // eslint-disable-next-line no-undef
-    after(function() {
+    afterEach(function() {
        // hacky - all the added systems were failing some of the next tests
        // remove all dummy systems
        coretest.log('cleaning test systems:');
-        return mongo_client.instance().collection('systems').remove({
+        return mongo_client.instance().collection('systems').deleteMany({
            name: {
                $nin: ['demo', 'coretest']
            }
+        }).then(() => {
+            system_store.clean_system_store();
+            return system_store.load();
        });
    });
@@ -98,6 +100,89 @@ mocha.describe('system_store', function() {
        });
    });

+    mocha.it('Check make_changes updates new created systems', function() {
+        const LOOP_CYCLES = 10;
+        let first_data_store;
+        return system_store.load()
+            .then(data1 => {
+                first_data_store = _.cloneDeep(data1);
+                console.log('first_data_store', first_data_store.systems.length);
+                return promise_utils.loop(LOOP_CYCLES, cycle => system_store.make_changes({
+                    insert: {
+                        systems: [{
+                            _id: system_store.new_system_store_id(),
+                            name: `JenTheMajesticSlothSystemStoreLoop2-${cycle}`,
+                            owner: system_store.new_system_store_id()
+                        }]
+                    }
+                }));
+            })
+            .then(() => system_store.load())
+            .then(data2 => {
+                console.log('new_data_store', data2.systems.length);
+                assert.deepStrictEqual(first_data_store.systems.length + LOOP_CYCLES, data2.systems.length);
+            });
+    });
+
+    mocha.it('Check make_changes returns no diff when not changing last_update', function() {
+        const system_id = system_store.new_system_store_id();
+        const orig_name = `JenTheMajesticSlothSystemStoreLoop3`;
+        return system_store.load()
+            .then(() => system_store.make_changes({
+                insert: {
+                    systems: [{
+                        _id: system_id,
+                        name: orig_name,
+                        owner: system_store.new_system_store_id(),
+                    }]
+                }
+            }))
+            .then(() => system_store.make_changes({
+                update: {
+                    systems: [{
+                        _id: system_id,
+                        name: 'new_name',
+                        dont_change_last_update: true
+                    }]
+                }
+            }))
+            .then(() => system_store.load())
+            .then(data2 => {
+                console.log('new_data_store', data2.systems.length);
+                assert.strictEqual(data2.systems[0].name, orig_name);
+            });
+    });
+
+    mocha.it('Check make_changes returns diff when changing last_update', function() {
+        const system_id = system_store.new_system_store_id();
+        const orig_name = `JenTheMajesticSlothSystemStoreLoop3`;
+        return system_store.load()
+            .then(() => system_store.make_changes({
+                insert: {
+                    systems: [{
+                        _id: system_id,
+                        name: orig_name,
+                        owner: system_store.new_system_store_id(),
+                    }]
+                }
+            }))
+            .then(() => system_store.make_changes({
+                update: {
+                    systems: [{
+                        _id: system_id,
+                        name: 'new_name',
+                        dont_change_last_update: false
+                    }]
+                }
+            }))
+            .then(() => system_store.load())
+            .then(data2 => {
+                console.log('new_data_store', data2.systems.length);
+                assert.strictEqual(data2.systems[0].name, 'new_name');
+            });
+    });
+
+
    // TODO continue test_system_store ...

});
diff --git a/src/tools/mongodb_bucket_blow.js b/src/tools/mongodb_bucket_blow.js
index 670be7e9de..c51cd0ed36 100644
--- a/src/tools/mongodb_bucket_blow.js
+++ b/src/tools/mongodb_bucket_blow.js
@@ -12,6 +12,7 @@
let system_id = db.systems.findOne()._id;
let pool_id = db.pools.findOne({ resource_type: { $ne: "INTERNAL" } })._id;
let ccc = db.chunk_configs.findOne()._id;
+let now = Date.now();

for (let j = 0; j < 5; ++j) {
    let array_of_tiers = [];
@@ -31,6 +32,7 @@ for (let j = 0; j < 5; ++j) {
            _id: new ObjectId(),
            spread_pools: [pool_id],
        }],
+        last_update: now,
    });
    array_of_policies.push({
        _id: policy_id,
@@ -45,7 +47,8 @@ for (let j = 0; j < 5; ++j) {
        chunk_split_config: {
            avg_chunk: 4194304,
            delta_chunk: 1048576
-        }
+        },
+        last_update: now
    });
    array_of_buckets.push({
        _id: bucket_id,
@@ -64,7 +67,8 @@ for (let j = 0; j < 5; ++j) {
            last_update: Date.now() - (2 * 90000)
        },
        lambda_triggers: [],
-        versioning: "DISABLED"
+        versioning: "DISABLED",
+        last_update: now,
    });
}
db.tiers.insert(array_of_tiers);