Adding location support + some tiering fixes
jackyalbo authored and nimrod-becker committed Jul 24, 2019
1 parent 805e35c commit 9d978ed
Showing 15 changed files with 183 additions and 120 deletions.
config.js (3 changes: 1 addition & 2 deletions)

@@ -188,8 +188,7 @@ config.MIRROR_WRITER_EMPTY_DELAY = 30000;
 config.MIRROR_WRITER_MARKER_STORE_PERIOD = 10 * 60000; // store markers every 10 min
 
 config.TIER_TTF_WORKER_ENABLED = true;
-config.TIER_TTF_WORKER_BATCH_SIZE = 50;
-config.TIER_TTF_WORKER_BATCH_DELAY = 50;
+config.TIER_TTF_WORKER_BATCH_DELAY = 500;
 config.TIER_TTF_WORKER_EMPTY_DELAY = 30000;
 
 config.TIER_SPILLBACK_WORKER_ENABLED = true;
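
These constants pace the tiering worker: each run returns the number of milliseconds to wait before the next run, so TIER_TTF_WORKER_EMPTY_DELAY (30s) is the idle backoff and the raised TIER_TTF_WORKER_BATCH_DELAY (500ms) is the gap between busy batches, as the tier_ttf_worker.js code below returns both values. A minimal sketch of such a delay-driven loop, assuming a hypothetical run_worker_loop() harness and a worker exposing run_batch() (not the actual NooBaa scheduler):

const config = require('./config');

// Assumed harness, for illustration only: the worker's run_batch() returns
// the next sleep in milliseconds, so an idle worker backs off for
// TIER_TTF_WORKER_EMPTY_DELAY while a busy one re-runs after the much
// shorter TIER_TTF_WORKER_BATCH_DELAY.
async function run_worker_loop(worker) {
    for (;;) {
        let delay;
        try {
            delay = await worker.run_batch();
        } catch (err) {
            delay = config.TIER_TTF_WORKER_EMPTY_DELAY; // back off on errors
        }
        await new Promise(resolve => setTimeout(resolve, delay));
    }
}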
frontend/src/app/reducers/host-parts-reducer.js (9 changes: 3 additions & 6 deletions)

@@ -82,15 +82,13 @@ function onFailFetchHostObjects(state, { payload }) {
 
 function onFetchCloudResourceObjects(state, { payload }) {
     return onFetchHostObjects(
-        state,
-        { payload: _resourceQueryToHostQuery(payload) }
+        state, { payload: _resourceQueryToHostQuery(payload) }
     );
 }
 
 function onCompleteFetchCloudResourceObjects(state, { payload }) {
     return onCompleteFetchHostObjects(
-        state,
-        {
+        state, {
            payload: {
                query: _resourceQueryToHostQuery(payload.query),
                response: payload.response
@@ -101,8 +99,7 @@ function onCompleteFetchCloudResourceObjects(state, { payload }) {
 
 function onFailCloudResourceObjects(state, { payload }) {
     return onFailFetchHostObjects(
-        state,
-        {
+        state, {
            payload: {
                query: _resourceQueryToHostQuery(payload.query),
                error: payload.error
src/sdk/map_client.js (2 changes: 1 addition & 1 deletion)

@@ -143,7 +143,7 @@ class MapClient {
     async put_mapping() {
         // TODO should we filter out chunk.had_errors from put mapping?
         await this.rpc_client.object.put_mapping({
-            chunks: this.chunks.map(chunk => chunk.to_api()),
+            chunks: this.chunks.filter(chunk => !chunk.had_errors).map(chunk => chunk.to_api()),
             move_to_tier: this.move_to_tier && this.move_to_tier._id,
         });
     }
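
The one-line fix above answers the TODO: chunks that hit errors during the build are now dropped before the mapping is reported back. A tiny self-contained illustration of the same filter-then-map composition, using hypothetical chunk objects rather than real MapClient state:

// Hypothetical stand-ins for MapClient chunks, reduced to the two members
// that put_mapping() cares about.
const chunks = [
    { _id: 'c1', had_errors: false, to_api() { return { id: this._id }; } },
    { _id: 'c2', had_errors: true, to_api() { return { id: this._id }; } },
];

const api_chunks = chunks
    .filter(chunk => !chunk.had_errors)
    .map(chunk => chunk.to_api());

console.log(api_chunks); // [ { id: 'c1' } ], the failed chunk is never sent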
src/server/bg_services/tier_ttf_worker.js (53 changes: 34 additions & 19 deletions)

@@ -6,12 +6,11 @@ const _ = require('lodash');
 const config = require('../../../config');
 const dbg = require('../../util/debug_module')(__filename);
 const system_store = require('../system_services/system_store').get_instance();
-const UsageReportStore = require('../analytic_services/usage_report_store').UsageReportStore;
+const usage_aggregator = require('./usage_aggregator');
 const MDStore = require('../object_services/md_store').MDStore;
 const system_utils = require('../utils/system_utils');
 const size_utils = require('../../util/size_utils');
 const auth_server = require('../common_services/auth_server');
-const nodes_client = require('../node_services/nodes_client');
 const node_allocator = require('../node_services/node_allocator');
 const mapper = require('../object_services/mapper');
 
@@ -20,6 +19,7 @@ class TieringTTFWorker {
         this.name = name;
         this.client = client;
         this.initialized = false;
+        this.last_run = 'force';
     }
 
     _can_run() {
@@ -41,6 +41,7 @@
         const multi_tiered_buckets = this._get_multi_tiered_buckets();
         if (!multi_tiered_buckets || !multi_tiered_buckets.length) {
             dbg.log0('no buckets with more than one tier. nothing to do');
+            this.last_run = 'force';
             return config.TIER_TTF_WORKER_EMPTY_DELAY;
         }
 
@@ -50,7 +51,7 @@
     }
 
     _get_multi_tiered_buckets() {
-        return system_store.data.buckets.filter(bucket => bucket.tiering.tiers.length > 2);
+        return system_store.data.buckets.filter(bucket => bucket.tiering.tiers.length > 1);
     }
 
     async _rebuild_need_to_move_chunks(buckets) {
@@ -63,28 +64,35 @@
 
         const now = Date.now();
         for (const bucket of buckets) {
-            await node_allocator.refresh_tiering_alloc(bucket.tiering);
+            await node_allocator.refresh_tiering_alloc(bucket.tiering, this.last_run);
             const tiering_status = node_allocator.get_tiering_status(bucket.tiering);
             const selected_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status);
 
-            const storage = await nodes_client.instance().aggregate_data_free_by_tier([String(selected_tier._id)],
-                selected_tier.system._id);
-            const tier_storage_free = size_utils.json_to_bigint(storage[String(selected_tier._id)][0].free);
-            const reports = await UsageReportStore.instance().get_usage_reports({
+            const tier_storage_free = size_utils.json_to_bigint(size_utils.reduce_minimum(
+                'free', tiering_status[String(selected_tier._id)].mirrors_storage.map(storage => (storage.free || 0))
+            ));
+            const valid = _.values(tiering_status[String(selected_tier._id)].pools).every(pool => pool.valid_for_allocation);
+            const reports = await usage_aggregator.get_bandwidth_report({
+                bucket: bucket._id,
                 since: now - (1000 * 60 * 60),
                 till: now,
-                bucket: bucket._id,
+                time_range: 'hour'
             });
             const report = reports[0];
-            bucket.TTF = report && report.write_bytes ? tier_storage_free.divide(size_utils.json_to_bigint(report.write_bytes).divide(60)) :
+            const time = valid && report ? Math.floor((now - report.timestamp) / 1000 / 60) : 60;
+            bucket.TTF = valid && report && report.write_bytes ? tier_storage_free
+                .divide(size_utils.json_to_bigint(report.write_bytes).divide(time)) : // how much time in minutes will it take to fill (avg by last report)
                 MAX_TTF; // time to fill in minutes
             bucket.tier = selected_tier._id;
+            dbg.log1('TTF bucket', bucket.name, 'storage_free', tier_storage_free, 'report', report, 'TTF:', bucket.TTF);
         }
         const sorted_buckets = buckets.filter(bucket => bucket.TTF.lesser(TOO_BIG_TTF)).sort(compare_buckets_by_TTF);
         let chunks_to_rebuild = 0;
-        if (_.isEmpty(sorted_buckets)) return config.TIER_TTF_WORKER_EMPTY_DELAY;
+        if (_.isEmpty(sorted_buckets)) {
+            this.last_run = 'force';
+            return config.TIER_TTF_WORKER_EMPTY_DELAY;
+        }
        for (const bucket of sorted_buckets) {
-            switch (bucket.TTF.value) {
+            const bucket_ttf = bucket.TTF.toJSNumber();
+            switch (bucket_ttf) {
                case 0:
                    chunks_to_rebuild = 30;
                    break;
@@ -112,18 +120,25 @@
                 default:
                     chunks_to_rebuild = 1;
             }
-            if (!chunks_to_rebuild) return config.TIER_TTF_WORKER_EMPTY_DELAY;
-            const chunk_ids = await MDStore.instance().find_oldest_tier_chunk_ids(bucket.tier, chunks_to_rebuild, 1);
-            await node_allocator.refresh_tiering_alloc(bucket.tiering);
+            if (!chunks_to_rebuild) continue;
             const tiering_status = node_allocator.get_tiering_status(bucket.tiering);
-            const next_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status, bucket.tier);
+            const previous_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status);
+            const next_tier_order = this.find_tier_order_in_tiering(bucket, previous_tier) + 1;
+            const chunk_ids = await MDStore.instance().find_oldest_tier_chunk_ids(previous_tier._id, chunks_to_rebuild, 1);
             if (!chunk_ids.length) continue;
+            const next_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status, next_tier_order);
+            if (!next_tier) continue;
-            console.log(`TieringTTFWorker: Moving the following ${chunks_to_rebuild} chunks to next tier ${next_tier._id}`, chunk_ids);
+            console.log(`TieringTTFWorker: Moving the following ${chunks_to_rebuild} chunks from tier ${previous_tier._id} to next tier ${next_tier._id}`, chunk_ids);
             await this._build_chunks(chunk_ids, next_tier._id);
         }
+        this.last_run = undefined;
         return config.TIER_TTF_WORKER_BATCH_DELAY;
     }
 
+    find_tier_order_in_tiering(bucket, tier) {
+        return bucket.tiering.tiers.find(t => String(t.tier._id) === String(tier._id)).order;
+    }
+
     async _build_chunks(chunk_ids, next_tier) {
         return this.client.scrubber.build_chunks({ chunk_ids, tier: next_tier }, {
             auth_token: auth_server.make_auth_token({
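
The TTF ("time to fill") heuristic above boils down to free space divided by write rate, where the rate is averaged over the minutes since the last bandwidth report (or a 60-minute window when no valid report exists). A worked sketch of the arithmetic with plain numbers instead of the BigInteger wrappers, all values invented for illustration:

// Assumed example values, not taken from the commit:
const tier_storage_free = 6 * 1024 ** 3;    // 6 GiB free in the selected tier
const write_bytes = 3 * 1024 ** 3;          // 3 GiB written since the last report
const minutes_since_report = 30;            // Math.floor((now - report.timestamp) / 1000 / 60)

// Average write rate per minute over the report window:
const rate_per_minute = write_bytes / minutes_since_report; // ~102 MiB/min

// Minutes until the selected tier fills at this rate; this is bucket.TTF:
const ttf_minutes = tier_storage_free / rate_per_minute;    // 60 minutes

// The switch in _rebuild_need_to_move_chunks() then maps smaller TTFs to
// larger batches (TTF 0 moves 30 chunks, the default moves 1), so the
// fuller a tier gets, the faster its oldest chunks drain to the next tier.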
src/server/object_services/map_builder.js (3 changes: 3 additions & 0 deletions)

@@ -52,6 +52,9 @@
 
         await builder_lock.surround_keys(_.map(this.chunk_ids, String), async () => {
 
+            if (this.move_to_tier) {
+                await MDStore.instance().update_chunks_by_ids(this.chunk_ids, { tier: this.move_to_tier._id });
+            }
             // we run the build twice. first time to perform all allocation, second time to perform deletions
             await this.run_build(this.chunk_ids);
 
src/server/object_services/map_db_types.js (2 changes: 1 addition & 1 deletion)

@@ -67,7 +67,7 @@ class ChunkDB {
     /** @returns {nb.ChunkConfig} */
     get chunk_config() { return system_store.data.get_by_id(this.chunk_db.chunk_config); }
 
-
+    /** @returns {FragDB[]} */
     get frags() {
         if (!this.__frags) this.__frags = this.chunk_db.frags.map(frag_db => new_frag_db(frag_db, this));
         return this.__frags;
src/server/object_services/map_reader.js (156 changes: 98 additions & 58 deletions)

@@ -4,14 +4,16 @@
 /** @typedef {typeof import('../../sdk/nb')} nb */
 
 const _ = require('lodash');
-// const util = require('util');
+const util = require('util');
 
-// const dbg = require('../../util/debug_module')(__filename);
+const dbg = require('../../util/debug_module')(__filename);
 const config = require('../../../config.js');
 const MDStore = require('./md_store').MDStore;
 const map_server = require('./map_server');
 const mongo_utils = require('../../util/mongo_utils');
 const { ChunkDB } = require('./map_db_types');
+const server_rpc = require('../server_rpc');
+const auth_server = require('../common_services/auth_server');
 
 /**
  *
@@ -44,7 +46,10 @@ async function read_object_mapping(obj, start, end, location_info) {
         end_gt: rng.start,
     });
     // console.log('TODO GGG read_object_mapping', parts);
-    const chunks = await read_parts_mapping(parts);
+    let chunks = await read_parts_mapping(parts, location_info);
+    if (await update_chunks_on_read(chunks, location_info)) {
+        chunks = await read_parts_mapping(parts, location_info);
+    }
     return chunks;
 }
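
Note the shape of the new read path: read the mapping once, let update_chunks_on_read() schedule a rebuild for chunks sitting in the wrong tier or far from the reader, and re-read only when something was actually scheduled, since the rebuild changes block placement and the first result would be stale. A generic sketch of that read-repair pattern, with hypothetical fetch/repair callbacks standing in for read_parts_mapping() and update_chunks_on_read():

// Generic read-repair shape, simplified for illustration: repair_if_needed()
// returns a truthy value only when it scheduled (and awaited) a repair.
async function read_with_repair(fetch, repair_if_needed) {
    let result = await fetch();
    if (await repair_if_needed(result)) {
        // The repair moved blocks around, so refresh before answering.
        result = await fetch();
    }
    return result;
}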

@@ -89,11 +94,13 @@ async function read_node_mapping(node_ids, skip, limit) {
 /**
  *
  * @param {nb.PartSchemaDB[]} parts
+ * @param {nb.LocationInfo} [location_info]
  * @returns {Promise<nb.Chunk[]>}
  */
-async function read_parts_mapping(parts) {
+async function read_parts_mapping(parts, location_info) {
     const chunks_db = await MDStore.instance().find_chunks_by_ids(mongo_utils.uniq_ids(parts, 'chunk'));
-    await MDStore.instance().load_blocks_for_chunks(chunks_db);
+    const sorter = location_info ? _block_sorter_local(location_info) : _block_sorter_basic;
+    await MDStore.instance().load_blocks_for_chunks(chunks_db, sorter);
     const chunks_db_by_id = _.keyBy(chunks_db, '_id');
     const chunks = parts.map(part => {
         const chunk = new ChunkDB({ ...chunks_db_by_id[part.chunk.toHexString()], parts: [part] });
@@ -104,59 +111,48 @@
 }
 
 
-// /**
-//  * @param {nb.Chunk[]} chunks
-//  * @param {nb.LocationInfo} [location_info]
-//  */
-// async function update_chunks_on_read(chunks, location_info) {
-//     const chunks = _.map(parts, 'chunk');
-//     const tiering_status_by_bucket_id = {};
-
-//     for (const chunk of chunks) {
-//         map_server.populate_chunk(chunk);
-//     }
-
-//     await _load_chunk_mappings(chunks, tiering_status_by_bucket_id);
-
-//     const chunks_to_scrub = [];
-//     try {
-//         const bucket = system_store.data.get_by_id(chunks[0].bucket);
-//         const tiering_status = tiering_status_by_bucket_id[bucket._id];
-//         const selected_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status);
-//         for (const chunk of chunks) {
-//             map_server.populate_chunk(chunk);
-//             if (!chunk.tier._id || !_.isEqual(chunk.tier._id, selected_tier._id)) {
-//                 dbg.log0('Chunk with low tier will be sent for rebuilding', chunk);
-//                 chunks_to_scrub.push(chunk);
-//             } else if (location_info) {
-//                 const chunk_info = mapper.get_chunk_info(chunk);
-//                 const mapping = mapper.map_chunk(chunk_info, chunk.tier, bucket.tiering, tiering_status, location_info);
-//                 if (mapper.should_rebuild_chunk_to_local_mirror(mapping, location_info)) {
-//                     dbg.log2('Chunk with following mapping will be sent for rebuilding', chunk, mapping);
-//                     chunks_to_scrub.push(chunk);
-//                 }
-//             }
-//         }
-//         if (chunks_to_scrub.length) {
-//             dbg.log1('Chunks wasn\'t found in local pool - the following will be rebuilt:', util.inspect(chunks_to_scrub));
-//             await server_rpc.client.scrubber.build_chunks({
-//                 chunk_ids: _.map(chunks_to_scrub, '_id'),
-//                 tier: selected_tier._id,
-//             }, {
-//                 auth_token: auth_server.make_auth_token({
-//                     system_id: chunks_to_scrub[0].system,
-//                     role: 'admin'
-//                 })
-//             });
-//         }
-//     } catch (err) {
-//         dbg.warn('Chunks failed to rebuilt - skipping');
-//     }
-//     if (chunks_to_scrub.length) {
-//         // mismatch type...
-//         await MDStore.instance().load_blocks_for_chunks(chunks);
-//     }
-// }
+/**
+ * @param {nb.Chunk[]} chunks
+ * @param {nb.LocationInfo} [location_info]
+ */
+async function update_chunks_on_read(chunks, location_info) {
+    const chunks_to_scrub = [];
+    try {
+        const bucket = chunks[0].bucket;
+        const selected_tier = await map_server.select_tier_for_write(bucket);
+        for (const chunk of chunks) {
+            if ((!chunk.tier._id || !_.isEqual(chunk.tier._id, selected_tier._id)) &&
+                map_server.enough_room_in_tier(selected_tier, bucket)) {
+                dbg.log0('Chunk with low tier will be sent for rebuilding', chunk._id);
+                chunks_to_scrub.push(chunk);
+            } else if (location_info) {
+                if (chunk.tier.data_placement !== 'MIRROR') return;
+                const mirror = await map_server.select_mirror_for_write(chunk.tier, bucket.tiering, location_info);
+                if (mirror.spread_pools.find(pool => (location_info.region && location_info.region === pool.region) ||
+                    location_info.pool_id === String(pool._id))) {
+                    dbg.log2('Chunk with following mapping will be sent for rebuilding', chunk);
+                    chunks_to_scrub.push(chunk);
+                }
+            }
+        }
+        if (chunks_to_scrub.length) {
+            const chunk_ids = _.map(chunks_to_scrub, '_id');
+            dbg.log1('Chunks weren\'t found in local pool/upper tier - the following will be rebuilt:', util.inspect(chunks_to_scrub));
+            await server_rpc.client.scrubber.build_chunks({
+                chunk_ids,
+                tier: selected_tier._id,
+            }, {
+                auth_token: auth_server.make_auth_token({
+                    system_id: bucket.system._id,
+                    role: 'admin'
+                })
+            });
+        }
+    } catch (err) {
+        dbg.warn('Chunks failed to rebuild - skipping');
+    }
+    return chunks_to_scrub.length;
+}
 
 // sanitizing start & end: we want them to be integers, positive, up to obj.size.
 function sanitize_object_range(obj, start, end) {
@@ -184,6 +180,50 @@
     };
 }
 
+/**
+ * sorting function for sorting blocks with most recent heartbeat first
+ * @param {nb.Block} block1
+ * @param {nb.Block} block2
+ */
+function _block_sorter_basic(block1, block2) {
+    const node1 = block1.node;
+    const node2 = block2.node;
+    if (node2.readable && !node1.readable) return 1;
+    if (node1.readable && !node2.readable) return -1;
+    return node2.heartbeat - node1.heartbeat;
+}
+
+/**
+ * locality sorting function for blocks
+ * @param {nb.LocationInfo} location_info
+ */
+function _block_sorter_local(location_info) {
+    return sort_func;
+    /**
+     * locality sorting function for blocks
+     * @param {nb.Block} block1
+     * @param {nb.Block} block2
+     */
+    function sort_func(block1, block2) {
+        const node1 = block1.node;
+        const node2 = block2.node;
+        const { node_id, host_id, pool_id, region } = location_info;
+        if (node2.readable && !node1.readable) return 1;
+        if (node1.readable && !node2.readable) return -1;
+        if (String(node2._id) === node_id && String(node1._id) !== node_id) return 1;
+        if (String(node1._id) === node_id && String(node2._id) !== node_id) return -1;
+        if (node2.host_id === host_id && node1.host_id !== host_id) return 1;
+        if (node1.host_id === host_id && node2.host_id !== host_id) return -1;
+        if (String(block2.pool) === pool_id && String(block1.pool) !== pool_id) return 1;
+        if (String(block1.pool) === pool_id && String(block2.pool) !== pool_id) return -1;
+        if (region) {
+            if (block2.pool.region === region && block1.pool.region !== region) return 1;
+            if (block1.pool.region === region && block2.pool.region !== region) return -1;
+        }
+        return node2.heartbeat - node1.heartbeat;
+    }
+}
+
 // EXPORTS
 exports.read_object_mapping = read_object_mapping;
 exports.read_object_mapping_admin = read_object_mapping_admin;
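
The two comparators give each read a deterministic block order: _block_sorter_basic prefers readable nodes and then the freshest heartbeat, while _block_sorter_local (returned as a closure over location_info) ranks locality first, node, then host, then pool, then region, with heartbeat as the final tie-breaker. A small usage sketch with hand-built objects reduced to the fields the sorters actually touch (shapes assumed for illustration):

// Fake blocks with just the fields the comparators read; in the real code
// these come from MDStore load_blocks_for_chunks().
const location_info = { node_id: 'n1', host_id: 'h1', pool_id: 'p1', region: 'us-east' };
const blocks = [
    { pool: 'p2', node: { _id: 'n2', host_id: 'h2', readable: true, heartbeat: 2000 } },
    { pool: 'p1', node: { _id: 'n1', host_id: 'h1', readable: true, heartbeat: 1000 } },
    { pool: 'p3', node: { _id: 'n3', host_id: 'h3', readable: false, heartbeat: 3000 } },
];

blocks.sort(_block_sorter_local(location_info));
// Local block (n1/h1/p1) first despite its older heartbeat, then n2
// (readable, freshest heartbeat), then the unreadable n3 last.

blocks.sort(_block_sorter_basic);
// Heartbeat order among readable nodes: n2 (2000) before n1 (1000), n3 last.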