From 9d978edb13acb0d05b32f24ab95f02808f284043 Mon Sep 17 00:00:00 2001 From: jackyalbo Date: Thu, 30 May 2019 11:07:33 +0300 Subject: [PATCH] Adding location support + some tiering fixes --- config.js | 3 +- .../src/app/reducers/host-parts-reducer.js | 9 +- src/sdk/map_client.js | 2 +- src/server/bg_services/tier_ttf_worker.js | 53 +++--- src/server/object_services/map_builder.js | 3 + src/server/object_services/map_db_types.js | 2 +- src/server/object_services/map_reader.js | 156 +++++++++++------- src/server/object_services/map_server.js | 31 +++- src/server/object_services/mapper.js | 1 + src/server/object_services/md_store.js | 28 +--- .../schemas/object_multipart_indexes.js | 1 + .../schemas/object_part_indexes.js | 1 + src/test/unit_tests/index.js | 2 +- src/util/mongo_client.js | 10 +- tsconfig.json | 1 + 15 files changed, 183 insertions(+), 120 deletions(-) diff --git a/config.js b/config.js index a73f1ae314..932870f3b1 100644 --- a/config.js +++ b/config.js @@ -188,8 +188,7 @@ config.MIRROR_WRITER_EMPTY_DELAY = 30000; config.MIRROR_WRITER_MARKER_STORE_PERIOD = 10 * 60000; // store markers every 10 min config.TIER_TTF_WORKER_ENABLED = true; -config.TIER_TTF_WORKER_BATCH_SIZE = 50; -config.TIER_TTF_WORKER_BATCH_DELAY = 50; +config.TIER_TTF_WORKER_BATCH_DELAY = 500; config.TIER_TTF_WORKER_EMPTY_DELAY = 30000; config.TIER_SPILLBACK_WORKER_ENABLED = true; diff --git a/frontend/src/app/reducers/host-parts-reducer.js b/frontend/src/app/reducers/host-parts-reducer.js index c3ac2af2ee..69705adc30 100644 --- a/frontend/src/app/reducers/host-parts-reducer.js +++ b/frontend/src/app/reducers/host-parts-reducer.js @@ -82,15 +82,13 @@ function onFailFetchHostObjects(state, { payload }) { function onFetchCloudResourceObjects(state, { payload }) { return onFetchHostObjects( - state, - { payload: _resourceQueryToHostQuery(payload) } + state, { payload: _resourceQueryToHostQuery(payload) } ); } function onCompleteFetchCloudResourceObjects(state, { payload }) { return onCompleteFetchHostObjects( - state, - { + state, { payload: { query: _resourceQueryToHostQuery(payload.query), response: payload.response @@ -101,8 +99,7 @@ function onCompleteFetchCloudResourceObjects(state, { payload }) { function onFailCloudResourceObjects(state, { payload }) { return onFailFetchHostObjects( - state, - { + state, { payload: { query: _resourceQueryToHostQuery(payload.query), error: payload.error diff --git a/src/sdk/map_client.js b/src/sdk/map_client.js index 726dbb097f..a8b61181b0 100644 --- a/src/sdk/map_client.js +++ b/src/sdk/map_client.js @@ -143,7 +143,7 @@ class MapClient { async put_mapping() { // TODO should we filter out chunk.had_errors from put mapping? await this.rpc_client.object.put_mapping({ - chunks: this.chunks.map(chunk => chunk.to_api()), + chunks: this.chunks.filter(chunk => !chunk.had_errors).map(chunk => chunk.to_api()), move_to_tier: this.move_to_tier && this.move_to_tier._id, }); } diff --git a/src/server/bg_services/tier_ttf_worker.js b/src/server/bg_services/tier_ttf_worker.js index 11ae400310..2d4cb13e57 100644 --- a/src/server/bg_services/tier_ttf_worker.js +++ b/src/server/bg_services/tier_ttf_worker.js @@ -6,12 +6,11 @@ const _ = require('lodash'); const config = require('../../../config'); const dbg = require('../../util/debug_module')(__filename); const system_store = require('../system_services/system_store').get_instance(); -const UsageReportStore = require('../analytic_services/usage_report_store').UsageReportStore; +const usage_aggregator = require('./usage_aggregator'); const MDStore = require('../object_services/md_store').MDStore; const system_utils = require('../utils/system_utils'); const size_utils = require('../../util/size_utils'); const auth_server = require('../common_services/auth_server'); -const nodes_client = require('../node_services/nodes_client'); const node_allocator = require('../node_services/node_allocator'); const mapper = require('../object_services/mapper'); @@ -20,6 +19,7 @@ class TieringTTFWorker { this.name = name; this.client = client; this.initialized = false; + this.last_run = 'force'; } _can_run() { @@ -41,6 +41,7 @@ class TieringTTFWorker { const multi_tiered_buckets = this._get_multi_tiered_buckets(); if (!multi_tiered_buckets || !multi_tiered_buckets.length) { dbg.log0('no buckets with more than one tier. nothing to do'); + this.last_run = 'force'; return config.TIER_TTF_WORKER_EMPTY_DELAY; } @@ -50,7 +51,7 @@ class TieringTTFWorker { } _get_multi_tiered_buckets() { - return system_store.data.buckets.filter(bucket => bucket.tiering.tiers.length > 2); + return system_store.data.buckets.filter(bucket => bucket.tiering.tiers.length > 1); } async _rebuild_need_to_move_chunks(buckets) { @@ -63,28 +64,35 @@ class TieringTTFWorker { const now = Date.now(); for (const bucket of buckets) { - await node_allocator.refresh_tiering_alloc(bucket.tiering); + await node_allocator.refresh_tiering_alloc(bucket.tiering, this.last_run); const tiering_status = node_allocator.get_tiering_status(bucket.tiering); const selected_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status); - - const storage = await nodes_client.instance().aggregate_data_free_by_tier([String(selected_tier._id)], - selected_tier.system._id); - const tier_storage_free = size_utils.json_to_bigint(storage[String(selected_tier._id)][0].free); - const reports = await UsageReportStore.instance().get_usage_reports({ + const tier_storage_free = size_utils.json_to_bigint(size_utils.reduce_minimum( + 'free', tiering_status[String(selected_tier._id)].mirrors_storage.map(storage => (storage.free || 0)) + )); + const valid = _.values(tiering_status[String(selected_tier._id)].pools).every(pool => pool.valid_for_allocation); + const reports = await usage_aggregator.get_bandwidth_report({ + bucket: bucket._id, since: now - (1000 * 60 * 60), till: now, - bucket: bucket._id, + time_range: 'hour' }); const report = reports[0]; - bucket.TTF = report && report.write_bytes ? tier_storage_free.divide(size_utils.json_to_bigint(report.write_bytes).divide(60)) : + const time = valid && report ? Math.floor((now - report.timestamp) / 1000 / 60) : 60; + bucket.TTF = valid && report && report.write_bytes ? tier_storage_free + .divide(size_utils.json_to_bigint(report.write_bytes).divide(time)) : // how much time in minutes will it take to fill (avg by last report) MAX_TTF; // time to fill in minutes - bucket.tier = selected_tier._id; + dbg.log1('TTF bucket', bucket.name, 'storage_free', tier_storage_free, 'report', report, 'TTF:', bucket.TTF); } const sorted_buckets = buckets.filter(bucket => bucket.TTF.lesser(TOO_BIG_TTF)).sort(compare_buckets_by_TTF); let chunks_to_rebuild = 0; - if (_.isEmpty(sorted_buckets)) return config.TIER_TTF_WORKER_EMPTY_DELAY; + if (_.isEmpty(sorted_buckets)) { + this.last_run = 'force'; + return config.TIER_TTF_WORKER_EMPTY_DELAY; + } for (const bucket of sorted_buckets) { - switch (bucket.TTF.value) { + const bucket_ttf = bucket.TTF.toJSNumber(); + switch (bucket_ttf) { case 0: chunks_to_rebuild = 30; break; @@ -112,18 +120,25 @@ class TieringTTFWorker { default: chunks_to_rebuild = 1; } - if (!chunks_to_rebuild) return config.TIER_TTF_WORKER_EMPTY_DELAY; - const chunk_ids = await MDStore.instance().find_oldest_tier_chunk_ids(bucket.tier, chunks_to_rebuild, 1); - await node_allocator.refresh_tiering_alloc(bucket.tiering); + if (!chunks_to_rebuild) continue; const tiering_status = node_allocator.get_tiering_status(bucket.tiering); - const next_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status, bucket.tier); + const previous_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status); + const next_tier_order = this.find_tier_order_in_tiering(bucket, previous_tier) + 1; + const chunk_ids = await MDStore.instance().find_oldest_tier_chunk_ids(previous_tier._id, chunks_to_rebuild, 1); + if (!chunk_ids.length) continue; + const next_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status, next_tier_order); if (!next_tier) continue; - console.log(`TieringTTFWorker: Moving the following ${chunks_to_rebuild} chunks to next tier ${next_tier._id}`, chunk_ids); + console.log(`TieringTTFWorker: Moving the following ${chunks_to_rebuild} from ${previous_tier._id} to chunks to next tier ${next_tier._id}`, chunk_ids); await this._build_chunks(chunk_ids, next_tier._id); } + this.last_run = undefined; return config.TIER_TTF_WORKER_BATCH_DELAY; } + find_tier_order_in_tiering(bucket, tier) { + return bucket.tiering.tiers.find(t => String(t.tier._id) === String(tier._id)).order; + } + async _build_chunks(chunk_ids, next_tier) { return this.client.scrubber.build_chunks({ chunk_ids, tier: next_tier }, { auth_token: auth_server.make_auth_token({ diff --git a/src/server/object_services/map_builder.js b/src/server/object_services/map_builder.js index 5644ce52fe..98d658f291 100644 --- a/src/server/object_services/map_builder.js +++ b/src/server/object_services/map_builder.js @@ -52,6 +52,9 @@ class MapBuilder { await builder_lock.surround_keys(_.map(this.chunk_ids, String), async () => { + if (this.move_to_tier) { + await MDStore.instance().update_chunks_by_ids(this.chunk_ids, { tier: this.move_to_tier._id }); + } // we run the build twice. first time to perform all allocation, second time to perform deletions await this.run_build(this.chunk_ids); diff --git a/src/server/object_services/map_db_types.js b/src/server/object_services/map_db_types.js index 6f74a5f86a..c270760894 100644 --- a/src/server/object_services/map_db_types.js +++ b/src/server/object_services/map_db_types.js @@ -67,7 +67,7 @@ class ChunkDB { /** @returns {nb.ChunkConfig} */ get chunk_config() { return system_store.data.get_by_id(this.chunk_db.chunk_config); } - + /** @returns {FragDB[]} */ get frags() { if (!this.__frags) this.__frags = this.chunk_db.frags.map(frag_db => new_frag_db(frag_db, this)); return this.__frags; diff --git a/src/server/object_services/map_reader.js b/src/server/object_services/map_reader.js index 5a8bd012d7..99a71e4880 100644 --- a/src/server/object_services/map_reader.js +++ b/src/server/object_services/map_reader.js @@ -4,14 +4,16 @@ /** @typedef {typeof import('../../sdk/nb')} nb */ const _ = require('lodash'); -// const util = require('util'); +const util = require('util'); -// const dbg = require('../../util/debug_module')(__filename); +const dbg = require('../../util/debug_module')(__filename); const config = require('../../../config.js'); const MDStore = require('./md_store').MDStore; const map_server = require('./map_server'); const mongo_utils = require('../../util/mongo_utils'); const { ChunkDB } = require('./map_db_types'); +const server_rpc = require('../server_rpc'); +const auth_server = require('../common_services/auth_server'); /** * @@ -44,7 +46,10 @@ async function read_object_mapping(obj, start, end, location_info) { end_gt: rng.start, }); // console.log('TODO GGG read_object_mapping', parts); - const chunks = await read_parts_mapping(parts); + let chunks = await read_parts_mapping(parts, location_info); + if (await update_chunks_on_read(chunks, location_info)) { + chunks = await read_parts_mapping(parts, location_info); + } return chunks; } @@ -89,11 +94,13 @@ async function read_node_mapping(node_ids, skip, limit) { /** * * @param {nb.PartSchemaDB[]} parts + * @param {nb.LocationInfo} [location_info] * @returns {Promise} */ -async function read_parts_mapping(parts) { +async function read_parts_mapping(parts, location_info) { const chunks_db = await MDStore.instance().find_chunks_by_ids(mongo_utils.uniq_ids(parts, 'chunk')); - await MDStore.instance().load_blocks_for_chunks(chunks_db); + const sorter = location_info ? _block_sorter_local(location_info) : _block_sorter_basic; + await MDStore.instance().load_blocks_for_chunks(chunks_db, sorter); const chunks_db_by_id = _.keyBy(chunks_db, '_id'); const chunks = parts.map(part => { const chunk = new ChunkDB({ ...chunks_db_by_id[part.chunk.toHexString()], parts: [part] }); @@ -104,59 +111,48 @@ async function read_parts_mapping(parts) { } -// /** -// * @param {nb.Chunk[]} chunks -// * @param {nb.LocationInfo} [location_info] -// */ -// async function update_chunks_on_read(chunks, location_info) { -// const chunks = _.map(parts, 'chunk'); -// const tiering_status_by_bucket_id = {}; - -// for (const chunk of chunks) { -// map_server.populate_chunk(chunk); -// } - -// await _load_chunk_mappings(chunks, tiering_status_by_bucket_id); - -// const chunks_to_scrub = []; -// try { -// const bucket = system_store.data.get_by_id(chunks[0].bucket); -// const tiering_status = tiering_status_by_bucket_id[bucket._id]; -// const selected_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status); -// for (const chunk of chunks) { -// map_server.populate_chunk(chunk); -// if (!chunk.tier._id || !_.isEqual(chunk.tier._id, selected_tier._id)) { -// dbg.log0('Chunk with low tier will be sent for rebuilding', chunk); -// chunks_to_scrub.push(chunk); -// } else if (location_info) { -// const chunk_info = mapper.get_chunk_info(chunk); -// const mapping = mapper.map_chunk(chunk_info, chunk.tier, bucket.tiering, tiering_status, location_info); -// if (mapper.should_rebuild_chunk_to_local_mirror(mapping, location_info)) { -// dbg.log2('Chunk with following mapping will be sent for rebuilding', chunk, mapping); -// chunks_to_scrub.push(chunk); -// } -// } -// } -// if (chunks_to_scrub.length) { -// dbg.log1('Chunks wasn\'t found in local pool - the following will be rebuilt:', util.inspect(chunks_to_scrub)); -// await server_rpc.client.scrubber.build_chunks({ -// chunk_ids: _.map(chunks_to_scrub, '_id'), -// tier: selected_tier._id, -// }, { -// auth_token: auth_server.make_auth_token({ -// system_id: chunks_to_scrub[0].system, -// role: 'admin' -// }) -// }); -// } -// } catch (err) { -// dbg.warn('Chunks failed to rebuilt - skipping'); -// } -// if (chunks_to_scrub.length) { -// // mismatch type... -// await MDStore.instance().load_blocks_for_chunks(chunks); -// } -// } +/** + * @param {nb.Chunk[]} chunks + * @param {nb.LocationInfo} [location_info] + */ +async function update_chunks_on_read(chunks, location_info) { + const chunks_to_scrub = []; + try { + const bucket = chunks[0].bucket; + const selected_tier = await map_server.select_tier_for_write(bucket); + for (const chunk of chunks) { + if ((!chunk.tier._id || !_.isEqual(chunk.tier._id, selected_tier._id)) && + map_server.enough_room_in_tier(selected_tier, bucket)) { + dbg.log0('Chunk with low tier will be sent for rebuilding', chunk._id); + chunks_to_scrub.push(chunk); + } else if (location_info) { + if (chunk.tier.data_placement !== 'MIRROR') return; + const mirror = await map_server.select_mirror_for_write(chunk.tier, bucket.tiering, location_info); + if (mirror.spread_pools.find(pool => (location_info.region && location_info.region === pool.region) || + location_info.pool_id === String(pool._id))) { + dbg.log2('Chunk with following mapping will be sent for rebuilding', chunk); + chunks_to_scrub.push(chunk); + } + } + } + if (chunks_to_scrub.length) { + const chunk_ids = _.map(chunks_to_scrub, '_id'); + dbg.log1('Chunks wasn\'t found in local pool/upper tier - the following will be rebuilt:', util.inspect(chunks_to_scrub)); + await server_rpc.client.scrubber.build_chunks({ + chunk_ids, + tier: selected_tier._id, + }, { + auth_token: auth_server.make_auth_token({ + system_id: bucket.system._id, + role: 'admin' + }) + }); + } + } catch (err) { + dbg.warn('Chunks failed to rebuilt - skipping'); + } + return chunks_to_scrub.length; +} // sanitizing start & end: we want them to be integers, positive, up to obj.size. function sanitize_object_range(obj, start, end) { @@ -184,6 +180,50 @@ function sanitize_object_range(obj, start, end) { }; } +/** + * sorting function for sorting blocks with most recent heartbeat first + * @param {nb.Block} block1 + * @param {nb.Block} block2 + */ +function _block_sorter_basic(block1, block2) { + const node1 = block1.node; + const node2 = block2.node; + if (node2.readable && !node1.readable) return 1; + if (node1.readable && !node2.readable) return -1; + return node2.heartbeat - node1.heartbeat; +} + +/** + * locality sorting function for blocks + * @param {nb.LocationInfo} location_info + */ +function _block_sorter_local(location_info) { + return sort_func; + /** + * locality sorting function for blocks + * @param {nb.Block} block1 + * @param {nb.Block} block2 + */ + function sort_func(block1, block2) { + const node1 = block1.node; + const node2 = block2.node; + const { node_id, host_id, pool_id, region } = location_info; + if (node2.readable && !node1.readable) return 1; + if (node1.readable && !node2.readable) return -1; + if (String(node2._id) === node_id && String(node1._id) !== node_id) return 1; + if (String(node1._id) === node_id && String(node2._id) !== node_id) return -1; + if (node2.host_id === host_id && node1.host_id !== host_id) return 1; + if (node1.host_id === host_id && node2.host_id !== host_id) return -1; + if (String(block2.pool) === pool_id && String(block1.pool) !== pool_id) return 1; + if (String(block1.pool) === pool_id && String(block2.pool) !== pool_id) return -1; + if (region) { + if (block2.pool.region === region && block1.pool.region !== region) return 1; + if (block1.pool.region === region && block2.pool.region !== region) return -1; + } + return node2.heartbeat - node1.heartbeat; + } +} + // EXPORTS exports.read_object_mapping = read_object_mapping; exports.read_object_mapping_admin = read_object_mapping_admin; diff --git a/src/server/object_services/map_server.js b/src/server/object_services/map_server.js index d2d7d4753c..57fcf48c5a 100644 --- a/src/server/object_services/map_server.js +++ b/src/server/object_services/map_server.js @@ -5,7 +5,6 @@ const _ = require('lodash'); const assert = require('assert'); -// const util = require('util'); const P = require('../../util/promise'); const dbg = require('../../util/debug_module')(__filename); @@ -127,7 +126,7 @@ class GetMapping { await P.map(uniq_tiers, tier => ensure_room_in_tier(tier, bucket)); await P.delay(config.ALLOCATE_RETRY_DELAY_MS); // TODO Decide if we want to update the chunks mappings when looping - // await this.prepare_chunks_group(chunks, bucket); + // await _prepare_chunks_group({ chunks, move_to_tier: this.move_to_tier, location_info: this.location_info }); } } })); @@ -363,7 +362,6 @@ class PutMapping { MDStore.instance().insert_chunks(this.new_chunks), MDStore.instance().insert_parts(this.new_parts), map_deleter.delete_blocks(this.delete_blocks), - this.move_to_tier && MDStore.instance().update_chunks_by_ids(this.update_chunk_ids, { tier: this.move_to_tier._id }), // TODO // (upload_size > obj.upload_size) && MDStore.instance().update_object_by_id(obj._id, { upload_size: upload_size }) @@ -375,6 +373,7 @@ class PutMapping { /** * @param {nb.Bucket} bucket + * @returns {Promise} */ async function select_tier_for_write(bucket) { const tiering = bucket.tiering; @@ -383,6 +382,18 @@ async function select_tier_for_write(bucket) { return mapper.select_tier_for_write(tiering, tiering_status); } +/** + * @param {nb.Tier} tier + * @param {nb.Tiering} tiering + * @param {nb.LocationInfo} location_info + * @returns {Promise} + */ +async function select_mirror_for_write(tier, tiering, location_info) { + await node_allocator.refresh_tiering_alloc(tiering); + const tiering_status = node_allocator.get_tiering_status(tiering); + return mapper.select_mirror_for_write(tier, tiering, tiering_status, location_info); +} + /** * @param {nb.ID} tier_id @@ -468,17 +479,17 @@ function enough_room_in_tier(tier, bucket) { const tier_status = tiering_status[tier_id_str]; const tier_in_tiering = _.find(tiering.tiers, t => String(t.tier._id) === tier_id_str); if (!tier_in_tiering || !tier_status) throw new Error(`Can't find current tier in bucket`); - const available_to_upload = size_utils.json_to_bigint(size_utils.reduce_maximum( + const available_to_upload = size_utils.json_to_bigint(size_utils.reduce_minimum( 'free', tier_status.mirrors_storage.map(storage => (storage.free || 0)) )); if (available_to_upload && available_to_upload.greater(config.ENOUGH_ROOM_IN_TIER_THRESHOLD)) { - dbg.log0('make_room_in_tier: has enough room', tier.name, available_to_upload.toJSNumber(), '>', config.ENOUGH_ROOM_IN_TIER_THRESHOLD); + dbg.log0('enough_room_in_tier: has enough room', tier.name, available_to_upload.toJSNumber(), '>', config.ENOUGH_ROOM_IN_TIER_THRESHOLD); map_reporter.add_event(`has_enough_room(${tier.name})`, available_to_upload.toJSNumber(), 0); return true; } else { - dbg.log0(`make_room_in_tier: not enough room ${tier.name}:`, + dbg.log0(`enough_room_in_tier: not enough room ${tier.name}:`, `${available_to_upload.toJSNumber()} < ${config.ENOUGH_ROOM_IN_TIER_THRESHOLD} should move chunks to next tier`); - map_reporter.add_event(`not_enough_room(${tier.name})`, available_to_upload.toJSNumber(), 0); + map_reporter.add_event(`enough_room_in_tier: not_enough_room(${tier.name})`, available_to_upload.toJSNumber(), 0); return false; } } @@ -523,9 +534,13 @@ async function _prepare_chunks_group({ chunks, move_to_tier, location_info }) { for (const chunk of chunks) { chunk.is_accessible = false; + chunk.is_building_blocks = false; + chunk.is_building_frags = false; let num_accessible_frags = 0; for (const frag of chunk.frags) { frag.is_accessible = false; + frag.is_building_blocks = false; + frag.allocations = []; for (const block of frag.blocks) { if (!block.node || !block.node._id) { dbg.warn('ORPHAN BLOCK (ignoring)', block); @@ -601,6 +616,8 @@ async function prepare_blocks_from_db(blocks) { exports.GetMapping = GetMapping; exports.PutMapping = PutMapping; exports.select_tier_for_write = select_tier_for_write; +exports.select_mirror_for_write = select_mirror_for_write; +exports.enough_room_in_tier = enough_room_in_tier; exports.make_room_in_tier = make_room_in_tier; exports.prepare_chunks = prepare_chunks; exports.prepare_blocks = prepare_blocks; diff --git a/src/server/object_services/mapper.js b/src/server/object_services/mapper.js index 1e4a4746c5..e434c9253d 100644 --- a/src/server/object_services/mapper.js +++ b/src/server/object_services/mapper.js @@ -417,6 +417,7 @@ function find_local_pool(pools, location_info) { // EXPORTS exports.select_tier_for_write = select_tier_for_write; +exports.select_mirror_for_write = select_mirror_for_write; exports.map_chunk = map_chunk; exports.is_chunk_good_for_dedup = is_chunk_good_for_dedup; diff --git a/src/server/object_services/md_store.js b/src/server/object_services/md_store.js index c7525f8a1e..48649c8bec 100644 --- a/src/server/object_services/md_store.js +++ b/src/server/object_services/md_store.js @@ -926,10 +926,6 @@ class MDStore { $set: { deleted: delete_date }, - $rename: { - // obj: 'obj_del', - num: 'num_del', - } }); } @@ -941,10 +937,6 @@ class MDStore { $set: { deleted: delete_date }, - $rename: { - // obj: 'obj_del', - num: 'num_del', - } }); } @@ -1163,11 +1155,6 @@ class MDStore { $set: { deleted: delete_date }, - $rename: { - // obj: 'obj_del', - start: 'start_del', - // chunk: 'chunk_del', - } }); } @@ -1179,11 +1166,6 @@ class MDStore { $set: { deleted: delete_date }, - $rename: { - // obj: 'obj_del', - start: 'start_del', - // chunk: 'chunk_del', - } }); } @@ -1547,9 +1529,10 @@ class MDStore { /** * @param {nb.ChunkSchemaDB[]} chunks + * @param {?(a: any, b: any) => number} [sorter] * @return {Promise} */ - async load_blocks_for_chunks(chunks) { + async load_blocks_for_chunks(chunks, sorter) { if (!chunks || !chunks.length) return; const blocks = await this._blocks.col().find({ chunk: { $in: mongo_utils.uniq_ids(chunks, '_id') }, @@ -1560,7 +1543,8 @@ class MDStore { for (const chunk of chunks) { const blocks_by_frag = _.groupBy(blocks_by_chunk[chunk._id.toHexString()], 'frag'); for (const frag of chunk.frags) { - frag.blocks = blocks_by_frag[frag._id.toHexString()]; + const frag_blocks = blocks_by_frag[frag._id.toHexString()]; + frag.blocks = sorter ? frag_blocks.sort(sorter) : frag_blocks; } } } @@ -1634,10 +1618,6 @@ class MDStore { $set: { deleted: delete_date }, - // $rename: { - // chunk: 'chunk_del', - // node: 'node_del', - // } }); } diff --git a/src/server/object_services/schemas/object_multipart_indexes.js b/src/server/object_services/schemas/object_multipart_indexes.js index d9eb242957..0dec08abc8 100644 --- a/src/server/object_services/schemas/object_multipart_indexes.js +++ b/src/server/object_services/schemas/object_multipart_indexes.js @@ -10,6 +10,7 @@ module.exports = [{ unique: false, partialFilterExpression: { obj: { $exists: true }, + deleted: null, } } }]; diff --git a/src/server/object_services/schemas/object_part_indexes.js b/src/server/object_services/schemas/object_part_indexes.js index cd85066b32..4bc8cf9454 100644 --- a/src/server/object_services/schemas/object_part_indexes.js +++ b/src/server/object_services/schemas/object_part_indexes.js @@ -15,6 +15,7 @@ module.exports = [{ unique: false, partialFilterExpression: { obj: { $exists: true }, + deleted: null, } } }, diff --git a/src/test/unit_tests/index.js b/src/test/unit_tests/index.js index 059fadadb2..f925feb40f 100644 --- a/src/test/unit_tests/index.js +++ b/src/test/unit_tests/index.js @@ -52,7 +52,7 @@ require('./test_nb_native_b64'); require('./test_node_allocator'); require('./test_bucket_chunks_builder'); require('./test_mirror_writer'); -// require('./test_tiering_upload'); +require('./test_tiering_upload'); // SERVERS require('./test_agent'); diff --git a/src/util/mongo_client.js b/src/util/mongo_client.js index f42001f79f..c70e22d877 100644 --- a/src/util/mongo_client.js +++ b/src/util/mongo_client.js @@ -180,7 +180,15 @@ class MongoClient extends EventEmitter { .then(() => col.db_indexes && P.map(col.db_indexes, index => db.collection(col.name).createIndex(index.fields, _.extend({ background: true }, index.options)) .then(res => dbg.log0('_init_collection: created index', col.name, res)) - )) + .catch(err => { + if (err.codeName === 'IndexOptionsConflict') { + return db.collection(col.name).dropIndex(index.fields) + .then(() => db.collection(col.name).createIndex(index.fields, _.extend({ background: true }, index.options))) + .then(res => dbg.log0('_init_collection: re-created index with new options', col.name, res)); + } else { + throw err; + } + }))) .catch(err => { dbg.error('_init_collection: FAILED', col.name, err); throw err; diff --git a/tsconfig.json b/tsconfig.json index 41481efbf5..4d2b6157c5 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -14,6 +14,7 @@ "src/server/object_services/*.js", "src/test/unit_tests/test_object_io.js", "src/test/unit_tests/test_agent_blocks_verifier.js", + "src/test/unit_tests/test_tiering_upload.js", "src/tools/mapper_speed.js", ], "exclude": [