From 14e0f6c9f5e8dfad53be32e9696a3c0ca627b5a6 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 6 Nov 2024 14:09:18 +0200 Subject: [PATCH 01/32] chore: remove DbSlice mutex and add ConditionFlag in SliceSnapshot Signed-off-by: kostas --- helio | 2 +- src/server/common.h | 51 ++++++++++++++++++++++++++++++++++ src/server/db_slice.cc | 22 ++++----------- src/server/db_slice.h | 47 +++++++++++++------------------ src/server/debugcmd.cc | 2 +- src/server/engine_shard.cc | 21 ++++++-------- src/server/generic_family.cc | 8 ++---- src/server/journal/streamer.cc | 6 ++-- src/server/journal/streamer.h | 2 ++ src/server/snapshot.cc | 7 +++-- src/server/snapshot.h | 2 ++ tests/dragonfly/instance.py | 3 ++ 12 files changed, 104 insertions(+), 69 deletions(-) diff --git a/helio b/helio index 438c86139eac..6e77555e7deb 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 438c86139eac2e6400f2aae1d46cff03b31c166f +Subproject commit 6e77555e7deb619fbd3398d085f8c604aa71869b diff --git a/src/server/common.h b/src/server/common.h index 13e6db336628..7099897a2b6d 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -383,4 +383,55 @@ struct BorrowedInterpreter { extern size_t serialization_max_chunk_size; +struct ConditionFlag { + util::fb2::CondVarAny cond_var; + bool flag = false; +}; + +// Helper class used to guarantee atomicity between serialization of buckets +class ConditionGuard { + public: + explicit ConditionGuard(ConditionFlag* enclosing) : enclosing_(enclosing) { + util::fb2::NoOpLock noop_lk_; + enclosing_->cond_var.wait(noop_lk_, [this]() { return !enclosing_->flag; }); + enclosing_->flag = true; + } + + ~ConditionGuard() { + enclosing_->flag = false; + enclosing_->cond_var.notify_one(); + } + + private: + ConditionFlag* enclosing_; +}; + +class LocalBlockingCounter { + public: + void lock() { + ++mutating_; + } + + void unlock() { + DCHECK(mutating_ > 0); + --mutating_; + if (mutating_ == 0) { + cond_var_.notify_one(); + } + } + + void Wait() { + util::fb2::NoOpLock noop_lk_; + cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; }); + } + + bool HasMutating() const { + return mutating_ > 0; + } + + private: + util::fb2::CondVarAny cond_var_; + size_t mutating_ = 0; +}; + } // namespace dfly diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 71cf28bb41dc..acc3ccfc4d73 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -481,7 +481,6 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: if (caching_mode_ && IsValid(res.it)) { if (!change_cb_.empty()) { FetchedItemsRestorer fetched_restorer(&fetched_items_); - util::fb2::LockGuard lk(local_mu_); auto bump_cb = [&](PrimeTable::bucket_iterator bit) { CallChangeCallbacks(cntx.db_index, key, bit); }; @@ -574,7 +573,6 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status; FetchedItemsRestorer fetched_restorer(&fetched_items_); - util::fb2::LockGuard lk(local_mu_); // It's a new entry. CallChangeCallbacks(cntx.db_index, key, {key}); @@ -690,8 +688,6 @@ void DbSlice::ActivateDb(DbIndex db_ind) { } bool DbSlice::Del(Context cntx, Iterator it) { - util::fb2::LockGuard lk(local_mu_); - if (!IsValid(it)) { return false; } @@ -758,7 +754,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) { PrimeTable::Cursor cursor; uint64_t i = 0; do { - PrimeTable::Cursor next = Traverse(pt, cursor, del_entry_cb); + PrimeTable::Cursor next = pt->Traverse(cursor, del_entry_cb); ++i; cursor = next; if (i % 100 == 0) { @@ -815,10 +811,7 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { } void DbSlice::FlushDb(DbIndex db_ind) { - // We should not flush if serialization of a big value is in progress because this - // could lead to UB or assertion failures (while DashTable::Traverse is iterating over - // a logical bucket). - util::fb2::LockGuard lk(local_mu_); + std::unique_lock lk(block_counter_); // clear client tracking map. client_tracking_map_.clear(); @@ -840,7 +833,6 @@ void DbSlice::FlushDb(DbIndex db_ind) { } void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) { - util::fb2::LockGuard lk(local_mu_); uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates. auto& db = *db_arr_[db_ind]; size_t table_before = db.expire.mem_usage(); @@ -850,7 +842,6 @@ void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) { } bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) { - util::fb2::LockGuard lk(local_mu_); if (main_it->second.HasExpire()) { auto& db = *db_arr_[db_ind]; size_t table_before = db.expire.mem_usage(); @@ -1078,7 +1069,6 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) { FetchedItemsRestorer fetched_restorer(&fetched_items_); - util::fb2::LockGuard lk(local_mu_); CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()}); it.GetInnerIt().SetVersion(NextVersion()); } @@ -1180,7 +1170,7 @@ void DbSlice::ExpireAllIfNeeded() { ExpireTable::Cursor cursor; do { - cursor = Traverse(&db.expire, cursor, cb); + cursor = db.expire.Traverse(cursor, cb); } while (cursor); } } @@ -1191,6 +1181,7 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) { FetchedItemsRestorer fetched_restorer(&fetched_items_); + std::unique_lock lk(block_counter_); uint64_t bucket_version = it.GetVersion(); // change_cb_ is ordered by version. @@ -1214,7 +1205,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_ //! Unregisters the callback. void DbSlice::UnregisterOnChange(uint64_t id) { - util::fb2::LockGuard lk(local_mu_); + block_counter_.Wait(); auto it = find_if(change_cb_.begin(), change_cb_.end(), [id](const auto& cb) { return cb.first == id; }); CHECK(it != change_cb_.end()); @@ -1375,13 +1366,11 @@ void DbSlice::CreateDb(DbIndex db_ind) { void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key, ConnectionState::ExecInfo* exec_info) { // Because we might insert while another fiber is preempted - util::fb2::LockGuard lk(local_mu_); db_arr_[db_indx]->watched_keys[key].push_back(exec_info); } void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) { // Because we might remove while another fiber is preempted and miss a notification - util::fb2::LockGuard lk(local_mu_); for (const auto& [db_indx, key] : exec_info->watched_keys) { auto& watched_keys = db_arr_[db_indx]->watched_keys; if (auto it = watched_keys.find(key); it != watched_keys.end()) { @@ -1557,6 +1546,7 @@ void DbSlice::OnCbFinish() { } void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const { + std::unique_lock lk(block_counter_); if (change_cb_.empty()) return; diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 16799530a95a..e522c75e1eed 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -305,34 +305,33 @@ class DbSlice { AddOrFindResult& operator=(ItAndUpdater&& o); }; - OpResult AddOrFind(const Context& cntx, std::string_view key) - ABSL_LOCKS_EXCLUDED(local_mu_); + OpResult AddOrFind(const Context& cntx, std::string_view key); // Same as AddOrSkip, but overwrites in case entry exists. OpResult AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj, - uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_); + uint64_t expire_at_ms); // Adds a new entry. Requires: key does not exist in this slice. // Returns the iterator to the newly added entry. // Returns OpStatus::OUT_OF_MEMORY if bad_alloc is thrown OpResult AddNew(const Context& cntx, std::string_view key, PrimeValue obj, - uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_); + uint64_t expire_at_ms); // Update entry expiration. Return epxiration timepoint in abs milliseconds, or -1 if the entry // already expired and was deleted; facade::OpResult UpdateExpire(const Context& cntx, Iterator prime_it, ExpIterator exp_it, - const ExpireParams& params) ABSL_LOCKS_EXCLUDED(local_mu_); + const ExpireParams& params); // Adds expiry information. - void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_); + void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at); // Removes the corresponing expiry information if exists. // Returns true if expiry existed (and removed). - bool RemoveExpire(DbIndex db_ind, Iterator main_it) ABSL_LOCKS_EXCLUDED(local_mu_); + bool RemoveExpire(DbIndex db_ind, Iterator main_it); // Either adds or removes (if at == 0) expiry. Returns true if a change was made. // Does not change expiry if at != 0 and expiry already exists. - bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_); + bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at); void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag); uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const; @@ -343,12 +342,12 @@ class DbSlice { // Delete a key referred by its iterator. void PerformDeletion(Iterator del_it, DbTable* table); - bool Del(Context cntx, Iterator it) ABSL_LOCKS_EXCLUDED(local_mu_); + bool Del(Context cntx, Iterator it); constexpr static DbIndex kDbAll = 0xFFFF; // Flushes db_ind or all databases if kDbAll is passed - void FlushDb(DbIndex db_ind) ABSL_LOCKS_EXCLUDED(local_mu_); + void FlushDb(DbIndex db_ind); // Flushes the data of given slot ranges. void FlushSlots(cluster::SlotRanges slot_ranges); @@ -439,7 +438,7 @@ class DbSlice { void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound); //! Unregisters the callback. - void UnregisterOnChange(uint64_t id) ABSL_LOCKS_EXCLUDED(local_mu_); + void UnregisterOnChange(uint64_t id); struct DeleteExpiredStats { uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed). @@ -496,25 +495,14 @@ class DbSlice { client_tracking_map_[key].insert(conn_ref); } - // Provides access to the internal lock of db_slice for flows that serialize - // entries with preemption and need to synchronize with Traverse below which - // acquires the same lock. - ThreadLocalMutex& GetSerializationMutex() { - return local_mu_; - } - - // Wrapper around DashTable::Traverse that allows preemptions - template - PrimeTable::Cursor Traverse(DashTable* pt, PrimeTable::Cursor cursor, Cb&& cb) - ABSL_LOCKS_EXCLUDED(local_mu_) { - util::fb2::LockGuard lk(local_mu_); - return pt->Traverse(cursor, std::forward(cb)); - } - // Does not check for non supported events. Callers must parse the string and reject it // if it's not empty and not EX. void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events); + bool HasBlockingCounterMutating() const { + return block_counter_.HasMutating(); + } + private: void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key); void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); @@ -571,8 +559,11 @@ class DbSlice { void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const; - // Used to provide exclusive access while Traversing segments - mutable ThreadLocalMutex local_mu_; + // We need this because registered callbacks might yield and when they do so we want + // to avoid Heartbeat or Flushing the db. + // This counter protects us against this case. + mutable LocalBlockingCounter block_counter_; + ShardId shard_id_; uint8_t caching_mode_ : 1; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index e8c264c74191..d3064a1e592c 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -281,7 +281,7 @@ void DoBuildObjHist(EngineShard* shard, ConnectionContext* cntx, ObjHistMap* obj continue; PrimeTable::Cursor cursor; do { - cursor = db_slice.Traverse(&dbt->prime, cursor, [&](PrimeIterator it) { + cursor = dbt->prime.Traverse(cursor, [&](PrimeIterator it) { unsigned obj_type = it->second.ObjType(); auto& hist_ptr = (*obj_hist_map)[obj_type]; if (!hist_ptr) { diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 29c7957f2eb5..07e3090fa017 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -329,7 +329,7 @@ bool EngineShard::DoDefrag() { uint64_t attempts = 0; do { - cur = slice.Traverse(prime_table, cur, [&](PrimeIterator it) { + cur = prime_table->Traverse(cur, [&](PrimeIterator it) { // for each value check whether we should move it because it // seats on underutilized page of memory, and if so, do it. bool did = it->second.DefragIfNeeded(threshold); @@ -661,13 +661,17 @@ void EngineShard::Heartbeat() { CacheStats(); + // TODO: iterate over all namespaces + DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id()); + // Skip heartbeat if we are serializing a big value + if (db_slice.HasBlockingCounterMutating()) { + return; + } + if (!IsReplica()) { // Never run expiry/evictions on replica. RetireExpiredAndEvict(); } - // TODO: iterate over all namespaces - DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); - // Offset CoolMemoryUsage when consider background offloading. // TODO: Another approach could be is to align the approach similarly to how we do with // FreeMemWithEvictionStep, i.e. if memory_budget is below the limit. @@ -693,15 +697,6 @@ void EngineShard::Heartbeat() { void EngineShard::RetireExpiredAndEvict() { // TODO: iterate over all namespaces DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); - // Some of the functions below might acquire the same lock again so we need to unlock it - // asap. We won't yield before we relock the mutex again, so the code below is atomic - // in respect to preemptions of big values. An example of that is the call to - // DeleteExpiredStep() below, which eventually calls ExpireIfNeeded() - // and within that the call to RecordExpiry() will trigger the registered - // callback OnJournalEntry which locks the exact same mutex. - // We need to lock below and immediately release because there should be no other fiber - // that is serializing a big value. - { std::unique_lock lk(db_slice.GetSerializationMutex()); } constexpr double kTtlDeleteLimit = 200; constexpr double kRedLimitFactor = 0.1; diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 5cfe94c5fcc8..b38bb1e5b222 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -584,12 +584,8 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, PrimeTable::Cursor cur = *cursor; auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index); string scratch; - do { - cur = db_slice.Traverse(prime_table, cur, [&](PrimeIterator it) { - cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); - }); - } while (cur && cnt < scan_opts.limit); - + cur = prime_table->Traverse( + cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); }); VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value(); *cursor = cur.value(); } diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 6d4bb7f81af9..86a7af5ededc 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -212,8 +212,8 @@ void RestoreStreamer::Run() { do { if (fiber_cancelled_) return; - - cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) { + cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) { + ConditionGuard guard(&bucket_ser_); db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/, DbSlice::Iterator::FromPrime(it), snapshot_version_); WriteBucket(it); @@ -302,6 +302,8 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0"; + ConditionGuard guard(&bucket_ser_); + PrimeTable* table = db_slice_->GetTables(0).first; if (const PrimeTable::bucket_iterator* bit = req.update()) { diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index c625b60c5157..c983bce58d6a 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -106,6 +106,8 @@ class RestoreStreamer : public JournalStreamer { cluster::SlotSet my_slots_; bool fiber_cancelled_ = false; bool snapshot_finished_ = false; + + ConditionFlag bucket_ser_; }; } // namespace dfly diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 77cec6d9ef7e..2c0e00bcb326 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -168,7 +168,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn return; PrimeTable::Cursor next = - pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); + pt->Traverse(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); cursor = next; PushSerialized(false); @@ -243,6 +243,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) { } bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { + ConditionGuard guard(&bucket_ser_); ++stats_.savecb_calls; auto check = [&](auto v) { @@ -371,6 +372,8 @@ bool SliceSnapshot::PushSerialized(bool force) { } void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { + ConditionGuard guard(&bucket_ser_); + PrimeTable* table = db_slice_->GetTables(db_index).first; const PrimeTable::bucket_iterator* bit = req.update(); @@ -395,7 +398,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) // To enable journal flushing to sync after non auto journal command is executed we call // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no // additional journal change to serialize, it simply invokes PushSerialized. - std::unique_lock lk(db_slice_->GetSerializationMutex()); + ConditionGuard guard(&bucket_ser_); if (item.opcode != journal::Op::NOOP) { serializer_->WriteJournalEntry(item.data); } diff --git a/src/server/snapshot.h b/src/server/snapshot.h index e3839fb9bd88..99207914fa23 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -169,6 +169,8 @@ class SliceSnapshot { size_t keys_total = 0; } stats_; + ConditionFlag bucket_ser_; + std::function on_push_; std::function on_snapshot_finish_; }; diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index d7dd7405ee26..8550114ac9ea 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -124,6 +124,9 @@ def __init__(self, params: DflyParams, args): if threads > 1: self.args["num_shards"] = threads - 1 + # Add 1 byte limit for big values + self.args["serialization_max_chunk_size"] = 1 + def __del__(self): assert self.proc == None From ef6fa2c8bbf2e78892339aa7156971c9789842bc Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 6 Nov 2024 15:30:20 +0200 Subject: [PATCH 02/32] fixes --- src/server/db_slice.cc | 3 +-- src/server/generic_family.cc | 6 ++++-- tests/dragonfly/instance.py | 7 +++++-- tests/dragonfly/replication_test.py | 12 ++++++++++-- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index acc3ccfc4d73..2eebf79c27cd 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -811,7 +811,7 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { } void DbSlice::FlushDb(DbIndex db_ind) { - std::unique_lock lk(block_counter_); + block_counter_.Wait(); // clear client tracking map. client_tracking_map_.clear(); @@ -1370,7 +1370,6 @@ void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key, } void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) { - // Because we might remove while another fiber is preempted and miss a notification for (const auto& [db_indx, key] : exec_info->watched_keys) { auto& watched_keys = db_arr_[db_indx]->watched_keys; if (auto it = watched_keys.find(key); it != watched_keys.end()) { diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index b38bb1e5b222..170463882475 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -584,8 +584,10 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, PrimeTable::Cursor cur = *cursor; auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index); string scratch; - cur = prime_table->Traverse( - cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); }); + do { + cur = prime_table->Traverse( + cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); }); + } while (cur && cnt < scan_opts.limit); VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value(); *cursor = cur.value(); } diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index 8550114ac9ea..67885f48da31 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -124,8 +124,11 @@ def __init__(self, params: DflyParams, args): if threads > 1: self.args["num_shards"] = threads - 1 - # Add 1 byte limit for big values - self.args["serialization_max_chunk_size"] = 1 + if "disable_serialization_max_chunk_size" not in self.args: + # Add 1 byte limit for big values + self.args["serialization_max_chunk_size"] = 1 + else: + self.args.pop("disable_serialization_max_chunk_size") def __del__(self): assert self.proc == None diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index cd793b6d76a0..fea3def7b1e3 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2407,9 +2407,17 @@ async def test_replicate_old_master( dfly_version = "v1.19.2" released_dfly_path = download_dragonfly_release(dfly_version) - master = df_factory.create(version=1.19, path=released_dfly_path, cluster_mode=cluster_mode) + master = df_factory.create( + disable_serialization_max_chunk_size=0, + version=1.19, + path=released_dfly_path, + cluster_mode=cluster_mode, + ) replica = df_factory.create( - cluster_mode=cluster_mode, cluster_announce_ip=announce_ip, announce_port=announce_port + disable_serialization_max_chunk_size=0, + cluster_mode=cluster_mode, + cluster_announce_ip=announce_ip, + announce_port=announce_port, ) df_factory.start_all([master, replica]) From b1254f602a2f82abd7e79eca803ad5b5406b181f Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 7 Nov 2024 17:54:35 +0200 Subject: [PATCH 03/32] fix notify deadlock --- src/server/common.h | 2 +- src/server/db_slice.cc | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/server/common.h b/src/server/common.h index 7099897a2b6d..2d935dc19746 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -416,7 +416,7 @@ class LocalBlockingCounter { DCHECK(mutating_ > 0); --mutating_; if (mutating_ == 0) { - cond_var_.notify_one(); + cond_var_.notify_all(); } } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 2eebf79c27cd..ad8f471a5ce0 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1132,11 +1132,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato << ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace(); } - // Replicate expiry - if (auto journal = owner_->journal(); journal) { - RecordExpiry(cntx.db_index, key); - } - if (expired_keys_events_recording_) db->expired_keys_events_.emplace_back(key); @@ -1148,6 +1143,11 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato const_cast(this)->PerformDeletion(Iterator(it, StringOrView::FromView(key)), ExpIterator(expire_it, StringOrView::FromView(key)), db.get()); + // Replicate expiry + if (auto journal = owner_->journal(); journal) { + RecordExpiry(cntx.db_index, key); + } + ++events_.expired_keys; return {PrimeIterator{}, ExpireIterator{}}; From 3a31267e8e6583b317432ed108e1b2c8d3ea1a10 Mon Sep 17 00:00:00 2001 From: kostas Date: Mon, 11 Nov 2024 15:46:41 +0200 Subject: [PATCH 04/32] fixes --- helio | 2 +- src/server/engine_shard.cc | 2 +- src/server/snapshot.cc | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/helio b/helio index 6e77555e7deb..438c86139eac 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 6e77555e7deb619fbd3398d085f8c604aa71869b +Subproject commit 438c86139eac2e6400f2aae1d46cff03b31c166f diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 07e3090fa017..a7c826231d4f 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -662,7 +662,7 @@ void EngineShard::Heartbeat() { CacheStats(); // TODO: iterate over all namespaces - DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id()); + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); // Skip heartbeat if we are serializing a big value if (db_slice.HasBlockingCounterMutating()) { return; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 2c0e00bcb326..f02d60545a84 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -168,7 +168,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn return; PrimeTable::Cursor next = - pt->Traverse(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); + pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); cursor = next; PushSerialized(false); From 293b400d5ab2105174a8984333eeae37678c264c Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 13 Nov 2024 16:57:01 +0200 Subject: [PATCH 05/32] comments --- src/server/common.h | 23 ---------- src/server/db_slice.cc | 1 - src/server/db_slice.h | 4 ++ src/server/engine_shard.cc | 79 +++++++++++++++++++--------------- src/server/journal/streamer.cc | 3 -- src/server/journal/streamer.h | 2 - src/server/rdb_save.h | 1 + src/server/server_family.cc | 3 ++ src/server/snapshot.cc | 16 +++++-- src/server/snapshot.h | 3 +- 10 files changed, 67 insertions(+), 68 deletions(-) diff --git a/src/server/common.h b/src/server/common.h index 2d935dc19746..64e72f41be48 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -383,29 +383,6 @@ struct BorrowedInterpreter { extern size_t serialization_max_chunk_size; -struct ConditionFlag { - util::fb2::CondVarAny cond_var; - bool flag = false; -}; - -// Helper class used to guarantee atomicity between serialization of buckets -class ConditionGuard { - public: - explicit ConditionGuard(ConditionFlag* enclosing) : enclosing_(enclosing) { - util::fb2::NoOpLock noop_lk_; - enclosing_->cond_var.wait(noop_lk_, [this]() { return !enclosing_->flag; }); - enclosing_->flag = true; - } - - ~ConditionGuard() { - enclosing_->flag = false; - enclosing_->cond_var.notify_one(); - } - - private: - ConditionFlag* enclosing_; -}; - class LocalBlockingCounter { public: void lock() { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index ad8f471a5ce0..dd78588bbef8 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -811,7 +811,6 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { } void DbSlice::FlushDb(DbIndex db_ind) { - block_counter_.Wait(); // clear client tracking map. client_tracking_map_.clear(); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index e522c75e1eed..96789732f878 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -503,6 +503,10 @@ class DbSlice { return block_counter_.HasMutating(); } + LocalBlockingCounter* BlockingCounter() { + return &block_counter_; + } + private: void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key); void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index a7c826231d4f..59a22f0e0542 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -664,9 +664,17 @@ void EngineShard::Heartbeat() { // TODO: iterate over all namespaces DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); // Skip heartbeat if we are serializing a big value + static size_t skipped_hearbeats = 0; if (db_slice.HasBlockingCounterMutating()) { + ++skipped_hearbeats; + // We run Heartbeat 600 times every minute. If Heartbeat stalls more than 300 times, + // it means it hasn't ran for at least 30 seconds. This can be tuned a little bit more, + // as the actualy frequency of this is configurable via a flag. + if (skipped_hearbeats == 300) + LOG(WARNING) << "Stalling heartbeat() fiber because of big value serialization"; return; } + skipped_hearbeats = 0; if (!IsReplica()) { // Never run expiry/evictions on replica. RetireExpiredAndEvict(); @@ -695,46 +703,49 @@ void EngineShard::Heartbeat() { } void EngineShard::RetireExpiredAndEvict() { - // TODO: iterate over all namespaces - DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); - constexpr double kTtlDeleteLimit = 200; - constexpr double kRedLimitFactor = 0.1; - - uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); - uint32_t deleted = GetMovingSum6(TTL_DELETE); - unsigned ttl_delete_target = 5; - - if (deleted > 10) { - // deleted should be <= traversed. - // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). - // The higher ttl_delete_target the more likely we have lots of expired items that need - // to be deleted. - ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); - } + { + FiberAtomicGuard guard; + // TODO: iterate over all namespaces + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); + constexpr double kTtlDeleteLimit = 200; + constexpr double kRedLimitFactor = 0.1; + + uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); + uint32_t deleted = GetMovingSum6(TTL_DELETE); + unsigned ttl_delete_target = 5; + + if (deleted > 10) { + // deleted should be <= traversed. + // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). + // The higher ttl_delete_target the more likely we have lots of expired items that need + // to be deleted. + ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); + } - ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size(); + ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size(); - DbContext db_cntx; - db_cntx.time_now_ms = GetCurrentTimeMs(); + DbContext db_cntx; + db_cntx.time_now_ms = GetCurrentTimeMs(); - for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { - if (!db_slice.IsDbValid(i)) - continue; + for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { + if (!db_slice.IsDbValid(i)) + continue; - db_cntx.db_index = i; - auto [pt, expt] = db_slice.GetTables(i); - if (expt->size() > pt->size() / 4) { - DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target); + db_cntx.db_index = i; + auto [pt, expt] = db_slice.GetTables(i); + if (expt->size() > pt->size() / 4) { + DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target); - counter_[TTL_TRAVERSE].IncBy(stats.traversed); - counter_[TTL_DELETE].IncBy(stats.deleted); - } + counter_[TTL_TRAVERSE].IncBy(stats.traversed); + counter_[TTL_DELETE].IncBy(stats.deleted); + } - // if our budget is below the limit - if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) { - uint32_t starting_segment_id = rand() % pt->GetSegmentCount(); - db_slice.FreeMemWithEvictionStep(i, starting_segment_id, - eviction_redline - db_slice.memory_budget()); + // if our budget is below the limit + if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) { + uint32_t starting_segment_id = rand() % pt->GetSegmentCount(); + db_slice.FreeMemWithEvictionStep(i, starting_segment_id, + eviction_redline - db_slice.memory_budget()); + } } } diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 86a7af5ededc..c89e7094e1f6 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -213,7 +213,6 @@ void RestoreStreamer::Run() { if (fiber_cancelled_) return; cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) { - ConditionGuard guard(&bucket_ser_); db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/, DbSlice::Iterator::FromPrime(it), snapshot_version_); WriteBucket(it); @@ -302,8 +301,6 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0"; - ConditionGuard guard(&bucket_ser_); - PrimeTable* table = db_slice_->GetTables(0).first; if (const PrimeTable::bucket_iterator* bit = req.update()) { diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index c983bce58d6a..c625b60c5157 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -106,8 +106,6 @@ class RestoreStreamer : public JournalStreamer { cluster::SlotSet my_slots_; bool fiber_cancelled_ = false; bool snapshot_finished_ = false; - - ConditionFlag bucket_ser_; }; } // namespace dfly diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index ae3b9272bde2..913598fec7e8 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -122,6 +122,7 @@ class RdbSaver { struct SnapshotStats { size_t current_keys = 0; size_t total_keys = 0; + size_t big_value_preemptions = 0; }; SnapshotStats GetCurrentSnapshotProgress() const; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 34e8ff15031f..be7f0c1870da 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2409,6 +2409,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil double perc = 0; bool is_saving = false; uint32_t curent_durration_sec = 0; + size_t big_value_preemptions = 0; { util::fb2::LockGuard lk{save_mu_}; if (save_controller_) { @@ -2419,6 +2420,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil current_snap_keys = res.current_keys; total_snap_keys = res.total_keys; perc = (static_cast(current_snap_keys) / total_snap_keys) * 100; + big_value_preemptions = res.big_value_preemptions; } } } @@ -2426,6 +2428,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil append("current_snapshot_perc", perc); append("current_save_keys_processed", current_snap_keys); append("current_save_keys_total", total_snap_keys); + append("current_snapshot_bug_value_preemptions", big_value_preemptions); auto save_info = GetLastSaveInfo(); // when last success save diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index f02d60545a84..1f8d94ca51a3 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -84,6 +84,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot if (bytes_serialized > flush_threshold) { size_t serialized = FlushSerialized(flush_state); VLOG(2) << "FlushSerialized " << serialized << " bytes"; + ++stats_.big_value_preemptions; } }; } @@ -243,7 +244,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) { } bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { - ConditionGuard guard(&bucket_ser_); + std::lock_guard guard(big_value_mu_); ++stats_.savecb_calls; auto check = [&](auto v) { @@ -264,6 +265,12 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it), snapshot_version_); + auto* blocking_counter = db_slice_->BlockingCounter(); + // Locking this never preempts. We merely just increment the underline counter such that + // if SerializeBucket preempts, Heartbeat() won't run because the blocking counter is not + // zero. + std::lock_guard blocking_counter_guard(*blocking_counter); + stats_.loop_serialized += SerializeBucket(current_db_, it); return false; @@ -372,7 +379,7 @@ bool SliceSnapshot::PushSerialized(bool force) { } void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { - ConditionGuard guard(&bucket_ser_); + std::lock_guard guard(big_value_mu_); PrimeTable* table = db_slice_->GetTables(db_index).first; const PrimeTable::bucket_iterator* bit = req.update(); @@ -398,7 +405,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) // To enable journal flushing to sync after non auto journal command is executed we call // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no // additional journal change to serialize, it simply invokes PushSerialized. - ConditionGuard guard(&bucket_ser_); + std::lock_guard guard(big_value_mu_); if (item.opcode != journal::Op::NOOP) { serializer_->WriteJournalEntry(item.data); } @@ -427,7 +434,8 @@ size_t SliceSnapshot::GetTempBuffersSize() const { } RdbSaver::SnapshotStats SliceSnapshot::GetCurrentSnapshotProgress() const { - return {stats_.loop_serialized + stats_.side_saved, stats_.keys_total}; + return {stats_.loop_serialized + stats_.side_saved, stats_.keys_total, + stats_.big_value_preemptions}; } } // namespace dfly diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 99207914fa23..fe8010a9165f 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -167,9 +167,10 @@ class SliceSnapshot { size_t side_saved = 0; size_t savecb_calls = 0; size_t keys_total = 0; + size_t big_value_preemptions = 0; } stats_; - ConditionFlag bucket_ser_; + ThreadLocalMutex big_value_mu_; std::function on_push_; std::function on_snapshot_finish_; From 8cad815cf3053b74f6b1f23043413d6ec268a796 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 13 Nov 2024 19:49:25 +0200 Subject: [PATCH 06/32] fixes --- src/server/dflycmd.cc | 21 ++++++++++----------- src/server/dflycmd.h | 2 +- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 7c1ba2cf19d0..cef3145089e5 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -365,18 +365,14 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r { Transaction::Guard tg{tx}; - AggregateStatus status; - auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) { + auto cb = [this, replica_ptr = replica_ptr](EngineShard* shard) { FlowInfo* flow = &replica_ptr->flows[shard->shard_id()]; StopFullSyncInThread(flow, &replica_ptr->cntx, shard); - status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard); + StartStableSyncInThread(flow, &replica_ptr->cntx, shard); }; shard_set->RunBlockingInParallel(std::move(cb)); - - if (*status != OpStatus::OK) - return rb->SendError(kInvalidState); } LOG(INFO) << "Transitioned into stable sync with replica " << replica_ptr->address << ":" @@ -592,6 +588,13 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(shard); + + absl::Cleanup on_exit([&] { + // Reset cleanup and saver + flow->cleanup = []() {}; + flow->saver.reset(); + }); + error_code ec = flow->saver->StopFullSyncInShard(shard); if (ec) { cntx->ReportError(ec); @@ -603,13 +606,9 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* s cntx->ReportError(ec); return; } - - // Reset cleanup and saver - flow->cleanup = []() {}; - flow->saver.reset(); } -OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { +void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { // Create streamer for shard flows. DCHECK(shard); DCHECK(flow->conn); diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index f67d7fee1abc..1521eaefb877 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -212,7 +212,7 @@ class DflyCmd { void StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); // Start stable sync in thread. Called for each flow. - facade::OpStatus StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); + void StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); // Get ReplicaInfo by sync_id. std::shared_ptr GetReplicaInfo(uint32_t sync_id) ABSL_LOCKS_EXCLUDED(mu_); From 0d67a80441e6c16dfa36fc417f5d898e8e43c6e6 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 13 Nov 2024 20:00:05 +0200 Subject: [PATCH 07/32] fix build --- src/server/dflycmd.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index cef3145089e5..0f8d42726558 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -624,8 +624,6 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard flow->streamer->Cancel(); } }; - - return OpStatus::OK; } auto DflyCmd::CreateSyncSession(ConnectionState* state) -> std::pair { From e149a89ef9853f755f689ba5207c3937e3b2ec3a Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 14 Nov 2024 11:01:03 +0200 Subject: [PATCH 08/32] this is a test commit --- src/server/common.cc | 2 +- src/server/dflycmd.cc | 10 ++++------ src/server/engine_shard.cc | 4 +++- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/server/common.cc b/src/server/common.cc index 56520ff05186..00ee8e205b3c 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -430,7 +430,7 @@ ThreadLocalMutex::ThreadLocalMutex() { } ThreadLocalMutex::~ThreadLocalMutex() { - DCHECK_EQ(EngineShard::tlocal(), shard_); + // DCHECK_EQ(EngineShard::tlocal(), shard_); } void ThreadLocalMutex::lock() { diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 0f8d42726558..0d1f4180910a 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -589,12 +589,6 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(shard); - absl::Cleanup on_exit([&] { - // Reset cleanup and saver - flow->cleanup = []() {}; - flow->saver.reset(); - }); - error_code ec = flow->saver->StopFullSyncInShard(shard); if (ec) { cntx->ReportError(ec); @@ -606,6 +600,10 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* s cntx->ReportError(ec); return; } + + // Reset cleanup and saver + flow->cleanup = []() {}; + flow->saver.reset(); } void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 59a22f0e0542..2b74a5226384 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -670,8 +670,10 @@ void EngineShard::Heartbeat() { // We run Heartbeat 600 times every minute. If Heartbeat stalls more than 300 times, // it means it hasn't ran for at least 30 seconds. This can be tuned a little bit more, // as the actualy frequency of this is configurable via a flag. - if (skipped_hearbeats == 300) + if (skipped_hearbeats == 300) { LOG(WARNING) << "Stalling heartbeat() fiber because of big value serialization"; + skipped_hearbeats = 0; + } return; } skipped_hearbeats = 0; From 964e35673ba930366bb1c3db029c682f36a10e51 Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 14 Nov 2024 12:12:39 +0200 Subject: [PATCH 09/32] test --- src/server/snapshot.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 1f8d94ca51a3..8f62658a058c 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -244,7 +244,6 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) { } bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { - std::lock_guard guard(big_value_mu_); ++stats_.savecb_calls; auto check = [&](auto v) { @@ -257,13 +256,15 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { return true; }; - uint64_t v = it.GetVersion(); - if (!check(v)) { + if (!check(it.GetVersion())) { return false; } db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it), snapshot_version_); + if (!check(it.GetVersion())) { + return false; + } auto* blocking_counter = db_slice_->BlockingCounter(); // Locking this never preempts. We merely just increment the underline counter such that @@ -278,6 +279,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) { DCHECK_LT(it.GetVersion(), snapshot_version_); + std::lock_guard guard(big_value_mu_); // traverse physical bucket and write it into string file. serialize_bucket_running_ = true; @@ -379,8 +381,6 @@ bool SliceSnapshot::PushSerialized(bool force) { } void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { - std::lock_guard guard(big_value_mu_); - PrimeTable* table = db_slice_->GetTables(db_index).first; const PrimeTable::bucket_iterator* bit = req.update(); From c04f96d5babee020ca0ad2e34b196a24603f7975 Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 14 Nov 2024 14:11:54 +0200 Subject: [PATCH 10/32] revert mutex change --- src/server/snapshot.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 8f62658a058c..4dfa0547f4d6 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -244,6 +244,8 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) { } bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { + std::lock_guard guard(big_value_mu_); + ++stats_.savecb_calls; auto check = [&](auto v) { @@ -262,9 +264,6 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it), snapshot_version_); - if (!check(it.GetVersion())) { - return false; - } auto* blocking_counter = db_slice_->BlockingCounter(); // Locking this never preempts. We merely just increment the underline counter such that @@ -279,7 +278,6 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) { DCHECK_LT(it.GetVersion(), snapshot_version_); - std::lock_guard guard(big_value_mu_); // traverse physical bucket and write it into string file. serialize_bucket_running_ = true; @@ -381,6 +379,8 @@ bool SliceSnapshot::PushSerialized(bool force) { } void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { + std::lock_guard guard(big_value_mu_); + PrimeTable* table = db_slice_->GetTables(db_index).first; const PrimeTable::bucket_iterator* bit = req.update(); From 7b50a5d7f6a64ee49cae8501bf1e3f762c402124 Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 14 Nov 2024 16:05:29 +0200 Subject: [PATCH 11/32] properly clean up thread local --- src/server/common.cc | 2 +- src/server/db_slice.cc | 11 +++++------ src/server/dflycmd.cc | 10 ++++++---- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/server/common.cc b/src/server/common.cc index 00ee8e205b3c..56520ff05186 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -430,7 +430,7 @@ ThreadLocalMutex::ThreadLocalMutex() { } ThreadLocalMutex::~ThreadLocalMutex() { - // DCHECK_EQ(EngineShard::tlocal(), shard_); + DCHECK_EQ(EngineShard::tlocal(), shard_); } void ThreadLocalMutex::lock() { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index dd78588bbef8..01c4dee120e4 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -206,8 +206,6 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT if (auto journal = db_slice_->shard_owner()->journal(); journal) { RecordExpiry(cntx_.db_index, key); } - // Safe we already acquired util::fb2::LockGuard lk(db_slice_->GetSerializationMutex()); - // on the flows that call this function db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table); ++evicted_; @@ -1131,6 +1129,11 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato << ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace(); } + // Replicate expiry + if (auto journal = owner_->journal(); journal) { + RecordExpiry(cntx.db_index, key); + } + if (expired_keys_events_recording_) db->expired_keys_events_.emplace_back(key); @@ -1142,10 +1145,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato const_cast(this)->PerformDeletion(Iterator(it, StringOrView::FromView(key)), ExpIterator(expire_it, StringOrView::FromView(key)), db.get()); - // Replicate expiry - if (auto journal = owner_->journal(); journal) { - RecordExpiry(cntx.db_index, key); - } ++events_.expired_keys; diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 0d1f4180910a..532378694604 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -589,6 +589,12 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(shard); + absl::Cleanup clean([&]() { + // Reset cleanup and saver + flow->cleanup = []() {}; + flow->saver.reset(); + }); + error_code ec = flow->saver->StopFullSyncInShard(shard); if (ec) { cntx->ReportError(ec); @@ -600,10 +606,6 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* s cntx->ReportError(ec); return; } - - // Reset cleanup and saver - flow->cleanup = []() {}; - flow->saver.reset(); } void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { From e5055aff2f25aec8c766dcc000a733327910ad22 Mon Sep 17 00:00:00 2001 From: kostas Date: Fri, 15 Nov 2024 10:35:40 +0000 Subject: [PATCH 12/32] log stalled heartbeat --- src/server/engine_shard.cc | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 2b74a5226384..dc1866c52b9f 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -664,19 +664,15 @@ void EngineShard::Heartbeat() { // TODO: iterate over all namespaces DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); // Skip heartbeat if we are serializing a big value - static size_t skipped_hearbeats = 0; + static auto start = std::chrono::system_clock::now(); if (db_slice.HasBlockingCounterMutating()) { - ++skipped_hearbeats; - // We run Heartbeat 600 times every minute. If Heartbeat stalls more than 300 times, - // it means it hasn't ran for at least 30 seconds. This can be tuned a little bit more, - // as the actualy frequency of this is configurable via a flag. - if (skipped_hearbeats == 300) { - LOG(WARNING) << "Stalling heartbeat() fiber because of big value serialization"; - skipped_hearbeats = 0; + const auto elapsed = std::chrono::system_clock::now() - start; + if(elapsed > std::chrono::seconds(1)) { + LOG(WARNING) << "Stalled heartbeat() fiber for " << elapsed.count() << " seconds because of big value serialization"; } return; } - skipped_hearbeats = 0; + start = std::chrono::system_clock::now(); if (!IsReplica()) { // Never run expiry/evictions on replica. RetireExpiredAndEvict(); From 44681016fa2654a12ff770dd4c79d40634b1680d Mon Sep 17 00:00:00 2001 From: kostas Date: Sun, 24 Nov 2024 11:47:26 +0200 Subject: [PATCH 13/32] comments --- src/server/db_slice.cc | 2 +- src/server/dflycmd.cc | 12 +++++------- src/server/server_family.cc | 4 +--- src/server/server_state.cc | 5 ++++- src/server/server_state.h | 2 ++ src/server/snapshot.cc | 10 ++++++---- src/server/snapshot.h | 1 - 7 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 37af62ba413c..0d097e8bf173 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -165,7 +165,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e } unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) { - if (!can_evict_) + if (!can_evict_ || db_slice_->HasBlockingCounterMutating()) return 0; constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.probes.by_type.stash_buckets); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 532378694604..390dc504f6d1 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -368,7 +368,6 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r auto cb = [this, replica_ptr = replica_ptr](EngineShard* shard) { FlowInfo* flow = &replica_ptr->flows[shard->shard_id()]; - StopFullSyncInThread(flow, &replica_ptr->cntx, shard); StartStableSyncInThread(flow, &replica_ptr->cntx, shard); }; @@ -589,12 +588,6 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(shard); - absl::Cleanup clean([&]() { - // Reset cleanup and saver - flow->cleanup = []() {}; - flow->saver.reset(); - }); - error_code ec = flow->saver->StopFullSyncInShard(shard); if (ec) { cntx->ReportError(ec); @@ -606,6 +599,10 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* s cntx->ReportError(ec); return; } + + // Reset cleanup and saver + flow->cleanup = []() {}; + flow->saver.reset(); } void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { @@ -692,6 +689,7 @@ void DflyCmd::BreakStalledFlowsInShard() { return; ShardId sid = EngineShard::tlocal()->shard_id(); + vector deleted; for (auto [sync_id, replica_ptr] : replica_infos_) { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index ae8ccc68e8c8..232249c33144 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2360,6 +2360,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil append("total_net_output_bytes", reply_stats.io_write_bytes); append("rdb_save_usec", m.coordinator_stats.rdb_save_usec); append("rdb_save_count", m.coordinator_stats.rdb_save_count); + append("big_value_preemptions", m.coordinator_stats.big_value_preemptions); append("instantaneous_input_kbps", -1); append("instantaneous_output_kbps", -1); append("rejected_connections", -1); @@ -2428,7 +2429,6 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil double perc = 0; bool is_saving = false; uint32_t curent_durration_sec = 0; - size_t big_value_preemptions = 0; { util::fb2::LockGuard lk{save_mu_}; if (save_controller_) { @@ -2439,7 +2439,6 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil current_snap_keys = res.current_keys; total_snap_keys = res.total_keys; perc = (static_cast(current_snap_keys) / total_snap_keys) * 100; - big_value_preemptions = res.big_value_preemptions; } } } @@ -2447,7 +2446,6 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil append("current_snapshot_perc", perc); append("current_save_keys_processed", current_snap_keys); append("current_save_keys_total", total_snap_keys); - append("current_snapshot_bug_value_preemptions", big_value_preemptions); auto save_info = GetLastSaveInfo(); // when last success save diff --git a/src/server/server_state.cc b/src/server/server_state.cc index a906c2de2ffd..3cf7dc653189 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -27,7 +27,7 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) { } ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { - static_assert(sizeof(Stats) == 17 * 8, "Stats size mismatch"); + static_assert(sizeof(Stats) == 18 * 8, "Stats size mismatch"); #define ADD(x) this->x += (other.x) @@ -49,6 +49,9 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { ADD(blocked_on_interpreter); ADD(rdb_save_usec); ADD(rdb_save_count); + + ADD(big_value_preemptions); + ADD(oom_error_cmd_cnt); if (this->tx_width_freq_arr.size() > 0) { diff --git a/src/server/server_state.h b/src/server/server_state.h index f8be766a5816..0cfc48be1634 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -122,6 +122,8 @@ class ServerState { // public struct - to allow initialization. uint64_t rdb_save_usec = 0; uint64_t rdb_save_count = 0; + uint64_t big_value_preemptions = 0; + // Number of times we rejected command dispatch due to OOM condition. uint64_t oom_error_cmd_cnt = 0; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 4dfa0547f4d6..5c48ea981669 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -18,6 +18,7 @@ #include "server/journal/journal.h" #include "server/rdb_extensions.h" #include "server/rdb_save.h" +#include "server/server_state.h" #include "server/tiered_storage.h" #include "util/fibers/synchronization.h" @@ -84,7 +85,8 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot if (bytes_serialized > flush_threshold) { size_t serialized = FlushSerialized(flush_state); VLOG(2) << "FlushSerialized " << serialized << " bytes"; - ++stats_.big_value_preemptions; + auto& stats = ServerState::tlocal()->stats; + ++stats.big_value_preemptions; } }; } @@ -165,8 +167,9 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx; do { - if (cll->IsCancelled()) + if (cll->IsCancelled()) { return; + } PrimeTable::Cursor next = pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); @@ -434,8 +437,7 @@ size_t SliceSnapshot::GetTempBuffersSize() const { } RdbSaver::SnapshotStats SliceSnapshot::GetCurrentSnapshotProgress() const { - return {stats_.loop_serialized + stats_.side_saved, stats_.keys_total, - stats_.big_value_preemptions}; + return {stats_.loop_serialized + stats_.side_saved, stats_.keys_total}; } } // namespace dfly diff --git a/src/server/snapshot.h b/src/server/snapshot.h index fe8010a9165f..98ef11c78166 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -167,7 +167,6 @@ class SliceSnapshot { size_t side_saved = 0; size_t savecb_calls = 0; size_t keys_total = 0; - size_t big_value_preemptions = 0; } stats_; ThreadLocalMutex big_value_mu_; From 55d4b34daa2717ca8e9bf24a0055ea6ecdeff00d Mon Sep 17 00:00:00 2001 From: kostas Date: Sun, 24 Nov 2024 12:41:04 +0200 Subject: [PATCH 14/32] add missing cleanup of saver --- src/server/dflycmd.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 390dc504f6d1..d2d398282bac 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -120,6 +120,12 @@ void DflyCmd::ReplicaInfo::Cancel() { if (flow->cleanup) { flow->cleanup(); } + // flow->cleanup() might be set when we transition to StableSync but the saver + // might still be active. We should clean this. + if (flow->saver) { + flow->saver->CancelInShard(shard); // stops writing to journal stream to channel + flow->saver.reset(); + } VLOG(2) << "After flow cleanup " << shard->shard_id(); flow->conn = nullptr; }); From cc69b6481be3e699e83e62beb4453c8c2d0e644a Mon Sep 17 00:00:00 2001 From: kostas Date: Tue, 26 Nov 2024 11:47:52 +0200 Subject: [PATCH 15/32] turn compression off when big value ser is on Signed-off-by: kostas --- src/server/rdb_save.cc | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 940bace269c5..e4468a1c79c0 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -159,7 +159,13 @@ std::string AbslUnparseFlag(dfly::CompressionMode flag) { } dfly::CompressionMode GetDefaultCompressionMode() { - return absl::GetFlag(FLAGS_compression_mode); + const auto flag = absl::GetFlag(FLAGS_compression_mode); + if (serialization_max_chunk_size == 0) { + return flag; + } + + LOG_IF(WARNING, flag != dfly::CompressionMode::NONE); + return dfly::CompressionMode::NONE; } uint8_t RdbObjectType(const PrimeValue& pv) { From 967940adde30fbd028112f6af91ba9aadf083e72 Mon Sep 17 00:00:00 2001 From: kostas Date: Tue, 26 Nov 2024 17:57:19 +0200 Subject: [PATCH 16/32] polish --- src/server/rdb_save.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index e4468a1c79c0..825a7de007e2 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -164,7 +164,8 @@ dfly::CompressionMode GetDefaultCompressionMode() { return flag; } - LOG_IF(WARNING, flag != dfly::CompressionMode::NONE); + LOG_IF(WARNING, flag != dfly::CompressionMode::NONE) + << "Setting CompressionMode to NONE because big value serialization is on"; return dfly::CompressionMode::NONE; } From ab7dd0f2c80ba608f307ff4798c961eee6eef5c3 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 27 Nov 2024 12:40:07 +0200 Subject: [PATCH 17/32] remove reduntant log warning messages --- src/server/rdb_save.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 4fcbf51320a7..7327ad49b627 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -164,8 +164,9 @@ dfly::CompressionMode GetDefaultCompressionMode() { return flag; } - LOG_IF(WARNING, flag != dfly::CompressionMode::NONE) - << "Setting CompressionMode to NONE because big value serialization is on"; + static bool once = flag != dfly::CompressionMode::NONE; + LOG_IF(WARNING, once) << "Setting CompressionMode to NONE because big value serialization is on"; + once = false; return dfly::CompressionMode::NONE; } From 2d9b07c5ab3d5e684bc124c10bb5a7dfab5e14dc Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 27 Nov 2024 13:52:17 +0200 Subject: [PATCH 18/32] fix failing test --- tests/dragonfly/replication_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 78ecf4596c63..c8707e136dde 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1667,8 +1667,9 @@ async def test_df_crash_on_replicaof_flag(df_factory): async def test_network_disconnect(df_factory, df_seeder_factory): - master = df_factory.create(proactor_threads=6) - replica = df_factory.create(proactor_threads=4) + # See issue #4207 + master = df_factory.create(proactor_threads=6, disable_serialization_max_chunk_size=0) + replica = df_factory.create(proactor_threads=4, disable_serialization_max_chunk_size=0) df_factory.start_all([replica, master]) seeder = df_seeder_factory.create(port=master.port) From aa377e1481603e0ac4e2b0fe1f404c914503c88f Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 27 Nov 2024 15:33:07 +0200 Subject: [PATCH 19/32] fixes --- tests/dragonfly/cluster_test.py | 5 ++++- tests/dragonfly/replication_test.py | 5 ++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index d3454d8ab229..92d53ff9ffd8 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1289,7 +1289,10 @@ async def test_migration_with_key_ttl(df_factory): assert await nodes[1].client.execute_command("stick k_sticky") == 0 -@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +# See issue #4207 +@dfly_args( + {"proactor_threads": 4, "cluster_mode": "yes", "disable_serialization_max_chunk_size": 0} +) async def test_network_disconnect_during_migration(df_factory, df_seeder_factory): instances = [ df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index c8707e136dde..78ecf4596c63 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1667,9 +1667,8 @@ async def test_df_crash_on_replicaof_flag(df_factory): async def test_network_disconnect(df_factory, df_seeder_factory): - # See issue #4207 - master = df_factory.create(proactor_threads=6, disable_serialization_max_chunk_size=0) - replica = df_factory.create(proactor_threads=4, disable_serialization_max_chunk_size=0) + master = df_factory.create(proactor_threads=6) + replica = df_factory.create(proactor_threads=4) df_factory.start_all([replica, master]) seeder = df_seeder_factory.create(port=master.port) From 540d24bab8b36d03d8ebf9ba39bcef10c582c588 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 27 Nov 2024 17:07:22 +0200 Subject: [PATCH 20/32] cluster tests again --- tests/dragonfly/cluster_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 92d53ff9ffd8..20d8394576ce 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1828,7 +1828,9 @@ async def node1size0(): assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}") -@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"}) +@dfly_args( + {"proactor_threads": 2, "cluster_mode": "yes", "disable_serialization_max_chunk_size": 0} +) @pytest.mark.asyncio async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory): instances = [ From d5797d3cd5c0e387db8148b81a12eff8f43dbf53 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 27 Nov 2024 20:07:10 +0200 Subject: [PATCH 21/32] rename + corner cases --- src/server/common.h | 2 +- src/server/db_slice.cc | 4 +++- src/server/db_slice.h | 4 ++-- src/server/engine_shard.cc | 7 ++++--- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/server/common.h b/src/server/common.h index 0da1329633bc..37ee3f8b1c07 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -402,7 +402,7 @@ class LocalBlockingCounter { cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; }); } - bool HasMutating() const { + bool IsBlocked() const { return mutating_ > 0; } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 0d097e8bf173..9523c172d475 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -139,6 +139,7 @@ bool PrimeEvictionPolicy::CanGrow(const PrimeTable& tbl) const { } unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) { + db_slice_->BlockingCounter()->Wait(); unsigned res = 0; // bool should_print = (eb.key_hash % 128) == 0; @@ -165,7 +166,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e } unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) { - if (!can_evict_ || db_slice_->HasBlockingCounterMutating()) + if (!can_evict_ || db_slice_->WillBlockOnJournalWrite()) return 0; constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.probes.by_type.stash_buckets); @@ -1084,6 +1085,7 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato LOG(ERROR) << "Invalid call to ExpireIfNeeded"; return {it, ExpireIterator{}}; } + block_counter_.Wait(); auto& db = db_arr_[cntx.db_index]; diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 1b8a613b33ff..bfe2c4393f75 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -499,8 +499,8 @@ class DbSlice { // if it's not empty and not EX. void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events); - bool HasBlockingCounterMutating() const { - return block_counter_.HasMutating(); + bool WillBlockOnJournalWrite() const { + return block_counter_.IsBlocked(); } LocalBlockingCounter* BlockingCounter() { diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 018ed4aebae4..033965f4e446 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -664,10 +664,11 @@ void EngineShard::Heartbeat() { DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); // Skip heartbeat if we are serializing a big value static auto start = std::chrono::system_clock::now(); - if (db_slice.HasBlockingCounterMutating()) { + if (db_slice.WillBlockOnJournalWrite()) { const auto elapsed = std::chrono::system_clock::now() - start; - if(elapsed > std::chrono::seconds(1)) { - LOG(WARNING) << "Stalled heartbeat() fiber for " << elapsed.count() << " seconds because of big value serialization"; + if (elapsed > std::chrono::seconds(1)) { + LOG(WARNING) << "Stalled heartbeat() fiber for " << elapsed.count() + << " seconds because of big value serialization"; } return; } From 26b0793ecca132af4fc657a4c8bab2578d467523 Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 28 Nov 2024 12:35:29 +0200 Subject: [PATCH 22/32] polish --- tests/dragonfly/cluster_test.py | 4 +--- tests/dragonfly/instance.py | 10 ++-------- tests/dragonfly/replication_test.py | 2 -- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 20d8394576ce..49ff0708284e 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1828,9 +1828,7 @@ async def node1size0(): assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}") -@dfly_args( - {"proactor_threads": 2, "cluster_mode": "yes", "disable_serialization_max_chunk_size": 0} -) +@dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "serialization_max_chunk_size": 0}) @pytest.mark.asyncio async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory): instances = [ diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index f31b407ead33..3814d970c9ef 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -125,12 +125,6 @@ def __init__(self, params: DflyParams, args): if threads > 1: self.args["num_shards"] = threads - 1 - if "disable_serialization_max_chunk_size" not in self.args: - # Add 1 byte limit for big values - self.args["serialization_max_chunk_size"] = 1 - else: - self.args.pop("disable_serialization_max_chunk_size") - def __del__(self): assert self.proc == None @@ -431,9 +425,9 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn args.setdefault("list_experimental_v2") args.setdefault("log_dir", self.params.log_dir) - if version >= 1.21: + if version >= 1.21 and "serialization_max_chunk_size" not in args: # Add 1 byte limit for big values - args.setdefault("serialization_max_chunk_size", 0) + args.setdefault("serialization_max_chunk_size", 1) for k, v in args.items(): args[k] = v.format(**self.params.env) if isinstance(v, str) else v diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 78ecf4596c63..6ff68fc390e9 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2373,13 +2373,11 @@ async def test_replicate_old_master( dfly_version = "v1.19.2" released_dfly_path = download_dragonfly_release(dfly_version) master = df_factory.create( - disable_serialization_max_chunk_size=0, version=1.19, path=released_dfly_path, cluster_mode=cluster_mode, ) replica = df_factory.create( - disable_serialization_max_chunk_size=0, cluster_mode=cluster_mode, cluster_announce_ip=announce_ip, announce_port=announce_port, From 86d7532e2abb33a7b8d3d96249f75279d5b98bfd Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 28 Nov 2024 12:55:56 +0200 Subject: [PATCH 23/32] enable git leak back --- .pre-commit-config.yaml | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cd060647ae86..ea3413e5ae61 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -31,8 +31,7 @@ repos: hooks: - id: black - # - repo: https://github.com/gitleaks/gitleaks - # rev: v8.16.3 - # hooks: - # - id: gitleaks - # + - repo: https://github.com/gitleaks/gitleaks + rev: v8.16.3 + hooks: + - id: gitleaks From 4198773c5ed5513445de5315abec252ff18b810a Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 28 Nov 2024 13:32:34 +0200 Subject: [PATCH 24/32] fixes --- tests/dragonfly/cluster_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 49ff0708284e..a61cdd2d3aab 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1290,9 +1290,7 @@ async def test_migration_with_key_ttl(df_factory): # See issue #4207 -@dfly_args( - {"proactor_threads": 4, "cluster_mode": "yes", "disable_serialization_max_chunk_size": 0} -) +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0}) async def test_network_disconnect_during_migration(df_factory, df_seeder_factory): instances = [ df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) From fc907eb3fa78718bf3767681035e57376a52aa75 Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 28 Nov 2024 15:47:10 +0200 Subject: [PATCH 25/32] add fiber guard and replace wait with skip --- src/server/db_slice.cc | 6 +++++- src/server/tx_base.cc | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 9523c172d475..bcdc3dbbc072 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -139,8 +139,12 @@ bool PrimeEvictionPolicy::CanGrow(const PrimeTable& tbl) const { } unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) { - db_slice_->BlockingCounter()->Wait(); unsigned res = 0; + + if (db_slice_->WillBlockOnJournalWrite()) { + return res; + } + // bool should_print = (eb.key_hash % 128) == 0; // based on tests - it's more efficient to pass regular buckets to gc. diff --git a/src/server/tx_base.cc b/src/server/tx_base.cc index 9eb6ba09b6be..07b736652087 100644 --- a/src/server/tx_base.cc +++ b/src/server/tx_base.cc @@ -65,6 +65,7 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice void RecordExpiry(DbIndex dbid, string_view key) { auto journal = EngineShard::tlocal()->journal(); CHECK(journal); + util::FiberAtomicGuard guard; journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key), Payload("DEL", ArgSlice{key}), false); } From 72009f137eba020629090974f821ac98d060c2c0 Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 28 Nov 2024 16:42:38 +0200 Subject: [PATCH 26/32] migrations again --- tests/dragonfly/cluster_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 4545925d5597..a2b8d1219d61 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -2289,7 +2289,7 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact @pytest.mark.asyncio -@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0}) async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_seeder_factory): # Timeout set to 3 seconds because we must first saturate the socket before we get the timeout instances = [ From 390115fb6ba577cb2f717a57dbf64e42a43a2846 Mon Sep 17 00:00:00 2001 From: kostas Date: Mon, 2 Dec 2024 11:22:27 +0200 Subject: [PATCH 27/32] remove unused merged flow --- src/server/dflycmd.cc | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index cb582272556a..b03d6e7343a3 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -120,12 +120,6 @@ void DflyCmd::ReplicaInfo::Cancel() { if (flow->cleanup) { flow->cleanup(); } - // flow->cleanup() might be set when we transition to StableSync but the saver - // might still be active. We should clean this. - if (flow->saver) { - flow->saver->CancelInShard(shard); // stops writing to journal stream to channel - flow->saver.reset(); - } VLOG(2) << "After flow cleanup " << shard->shard_id(); flow->conn = nullptr; }); From 5ec921b562be88f137284edd84e300d313c6cee2 Mon Sep 17 00:00:00 2001 From: kostas Date: Mon, 2 Dec 2024 11:58:27 +0200 Subject: [PATCH 28/32] foxes --- src/server/common.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/common.h b/src/server/common.h index 37ee3f8b1c07..8f01bfce1662 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -390,7 +390,7 @@ class LocalBlockingCounter { } void unlock() { - DCHECK(mutating_ > 0); + DCHECK_GT(mutating_, 0); --mutating_; if (mutating_ == 0) { cond_var_.notify_all(); From e7a2066e15c0a5dd667b0008a987fd0693c70fff Mon Sep 17 00:00:00 2001 From: kostas Date: Mon, 2 Dec 2024 12:16:09 +0200 Subject: [PATCH 29/32] move to source --- src/server/common.cc | 13 +++++++++++++ src/server/common.h | 13 ++----------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/server/common.cc b/src/server/common.cc index 56520ff05186..04b64727cd27 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -481,4 +481,17 @@ BorrowedInterpreter::~BorrowedInterpreter() { ServerState::tlocal()->ReturnInterpreter(interpreter_); } +void LocalBlockingCounter::unlock() { + DCHECK_GT(mutating_, 0); + --mutating_; + if (mutating_ == 0) { + cond_var_.notify_all(); + } +} + +void LocalBlockingCounter::Wait() { + util::fb2::NoOpLock noop_lk_; + cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; }); +} + } // namespace dfly diff --git a/src/server/common.h b/src/server/common.h index 8f01bfce1662..4141c28f5353 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -389,18 +389,9 @@ class LocalBlockingCounter { ++mutating_; } - void unlock() { - DCHECK_GT(mutating_, 0); - --mutating_; - if (mutating_ == 0) { - cond_var_.notify_all(); - } - } + void unlock(); - void Wait() { - util::fb2::NoOpLock noop_lk_; - cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; }); - } + void Wait(); bool IsBlocked() const { return mutating_ > 0; From 197cb14a7ab669f8bb31074006f6248a20aab7d2 Mon Sep 17 00:00:00 2001 From: kostas Date: Mon, 2 Dec 2024 12:21:59 +0200 Subject: [PATCH 30/32] werror --- src/server/common.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/common.cc b/src/server/common.cc index 04b64727cd27..603455c82710 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -482,7 +482,7 @@ BorrowedInterpreter::~BorrowedInterpreter() { } void LocalBlockingCounter::unlock() { - DCHECK_GT(mutating_, 0); + DCHECK(mutating_ > 0); --mutating_; if (mutating_ == 0) { cond_var_.notify_all(); From 8ce1463dff192ff051ad20ddeb6e5e84b3cef20e Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 4 Dec 2024 16:03:21 +0200 Subject: [PATCH 31/32] small fixes Signed-off-by: kostas --- src/server/db_slice.cc | 4 ++++ src/server/generic_family.cc | 4 ++++ tests/dragonfly/cluster_test.py | 2 +- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 82e4ad1aa3be..2255f84a2dd1 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1137,6 +1137,10 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato } void DbSlice::ExpireAllIfNeeded() { + // We hold no locks to any of the keys so we should Wait() here such that + // we don't preempt in ExpireIfNeeded + block_counter_.Wait(); + for (DbIndex db_index = 0; db_index < db_arr_.size(); db_index++) { if (!db_arr_[db_index]) continue; diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 623b23a5ce94..3dbd21d2848e 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -597,6 +597,10 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, auto& db_slice = op_args.GetDbSlice(); DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index)); + // We need to make sure we don't preempt below, because we don't hold any locks to the keys + // and ExpireIfNeeded requires that. + db_slice.BlockingCounter()->Wait(); + util::FiberAtomicGuard guard; unsigned cnt = 0; diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 4c4beb35e1da..9a5253dd9890 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1424,7 +1424,7 @@ async def test_migration_with_key_ttl(df_factory): assert await nodes[1].client.execute_command("stick k_sticky") == 0 -@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0}) async def test_network_disconnect_during_migration(df_factory): instances = [ df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) From 88b5116004d0c5a85796033a0bbba9e647c7f6f3 Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 5 Dec 2024 11:41:44 +0200 Subject: [PATCH 32/32] comments --- src/server/db_slice.cc | 1 - src/server/dflycmd.cc | 4 ++++ src/server/engine_shard.cc | 4 ++-- src/server/generic_family.cc | 5 +++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 2255f84a2dd1..ad7aed04eb5a 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1090,7 +1090,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato LOG(ERROR) << "Invalid call to ExpireIfNeeded"; return {it, ExpireIterator{}}; } - block_counter_.Wait(); auto& db = db_arr_[cntx.db_index]; diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 513fd115c3f1..5038264e18fe 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -369,6 +369,7 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) { FlowInfo* flow = &replica_ptr->flows[shard->shard_id()]; + status = StopFullSyncInThread(flow, &replica_ptr->cntx, shard); if (*status != OpStatus::OK) { return; @@ -376,6 +377,9 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r StartStableSyncInThread(flow, &replica_ptr->cntx, shard); }; shard_set->RunBlockingInParallel(std::move(cb)); + + if (*status != OpStatus::OK) + return rb->SendError(kInvalidState); } LOG(INFO) << "Transitioned into stable sync with replica " << replica_ptr->address << ":" diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 033965f4e446..467cd32ac189 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -667,8 +667,8 @@ void EngineShard::Heartbeat() { if (db_slice.WillBlockOnJournalWrite()) { const auto elapsed = std::chrono::system_clock::now() - start; if (elapsed > std::chrono::seconds(1)) { - LOG(WARNING) << "Stalled heartbeat() fiber for " << elapsed.count() - << " seconds because of big value serialization"; + LOG_EVERY_T(WARNING, 5) << "Stalled heartbeat() fiber for " << elapsed.count() + << " seconds because of big value serialization"; } return; } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 3dbd21d2848e..140c94b3d992 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -597,8 +597,9 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, auto& db_slice = op_args.GetDbSlice(); DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index)); - // We need to make sure we don't preempt below, because we don't hold any locks to the keys - // and ExpireIfNeeded requires that. + // ScanCb can preempt due to journaling expired entries and we need to make sure that + // we enter the callback in a timing when journaling will not cause preemptions. Otherwise, + // the bucket might change as we Traverse and yield. db_slice.BlockingCounter()->Wait(); util::FiberAtomicGuard guard;