chore: tiering - make Modify work with cool storage (#3395)
1. Fully support tiered_experimental_cooling for all operations.
2. Offset cool storage usage when computing memory pressure situations in Heartbeat.
3. Introduce realtime entry counting per db_slice and add a DCHECK that verifies it against the old approach.
   Later we will switch to realtime entry and free-memory computations when computing bytes per object,
   and remove the old approach in CacheStats().
4. Show hit rate during the run of the dfly_bench loadtest (see the sketch below).
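Editorial sketch (not part of the commit): how the per-connection counters combine into the reported hit rate. The ClientStats fields, operator+=, and the guard against zero hit_opportunities mirror the dfly_bench.cc changes further down in this diff; the standalone HitRatePct helper is illustrative only.

    // Sketch only: mirrors ClientStats/operator+= and the hit-rate guard from
    // src/server/dfly_bench.cc below; HitRatePct is a hypothetical helper.
    #include <cstdint>

    struct ClientStats {
      uint64_t num_responses = 0;
      uint64_t hit_count = 0;
      uint64_t hit_opportunities = 0;
      uint64_t num_errors = 0;

      ClientStats& operator+=(const ClientStats& o) {
        num_responses += o.num_responses;
        hit_count += o.hit_count;
        hit_opportunities += o.hit_opportunities;
        num_errors += o.num_errors;
        return *this;
      }
    };

    // Hit rate in percent, guarding against division by zero before any GET has run.
    double HitRatePct(const ClientStats& s) {
      return s.hit_opportunities > 0
                 ? 100.0 * double(s.hit_count) / double(s.hit_opportunities)
                 : 0.0;
    }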

Signed-off-by: Roman Gershman <[email protected]>
romange authored Jul 27, 2024
1 parent 9d16bd6 commit 6b67f44
Showing 6 changed files with 96 additions and 46 deletions.
5 changes: 5 additions & 0 deletions src/server/db_slice.cc
@@ -622,6 +622,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
}

table_memory_ += (db.table_memory() - table_before);
entries_count_++;

db.stats.inline_keys += it->first.IsInline();
AccountObjectMemory(key, it->first.ObjType(), it->first.MallocUsed(), &db); // Account for key
@@ -756,6 +757,8 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {

for (DbIndex index : indexes) {
table_memory_ -= db_arr_[index]->table_memory();
entries_count_ -= db_arr_[index]->prime.size();

InvalidateDbWatches(index);
flush_db_arr[index] = std::move(db_arr_[index]);

@@ -764,6 +767,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
}

CHECK(fetched_items_.empty());

auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
flush_db_arr.clear();
ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
@@ -1464,6 +1468,7 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl

table->prime.Erase(del_it.GetInnerIt());
table_memory_ += (table->table_memory() - table_before);
--entries_count_;

SendInvalidationTrackingMessage(del_it.key());
}
6 changes: 5 additions & 1 deletion src/server/db_slice.h
@@ -391,7 +391,6 @@ class DbSlice {
// Returns existing keys count in the db.
size_t DbSize(DbIndex db_ind) const;

// Callback functions called upon writing to the existing key.
DbTableStats* MutableStats(DbIndex db_ind) {
return &db_arr_[db_ind]->stats;
}
@@ -417,6 +416,10 @@
return table_memory_;
}

size_t entries_count() const {
return entries_count_;
}

using ChangeCallback = std::function<void(DbIndex, const ChangeReq&)>;

//! Registers the callback to be called for each change.
@@ -567,6 +570,7 @@ class DbSlice {
size_t bytes_per_object_ = 0;
size_t soft_budget_limit_ = 0;
size_t table_memory_ = 0;
uint64_t entries_count_ = 0;

mutable SliceEvents events_; // we may change this even for const operations.

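Editorial note: the entries_count_ bookkeeping added above keeps a realtime counter next to the per-table sizes, and CacheStats() (in engine_shard_set.cc below) DCHECKs the scanned total against it. A minimal sketch of that invariant follows; the names are simplified and hypothetical, not Dragonfly APIs.

    // Sketch of the invariant only; Slice/Table are stand-ins for DbSlice/DbTable.
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct Table { uint64_t size = 0; };

    struct Slice {
      std::vector<Table> tables;
      uint64_t entries_count = 0;  // incremental counter, like DbSlice::entries_count_

      void Add(size_t t) { tables[t].size++; entries_count++; }  // cf. AddOrFindInternal
      void Del(size_t t) { tables[t].size--; --entries_count; }  // cf. PerformDeletion
      void Flush(size_t t) {                                     // cf. FlushDbIndexes
        entries_count -= tables[t].size;
        tables[t].size = 0;
      }

      void Check() const {  // analogue of the DCHECK_EQ in CacheStats()
        uint64_t entries = 0;
        for (const auto& t : tables) entries += t.size;
        assert(entries == entries_count);
      }
    };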
56 changes: 31 additions & 25 deletions src/server/dfly_bench.cc
@@ -162,6 +162,15 @@ struct ClientStats {
uint64_t hit_count = 0;
uint64_t hit_opportunities = 0;
uint64_t num_errors = 0;

ClientStats& operator+=(const ClientStats& o) {
hist.Merge(o.hist);
num_responses += o.num_responses;
hit_count += o.hit_count;
hit_opportunities += o.hit_opportunities;
num_errors += o.num_errors;
return *this;
}
};

// Per connection driver.
@@ -474,29 +483,30 @@ void WatchFiber(absl::Time start_time, atomic_bool* finish_signal, ProactorPool*
ThisFiber::SleepFor(1s);
absl::Time now = absl::Now();
if (now - last_print > absl::Seconds(5)) {
uint64_t num_resp = 0;
uint64_t num_errors = 0;

ClientStats client_stats;
pp->AwaitFiberOnAll([&](auto* p) {
unique_lock lk(mutex);

num_resp += client->stats.num_responses;
num_errors += client->stats.num_errors;
client_stats += client->stats;
lk.unlock();
});

uint64_t total_ms = (now - start_time) / absl::Milliseconds(1);
uint64_t period_ms = (now - last_print) / absl::Milliseconds(1);
uint64_t period_resp_cnt = num_resp - num_last_resp_cnt;
double done_perc = double(num_resp) * 100 / resp_goal;

uint64_t period_resp_cnt = client_stats.num_responses - num_last_resp_cnt;
double done_perc = double(client_stats.num_responses) * 100 / resp_goal;
double hitrate =
client_stats.hit_opportunities > 0
? 100 * double(client_stats.hit_count) / double(client_stats.hit_opportunities)
: 0;
CONSOLE_INFO << total_ms / 1000 << "s: " << absl::StrFormat("%.1f", done_perc)
<< "% done, effective RPS(now/accumulated): "
<< period_resp_cnt * 1000 / period_ms << "/" << num_resp * 1000 / total_ms
<< ", errors: " << num_errors;
<< period_resp_cnt * 1000 / period_ms << "/"
<< client_stats.num_responses * 1000 / total_ms
<< ", errors: " << client_stats.num_errors
<< ", hitrate: " << absl::StrFormat("%.1f", hitrate) << "%";

last_print = now;
num_last_resp_cnt = num_resp;
num_last_resp_cnt = client_stats.num_responses;
}
}
}
@@ -599,31 +609,27 @@ int main(int argc, char* argv[]) {
base::Histogram hist;

LOG(INFO) << "Resetting all threads";
uint64_t hit_opportunities = 0, hit_count = 0, num_errors = 0, num_responses = 0;

ClientStats summary;
pp->AwaitFiberOnAll([&](auto* p) {
unique_lock lk(mutex);
hist.Merge(client->stats.hist);

hit_opportunities += client->stats.hit_opportunities;
hit_count += client->stats.hit_count;
num_errors += client->stats.num_errors;
num_responses += client->stats.num_responses;
summary += client->stats;
lk.unlock();
client.reset();
});

CONSOLE_INFO << "\nTotal time: " << duration << ". Overall number of requests: " << num_responses
<< ", QPS: " << num_responses / (duration / absl::Seconds(1));
CONSOLE_INFO << "\nTotal time: " << duration
<< ". Overall number of requests: " << summary.num_responses
<< ", QPS: " << summary.num_responses / (duration / absl::Seconds(1));

if (num_errors) {
CONSOLE_INFO << "Got " << num_errors << " error responses!";
if (summary.num_errors) {
CONSOLE_INFO << "Got " << summary.num_errors << " error responses!";
}

CONSOLE_INFO << "Latency summary, all times are in usec:\n" << hist.ToString();
if (hit_opportunities) {
if (summary.hit_opportunities) {
CONSOLE_INFO << "----------------------------------\nHit rate: "
<< 100 * double(hit_count) / double(hit_opportunities) << "%\n";
<< 100 * double(summary.hit_count) / double(summary.hit_opportunities) << "%\n";
}
pp->Stop();

17 changes: 13 additions & 4 deletions src/server/engine_shard_set.cc
@@ -620,10 +620,15 @@ void EngineShard::Heartbeat() {
}

ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size();

// Offset CoolMemoryUsage when considering background offloading.
// TODO: Another approach could be to align this with how we handle FreeMemWithEvictionStep,
// i.e. trigger offloading if memory_budget is below the limit.
size_t tiering_offload_threshold =
tiered_storage_
? size_t(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) / shard_set->size()
: std::numeric_limits<size_t>::max();
tiered_storage_ ? tiered_storage_->CoolMemoryUsage() +
size_t(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) /
shard_set->size()
: std::numeric_limits<size_t>::max();

DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
@@ -738,6 +743,7 @@ void EngineShard::CacheStats() {
}
}
DCHECK_EQ(table_memory, db_slice.table_memory());
DCHECK_EQ(entries, db_slice.entries_count());
if (tiered_storage_) {
table_memory += tiered_storage_->CoolMemoryUsage();
}
@@ -764,7 +770,10 @@ bool EngineShard::ShouldThrottleForTiering() const {  // see header for formula

size_t tiering_redline =
(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) / shard_set->size();
return UsedMemory() > tiering_redline && tiered_storage_->WriteDepthUsage() > 0.3;

// UsedMemory includes CoolMemoryUsage, so we are offsetting it to remove the cool cache impact.
return tiered_storage_->WriteDepthUsage() > 0.3 &&
(UsedMemory() > tiering_redline + tiered_storage_->CoolMemoryUsage());
}

auto EngineShard::AnalyzeTxQueue() const -> TxQueueInfo {
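Editorial note, with illustrative numbers only (the values below are assumptions, not project defaults): the Heartbeat change raises the per-shard offload threshold by the shard's current cool-cache usage, and ShouldThrottleForTiering applies the same offset to the throttling redline, so memory parked in the cool cache no longer counts as pressure that triggers offloading or throttling.

    // Worked example of the offset; every value here is made up for illustration.
    #include <cstddef>
    #include <cstdio>

    int main() {
      size_t max_memory_limit = 8ULL << 30;    // 8 GiB total limit (assumed)
      size_t num_shards = 8;                   // assumed shard count
      double offload_threshold = 0.1;          // assumed value of --tiered_offload_threshold
      size_t cool_memory_usage = 50ULL << 20;  // 50 MiB currently held by the cool cache (assumed)

      // Per-shard threshold before this commit: ~100 MiB, regardless of the cool cache.
      size_t old_threshold = size_t(max_memory_limit * offload_threshold) / num_shards;

      // After this commit the cool cache is excluded from the pressure calculation,
      // so background offloading kicks in ~50 MiB later on this shard.
      size_t new_threshold = cool_memory_usage + old_threshold;

      std::printf("old=%zu MiB new=%zu MiB\n", old_threshold >> 20, new_threshold >> 20);
      return 0;
    }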
43 changes: 28 additions & 15 deletions src/server/tiered_storage.cc
@@ -31,7 +31,7 @@ ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 10_MB,
"In bytes. If memory budget on a shard goes below this limit, tiering stops "
"hot-loading values into ram.");

ABSL_FLAG(bool, tiered_experimental_cooling, false,
ABSL_FLAG(bool, tiered_experimental_cooling, true,
"If true, uses intermidate cooling layer "
"when offloading values to storage");

@@ -267,6 +267,8 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
}

bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
DVLOG(2) << "NotifyDelete [" << segment.offset << "," << segment.length << "]";

if (OccupiesWholePages(segment.length))
return true;

@@ -398,22 +400,33 @@ util::fb2::Future<T> TieredStorage::Modify(DbIndex dbid, std::string_view key,
const PrimeValue& value,
std::function<T(std::string*)> modf) {
DCHECK(value.IsExternal());
DCHECK(!value.IsCool()); // TBD

util::fb2::Future<T> future;
PrimeValue decoder;
decoder.ImportExternal(value);

auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)](
bool is_raw, std::string* raw_val) mutable {
if (is_raw) {
decoder.Materialize(*raw_val, true);
decoder.GetString(raw_val);
}
future.Resolve(modf(raw_val));
return true;
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
if (value.IsCool()) {
PrimeValue hot = Warmup(dbid, value.GetCool());
string tmp;

DCHECK_EQ(value.Size(), hot.Size());
hot.GetString(&tmp);
future.Resolve(modf(&tmp));

// TODO: An awful hack - to fix later.
const_cast<PrimeValue&>(value).Materialize(tmp, false);
} else {
PrimeValue decoder;
decoder.ImportExternal(value);

auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)](
bool is_raw, std::string* raw_val) mutable {
if (is_raw) {
decoder.Materialize(*raw_val, true);
decoder.GetString(raw_val);
}
future.Resolve(modf(raw_val));
return true;
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
}
return future;
}

15 changes: 14 additions & 1 deletion src/server/tiered_storage_test.cc
@@ -26,6 +26,7 @@ ABSL_DECLARE_FLAG(bool, force_epoll);
ABSL_DECLARE_FLAG(string, tiered_prefix);
ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth);
ABSL_DECLARE_FLAG(bool, tiered_experimental_cooling);

namespace dfly {

@@ -189,7 +190,9 @@ TEST_F(TieredStorageTest, Defrag) {

// This triggers defragmentation, as only 3 < 7/2 entries remain.
Run({"GET", string(1, 'd')});
Run({"GET", string(1, 'd')});

// Wait until any reads caused by defrag have finished.
ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.pending_read_cnt == 0; });
metrics = GetMetrics();
EXPECT_EQ(metrics.tiered_stats.total_defrags, 3u);
EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 0u);
@@ -200,6 +203,9 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {
absl::FlagSaver saver;
SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values

// This test's setup assumes cooling buffers are disabled.
SetFlag(&FLAGS_tiered_experimental_cooling, false);

const int kNum = 500;

max_memory_limit = kNum * 4096;
@@ -246,6 +252,13 @@ TEST_F(TieredStorageTest, FlushAll) {
absl::FlagSaver saver;
SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values

// We want to cover the interaction of FlushAll with concurrent reads from disk.
// For that we disable tiered_experimental_cooling.
// TODO: it seems that our replacement policy uploads the entries to RAM in any case,
// making this test ineffective. We should add the ability to disable promotion of offloaded
// entries to RAM upon reads.
SetFlag(&FLAGS_tiered_experimental_cooling, false);

const int kNum = 500;
for (size_t i = 0; i < kNum; i++) {
Run({"SET", absl::StrCat("k", i), BuildString(3000)});
