diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index e76435a91ff0..b2c36d93fe58 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -622,6 +622,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
   }
 
   table_memory_ += (db.table_memory() - table_before);
+  entries_count_++;
 
   db.stats.inline_keys += it->first.IsInline();
   AccountObjectMemory(key, it->first.ObjType(), it->first.MallocUsed(), &db);  // Account for key
@@ -756,6 +757,8 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
 
   for (DbIndex index : indexes) {
     table_memory_ -= db_arr_[index]->table_memory();
+    entries_count_ -= db_arr_[index]->prime.size();
+
     InvalidateDbWatches(index);
     flush_db_arr[index] = std::move(db_arr_[index]);
@@ -764,6 +767,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
   }
 
   CHECK(fetched_items_.empty());
+
   auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
     flush_db_arr.clear();
     ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
@@ -1464,6 +1468,7 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl
   table->prime.Erase(del_it.GetInnerIt());
   table_memory_ += (table->table_memory() - table_before);
+  --entries_count_;
 
   SendInvalidationTrackingMessage(del_it.key());
 }
diff --git a/src/server/db_slice.h b/src/server/db_slice.h
index 9a0e11444b13..03f90ad2f450 100644
--- a/src/server/db_slice.h
+++ b/src/server/db_slice.h
@@ -391,7 +391,6 @@ class DbSlice {
   // Returns existing keys count in the db.
   size_t DbSize(DbIndex db_ind) const;
 
-  // Callback functions called upon writing to the existing key.
   DbTableStats* MutableStats(DbIndex db_ind) {
     return &db_arr_[db_ind]->stats;
   }
@@ -417,6 +416,10 @@ class DbSlice {
     return table_memory_;
   }
 
+  size_t entries_count() const {
+    return entries_count_;
+  }
+
   using ChangeCallback = std::function<void(DbIndex, const ChangeReq&)>;
 
   //! Registers the callback to be called for each change.
@@ -567,6 +570,7 @@ class DbSlice {
   size_t bytes_per_object_ = 0;
   size_t soft_budget_limit_ = 0;
   size_t table_memory_ = 0;
+  uint64_t entries_count_ = 0;
 
   mutable SliceEvents events_;  // we may change this even for const operations.
diff --git a/src/server/dfly_bench.cc b/src/server/dfly_bench.cc
index 13a110119645..6f934fe38391 100644
--- a/src/server/dfly_bench.cc
+++ b/src/server/dfly_bench.cc
@@ -162,6 +162,15 @@ struct ClientStats {
   uint64_t hit_count = 0;
   uint64_t hit_opportunities = 0;
   uint64_t num_errors = 0;
+
+  ClientStats& operator+=(const ClientStats& o) {
+    hist.Merge(o.hist);
+    num_responses += o.num_responses;
+    hit_count += o.hit_count;
+    hit_opportunities += o.hit_opportunities;
+    num_errors += o.num_errors;
+    return *this;
+  }
 };
 
 // Per connection driver.
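
The `operator+=` introduced above lets each reporting site fold per-connection stats into an aggregate with a single statement; the hunks that follow switch both the periodic reporter and the final summary over to it. For illustration, here is a minimal standalone sketch of the same merge pattern; the trimmed-down `Stats` struct and the sample values are hypothetical, not part of this PR:

#include <cstdint>
#include <iostream>
#include <vector>

// Trimmed-down stand-in for ClientStats: each worker thread owns one
// instance, and a reporter merges them via operator+=.
struct Stats {
  uint64_t num_responses = 0;
  uint64_t num_errors = 0;

  Stats& operator+=(const Stats& o) {
    num_responses += o.num_responses;
    num_errors += o.num_errors;
    return *this;
  }
};

int main() {
  std::vector<Stats> per_thread = {{100, 1}, {250, 0}, {75, 2}};
  Stats summary;
  for (const Stats& s : per_thread)
    summary += s;  // one merge point instead of field-by-field sums
  std::cout << summary.num_responses << " responses, "
            << summary.num_errors << " errors\n";  // 425 responses, 3 errors
}
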
@@ -474,29 +483,30 @@ void WatchFiber(absl::Time start_time, atomic_bool* finish_signal, ProactorPool*
     ThisFiber::SleepFor(1s);
 
     absl::Time now = absl::Now();
     if (now - last_print > absl::Seconds(5)) {
-      uint64_t num_resp = 0;
-      uint64_t num_errors = 0;
-
+      ClientStats client_stats;
       pp->AwaitFiberOnAll([&](auto* p) {
         unique_lock lk(mutex);
-
-        num_resp += client->stats.num_responses;
-        num_errors += client->stats.num_errors;
+        client_stats += client->stats;
         lk.unlock();
       });
 
       uint64_t total_ms = (now - start_time) / absl::Milliseconds(1);
       uint64_t period_ms = (now - last_print) / absl::Milliseconds(1);
-      uint64_t period_resp_cnt = num_resp - num_last_resp_cnt;
-      double done_perc = double(num_resp) * 100 / resp_goal;
-
+      uint64_t period_resp_cnt = client_stats.num_responses - num_last_resp_cnt;
+      double done_perc = double(client_stats.num_responses) * 100 / resp_goal;
+      double hitrate =
+          client_stats.hit_opportunities > 0
+              ? 100 * double(client_stats.hit_count) / double(client_stats.hit_opportunities)
+              : 0;
 
       CONSOLE_INFO << total_ms / 1000 << "s: " << absl::StrFormat("%.1f", done_perc)
                    << "% done, effective RPS(now/accumulated): "
-                   << period_resp_cnt * 1000 / period_ms << "/" << num_resp * 1000 / total_ms
-                   << ", errors: " << num_errors;
+                   << period_resp_cnt * 1000 / period_ms << "/"
+                   << client_stats.num_responses * 1000 / total_ms
+                   << ", errors: " << client_stats.num_errors
+                   << ", hitrate: " << absl::StrFormat("%.1f", hitrate) << "%";
 
       last_print = now;
-      num_last_resp_cnt = num_resp;
+      num_last_resp_cnt = client_stats.num_responses;
     }
   }
 }
@@ -599,31 +609,27 @@ int main(int argc, char* argv[]) {
   base::Histogram hist;
 
   LOG(INFO) << "Resetting all threads";
 
-  uint64_t hit_opportunities = 0, hit_count = 0, num_errors = 0, num_responses = 0;
+  ClientStats summary;
   pp->AwaitFiberOnAll([&](auto* p) {
     unique_lock lk(mutex);
-    hist.Merge(client->stats.hist);
-
-    hit_opportunities += client->stats.hit_opportunities;
-    hit_count += client->stats.hit_count;
-    num_errors += client->stats.num_errors;
-    num_responses += client->stats.num_responses;
+    summary += client->stats;
     lk.unlock();
     client.reset();
   });
 
-  CONSOLE_INFO << "\nTotal time: " << duration << ". Overall number of requests: " << num_responses
-               << ", QPS: " << num_responses / (duration / absl::Seconds(1));
+  CONSOLE_INFO << "\nTotal time: " << duration
+               << ". Overall number of requests: " << summary.num_responses
+               << ", QPS: " << summary.num_responses / (duration / absl::Seconds(1));
 
-  if (num_errors) {
-    CONSOLE_INFO << "Got " << num_errors << " error responses!";
+  if (summary.num_errors) {
+    CONSOLE_INFO << "Got " << summary.num_errors << " error responses!";
   }
 
   CONSOLE_INFO << "Latency summary, all times are in usec:\n" << hist.ToString();
 
-  if (hit_opportunities) {
+  if (summary.hit_opportunities) {
     CONSOLE_INFO << "----------------------------------\nHit rate: "
-                 << 100 * double(hit_count) / double(hit_opportunities) << "%\n";
+                 << 100 * double(summary.hit_count) / double(summary.hit_opportunities) << "%\n";
   }
 
   pp->Stop();
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index 32e67e550059..2ece3eae3926 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -620,10 +620,15 @@ void EngineShard::Heartbeat() {
   }
 
   ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
+
+  // Offset by CoolMemoryUsage when considering background offloading.
+  // TODO: Another approach could be to align this with what FreeMemWithEvictionStep does,
+  // i.e. trigger offloading if memory_budget is below the limit.
   size_t tiering_offload_threshold =
-      tiered_storage_
-          ? size_t(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) / shard_set->size()
-          : std::numeric_limits<size_t>::max();
+      tiered_storage_ ? tiered_storage_->CoolMemoryUsage() +
+                            size_t(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) /
+                                shard_set->size()
+                      : std::numeric_limits<size_t>::max();
 
   DbContext db_cntx;
   db_cntx.time_now_ms = GetCurrentTimeMs();
@@ -738,6 +743,7 @@ void EngineShard::CacheStats() {
     }
   }
   DCHECK_EQ(table_memory, db_slice.table_memory());
+  DCHECK_EQ(entries, db_slice.entries_count());
 
   if (tiered_storage_) {
     table_memory += tiered_storage_->CoolMemoryUsage();
   }
@@ -764,7 +770,10 @@ bool EngineShard::ShouldThrottleForTiering() const {  // see header for formula
   size_t tiering_redline =
       (max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) / shard_set->size();
-  return UsedMemory() > tiering_redline && tiered_storage_->WriteDepthUsage() > 0.3;
+
+  // UsedMemory includes CoolMemoryUsage, so we offset it to remove the cool-cache impact.
+  return tiered_storage_->WriteDepthUsage() > 0.3 &&
+         (UsedMemory() > tiering_redline + tiered_storage_->CoolMemoryUsage());
 }
 
 auto EngineShard::AnalyzeTxQueue() const -> TxQueueInfo {
diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc
index d920f126b1cb..effad5ea16c0 100644
--- a/src/server/tiered_storage.cc
+++ b/src/server/tiered_storage.cc
@@ -31,7 +31,7 @@ ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 10_MB,
           "In bytes. If memory budget on a shard goes below this limit, tiering stops "
           "hot-loading values into ram.");
 
-ABSL_FLAG(bool, tiered_experimental_cooling, false,
+ABSL_FLAG(bool, tiered_experimental_cooling, true,
           "If true, uses intermediate cooling layer "
           "when offloading values to storage");
 
@@ -267,6 +267,8 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
 }
 
 bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
+  DVLOG(2) << "NotifyDelete [" << segment.offset << "," << segment.length << "]";
+
   if (OccupiesWholePages(segment.length))
     return true;
 
@@ -398,22 +400,33 @@ util::fb2::Future<bool> TieredStorage::Modify(DbIndex dbid, std::string_view key,
                                               const PrimeValue& value,
                                               std::function<bool(std::string*)> modf) {
   DCHECK(value.IsExternal());
-  DCHECK(!value.IsCool());  // TBD
 
   util::fb2::Future<bool> future;
-  PrimeValue decoder;
-  decoder.ImportExternal(value);
-
-  auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)](
-                bool is_raw, std::string* raw_val) mutable {
-    if (is_raw) {
-      decoder.Materialize(*raw_val, true);
-      decoder.GetString(raw_val);
-    }
-    future.Resolve(modf(raw_val));
-    return true;
-  };
-  op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
+  if (value.IsCool()) {
+    PrimeValue hot = Warmup(dbid, value.GetCool());
+    string tmp;
+
+    DCHECK_EQ(value.Size(), hot.Size());
+    hot.GetString(&tmp);
+    future.Resolve(modf(&tmp));
+
+    // TODO: An awful hack - to fix later.
+    const_cast<PrimeValue&>(value).Materialize(tmp, false);
+  } else {
+    PrimeValue decoder;
+    decoder.ImportExternal(value);
+
+    auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)](
+                  bool is_raw, std::string* raw_val) mutable {
+      if (is_raw) {
+        decoder.Materialize(*raw_val, true);
+        decoder.GetString(raw_val);
+      }
+      future.Resolve(modf(raw_val));
+      return true;
+    };
+    op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
+  }
   return future;
 }
diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc
index 3ea8118ad58b..10c377da4263 100644
--- a/src/server/tiered_storage_test.cc
+++ b/src/server/tiered_storage_test.cc
@@ -26,6 +26,7 @@ ABSL_DECLARE_FLAG(bool, force_epoll);
 ABSL_DECLARE_FLAG(string, tiered_prefix);
 ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
 ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth);
+ABSL_DECLARE_FLAG(bool, tiered_experimental_cooling);
 
 namespace dfly {
 
@@ -189,7 +190,9 @@ TEST_F(TieredStorageTest, Defrag) {
   // This triggers defragmentation, as only 3 < 7/2 remain
   Run({"GET", string(1, 'd')});
-  Run({"GET", string(1, 'd')});
+
+  // Wait until any reads caused by the defrag have finished.
+  ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.pending_read_cnt == 0; });
 
   metrics = GetMetrics();
   EXPECT_EQ(metrics.tiered_stats.total_defrags, 3u);
   EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 0u);
@@ -200,6 +203,9 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {
   absl::FlagSaver saver;
   SetFlag(&FLAGS_tiered_offload_threshold, 0.0f);  // offload all values
 
+  // The setup works without cooling buffers.
+  SetFlag(&FLAGS_tiered_experimental_cooling, false);
+
   const int kNum = 500;
   max_memory_limit = kNum * 4096;
 
@@ -246,6 +252,13 @@ TEST_F(TieredStorageTest, FlushAll) {
   absl::FlagSaver saver;
   SetFlag(&FLAGS_tiered_offload_threshold, 0.0f);  // offload all values
 
+  // We want to cover the interaction of FlushAll with concurrent reads from disk.
+  // For that we disable tiered_experimental_cooling.
+  // TODO: it seems that our replacement policy will upload the entries to RAM in any case,
+  // making this test ineffective. We should add the ability to disable promotion of offloaded
+  // entries to RAM upon reads.
+  SetFlag(&FLAGS_tiered_experimental_cooling, false);
+
   const int kNum = 500;
   for (size_t i = 0; i < kNum; i++) {
     Run({"SET", absl::StrCat("k", i), BuildString(3000)});
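
The two engine_shard_set.cc changes above implement the same idea: memory held by the cool cache is evictable at any moment, so both the background-offload threshold and the throttling redline are shifted up by `CoolMemoryUsage()` before being compared against the shard's memory usage. A small worked sketch of that arithmetic, with made-up numbers (the `WriteDepthUsage() > 0.3` condition from `ShouldThrottleForTiering` is omitted for brevity):

#include <cstddef>
#include <iostream>

int main() {
  // Hypothetical shard configuration, not taken from the PR.
  size_t max_memory_limit = 8ull << 30;  // 8 GiB instance limit
  size_t num_shards = 8;
  double offload_threshold = 0.5;        // FLAGS_tiered_offload_threshold
  size_t cool_memory = 512ull << 20;     // tiered_storage_->CoolMemoryUsage()
  size_t used_memory = 900ull << 20;     // this shard's UsedMemory()

  // Per-shard redline, as in ShouldThrottleForTiering.
  size_t redline = size_t(max_memory_limit * offload_threshold) / num_shards;  // 512 MiB

  bool throttle_before = used_memory > redline;                // old check: true
  bool throttle_after = used_memory > redline + cool_memory;   // new check: false

  // 900 MiB of used memory exceeds the 512 MiB redline, but 512 MiB of it
  // sits in the cool cache and can be dropped instantly, so the offset
  // check correctly declines to throttle.
  std::cout << std::boolalpha << "before: " << throttle_before
            << ", after: " << throttle_after << '\n';
}
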