Skip to content

Commit

Permalink
SystemStore to fetch data in deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyalbo authored and nimrod-becker committed Jul 24, 2019
1 parent 1c3dbb3 commit 4a4cac8
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 138 deletions.
81 changes: 69 additions & 12 deletions src/server/bg_services/md_aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,49 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
const system = system_store.data.systems[0];
if (!system || system_utils.system_in_maintenance(system._id)) return;

const global_last_update = system.global_last_update;

let has_more = true;
let update_range = true;
let range = {};

const md_local_store = {
data: {
buckets: _.clone(system_store.data.buckets),
pools: _.clone(system_store.data.pools),
}
};

while (has_more) {
if (update_range) range = await find_next_range({ target_now, system_store });
const changes = range && await range_md_aggregator({ md_store, system_store, range });
if (update_range) {
range = await find_next_range({
target_now,
system_store: md_local_store,
global_last_update,
original_system_store: system_store,
});
}
const changes = range && await range_md_aggregator({
md_store,
system_store: md_local_store,
range,
global_last_update
});
if (changes) {
const update = _.omit(changes, 'more_updates');
await system_store.make_changes({ update });
await P.delay(delay);
update_range = !changes.more_updates;
if (update_range) {
await system_store.make_changes({
update: {
systems: [{
_id: system._id,
global_last_update: range.till_time,
}]
}
});
}
await P.delay(delay);
} else {
has_more = false;
}
Expand All @@ -56,13 +87,15 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
function find_minimal_range({
target_now,
system_store,
global_last_update
}) {
let from_time = target_now;
let till_time = target_now;
let should_reset_all = false;

_.forEach(system_store.data.buckets, bucket => {
const last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const bucket_last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const last_update = global_last_update > bucket_last_update ? global_last_update : bucket_last_update;
if (last_update > target_now) {
dbg.error('find_next_range: time skew detected for bucket', bucket.name,
'last_update', last_update,
Expand All @@ -78,7 +111,8 @@ function find_minimal_range({
}
});
_.forEach(system_store.data.pools, pool => {
const last_update = _.get(pool, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const pool_last_update = _.get(pool, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const last_update = global_last_update > pool_last_update ? global_last_update : pool_last_update;
if (last_update > target_now) {
dbg.error('find_next_range: time skew detected for pool', pool.name,
'last_update', last_update,
Expand All @@ -100,25 +134,32 @@ function find_minimal_range({
function find_next_range({
target_now,
system_store,
global_last_update,
original_system_store,
}) {
let { from_time, till_time, should_reset_all } = find_minimal_range({
target_now,
system_store,
global_last_update
});

// printing the range and the buckets/pools relative info
dbg.log0('find_next_range:',
'from_time', from_time,
'till_time*', till_time - from_time,
'target_now*', target_now - from_time
'target_now*', target_now - from_time,
'global_last_update', global_last_update,
);
_.forEach(system_store.data.buckets, bucket => {
const last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const bucket_last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const last_update = bucket_last_update > global_last_update ? bucket_last_update : global_last_update;
dbg.log1('find_next_range: bucket', bucket.name,
'last_update*', last_update - from_time
);
});
_.forEach(system_store.data.pools, pool => {
const last_update = _.get(pool, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const pool_last_update = _.get(pool, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const last_update = pool_last_update > global_last_update ? pool_last_update : global_last_update;
dbg.log1('find_next_range: pool', pool.name,
'last_update*', last_update - from_time
);
Expand All @@ -132,8 +173,12 @@ function find_next_range({
);
// Assigning NOOBAA_EPOCH so we will gather all data again till the new time
// This means that we will be eventually consistent
return system_store.make_changes({
return original_system_store.make_changes({
update: {
systems: [{
_id: original_system_store.data.systems[0]._id,
global_last_update: config.NOOBAA_EPOCH,
}],
buckets: _.map(system_store.data.buckets, bucket => ({
_id: bucket._id,
storage_stats: {
Expand Down Expand Up @@ -193,8 +238,8 @@ function range_md_aggregator({
const till_time = range.till_time;
let more_updates = false;

const filtered_buckets = _.filter(system_store.data.buckets, bucket => bucket.storage_stats.last_update === from_time);
const filtered_pools = _.filter(system_store.data.pools, pool => pool.storage_stats.last_update === from_time);
const filtered_buckets = _.filter(system_store.data.buckets, bucket => bucket.storage_stats.last_update <= from_time);
const filtered_pools = _.filter(system_store.data.pools, pool => pool.storage_stats.last_update <= from_time);
if (filtered_buckets.length > config.MD_AGGREGATOR_BATCH || filtered_pools.length > config.MD_AGGREGATOR_BATCH) {
more_updates = true;
}
Expand Down Expand Up @@ -228,6 +273,7 @@ function range_md_aggregator({
);

const buckets_updates = _.map(buckets, bucket => {
let dont_change_last_update = false;
const new_storage_stats = calculate_new_bucket({
bucket,
existing_chunks_aggregate,
Expand All @@ -237,30 +283,41 @@ function range_md_aggregator({
existing_blocks_aggregate,
deleted_blocks_aggregate
});
if (_.isEqual(_.omit(bucket.storage_stats, 'last_update'), new_storage_stats)) {
dont_change_last_update = true;
}
new_storage_stats.last_update = till_time;
bucket.storage_stats = new_storage_stats;
return {
_id: bucket._id,
storage_stats: new_storage_stats,
dont_change_last_update
};
});

const pools_updates = _.map(pools, pool => {
let dont_change_last_update = false;
const new_storage_stats = calculate_new_pool({
pool,
existing_blocks_aggregate,
deleted_blocks_aggregate
});
if (_.isEqual(_.omit(pool.storage_stats, 'last_update'), new_storage_stats)) {
dont_change_last_update = true;
}
new_storage_stats.last_update = till_time;
pool.storage_stats = new_storage_stats;
return {
_id: pool._id,
storage_stats: new_storage_stats,
dont_change_last_update,
};
});

return {
buckets: buckets_updates,
pools: pools_updates,
more_updates
more_updates,
};
});
}
Expand Down
17 changes: 13 additions & 4 deletions src/server/system_services/bucket_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ function new_bucket_defaults(name, system_id, tiering_policy_id, tag) {
objects_size: 0,
objects_count: 0,
objects_hist: [],
last_update: now - (2 * config.MD_GRACE_IN_MILLISECONDS)
// new buckets creation date will be rounded down to config.MD_AGGREGATOR_INTERVAL (30 seconds)
last_update: (Math.floor(now / config.MD_AGGREGATOR_INTERVAL) * config.MD_AGGREGATOR_INTERVAL) -
(2 * config.MD_GRACE_IN_MILLISECONDS),
},
lambda_triggers: [],
versioning: 'DISABLED'
Expand Down Expand Up @@ -1102,9 +1104,12 @@ function get_bucket_info({
};
const metrics = _calc_metrics({ bucket, nodes_aggregate_pool, hosts_aggregate_pool, tiering_pools_status, info });

const bucket_last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const system_last_update = _.get(bucket, 'system.global.last_update') || config.NOOBAA_EPOCH;
const last_update = Math.max(system_last_update, bucket_last_update);
info.usage_by_pool = {
pools: {},
last_update: _.get(bucket, 'storage_stats.last_update')
last_update,
};

info.usage_by_pool.pools = [];
Expand Down Expand Up @@ -1255,14 +1260,18 @@ function _calc_metrics({
const bucket_total = bucket_free.plus(bucket_used)
.plus(bucket_used_other);

const bucket_last_update = _.get(bucket, 'storage_stats.last_update') || config.NOOBAA_EPOCH;
const system_last_update = _.get(bucket, 'system.global_last_update') || config.NOOBAA_EPOCH;
const last_update = Math.max(system_last_update, bucket_last_update);

info.storage = {
values: size_utils.to_bigint_storage({
used: bucket_used,
used_other: bucket_used_other,
total: bucket_total,
free: bucket_free,
}),
last_update: _.get(bucket, 'storage_stats.last_update')
last_update
};

const actual_free = size_utils.json_to_bigint(_.get(info, 'tiering.data.free') || 0);
Expand Down Expand Up @@ -1291,7 +1300,7 @@ function _calc_metrics({
size_reduced: bucket_chunks_capacity,
free: actual_free,
available_for_upload,
last_update: _.get(bucket, 'storage_stats.last_update')
last_update,
});

return {
Expand Down
8 changes: 6 additions & 2 deletions src/server/system_services/schemas/system_schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,11 @@ module.exports = {
}
}
}
},
}
}
},

global_last_update: {
idate: true
},
}
};
Loading

0 comments on commit 4a4cac8

Please sign in to comment.