diff --git a/src/server/common.h b/src/server/common.h
index efe66d6ee684..c7a524c14046 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -355,4 +355,55 @@ template <typename Mutex> class ABSL_SCOPED_LOCKABLE SharedLock {
 
 extern size_t serialization_max_chunk_size;
 
+struct ConditionFlag {
+  util::fb2::CondVarAny cond_var;
+  bool flag = false;
+};
+
+// Helper class used to guarantee atomicity between serializations of buckets
+class ConditionGuard {
+ public:
+  explicit ConditionGuard(ConditionFlag* enclosing) : enclosing_(enclosing) {
+    util::fb2::NoOpLock noop_lk_;
+    enclosing_->cond_var.wait(noop_lk_, [this]() { return !enclosing_->flag; });
+    enclosing_->flag = true;
+  }
+
+  ~ConditionGuard() {
+    enclosing_->flag = false;
+    enclosing_->cond_var.notify_one();
+  }
+
+ private:
+  ConditionFlag* enclosing_;
+};
+
+class LocalBlockingCounter {
+ public:
+  void lock() {
+    ++mutating_;
+  }
+
+  void unlock() {
+    DCHECK(mutating_ > 0);
+    --mutating_;
+    if (mutating_ == 0) {
+      cond_var_.notify_one();
+    }
+  }
+
+  void Wait() {
+    util::fb2::NoOpLock noop_lk_;
+    cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; });
+  }
+
+  bool HasMutating() const {
+    return mutating_ > 0;
+  }
+
+ private:
+  util::fb2::CondVarAny cond_var_;
+  size_t mutating_ = 0;
+};
+
 }  // namespace dfly
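
A note on the two primitives above: a ConditionGuard scope is exclusive per ConditionFlag, meaning a second constructor call suspends the calling fiber until the current holder destructs, while LocalBlockingCounter counts in-flight mutations and lets other fibers wait for the count to drain. The sketch below models ConditionGuard with standard threads purely to illustrate the protocol; it is not part of the patch. The real class needs no mutex because util::fb2 fibers of one thread never run in parallel, which is why a NoOpLock suffices there.

    // Thread-safe analogue of ConditionFlag/ConditionGuard; illustrative only.
    #include <condition_variable>
    #include <mutex>

    struct FlagModel {
      std::mutex mu;               // stands in for the fiber scheduler's atomicity
      std::condition_variable cv;  // stands in for util::fb2::CondVarAny
      bool flag = false;
    };

    class GuardModel {
     public:
      explicit GuardModel(FlagModel* f) : f_(f) {
        std::unique_lock lk(f_->mu);
        f_->cv.wait(lk, [this] { return !f_->flag; });  // wait out the current holder
        f_->flag = true;                                // become the sole holder
      }
      ~GuardModel() {
        {
          std::lock_guard lk(f_->mu);
          f_->flag = false;
        }
        f_->cv.notify_one();  // admit exactly one waiter, like the original
      }

     private:
      FlagModel* f_;
    };

    int main() {
      FlagModel flag;
      GuardModel g(&flag);  // first holder enters immediately
      // a second GuardModel(&flag) on another thread would block here
    }
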
diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index 71cf28bb41dc..acc3ccfc4d73 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -481,7 +481,6 @@ OpResult DbSlice::FindInternal(const Context& cntx, std:
   if (caching_mode_ && IsValid(res.it)) {
     if (!change_cb_.empty()) {
       FetchedItemsRestorer fetched_restorer(&fetched_items_);
-      util::fb2::LockGuard lk(local_mu_);
       auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
         CallChangeCallbacks(cntx.db_index, key, bit);
       };
@@ -574,7 +573,6 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt
   CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;
 
   FetchedItemsRestorer fetched_restorer(&fetched_items_);
-  util::fb2::LockGuard lk(local_mu_);
 
   // It's a new entry.
   CallChangeCallbacks(cntx.db_index, key, {key});
@@ -690,8 +688,6 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
 }
 
 bool DbSlice::Del(Context cntx, Iterator it) {
-  util::fb2::LockGuard lk(local_mu_);
-
   if (!IsValid(it)) {
     return false;
   }
@@ -758,7 +754,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
   PrimeTable::Cursor cursor;
   uint64_t i = 0;
   do {
-    PrimeTable::Cursor next = Traverse(pt, cursor, del_entry_cb);
+    PrimeTable::Cursor next = pt->Traverse(cursor, del_entry_cb);
     ++i;
     cursor = next;
     if (i % 100 == 0) {
@@ -815,10 +811,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
 }
 
 void DbSlice::FlushDb(DbIndex db_ind) {
-  // We should not flush if serialization of a big value is in progress because this
-  // could lead to UB or assertion failures (while DashTable::Traverse is iterating over
-  // a logical bucket).
-  util::fb2::LockGuard lk(local_mu_);
+  std::unique_lock lk(block_counter_);
   // clear client tracking map.
   client_tracking_map_.clear();
@@ -840,7 +833,6 @@ void DbSlice::FlushDb(DbIndex db_ind) {
 }
 
 void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
-  util::fb2::LockGuard lk(local_mu_);
   uint64_t delta = at - expire_base_[0];  // TODO: employ multigen expire updates.
   auto& db = *db_arr_[db_ind];
   size_t table_before = db.expire.mem_usage();
@@ -850,7 +842,6 @@ void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
 }
 
 bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) {
-  util::fb2::LockGuard lk(local_mu_);
   if (main_it->second.HasExpire()) {
     auto& db = *db_arr_[db_ind];
     size_t table_before = db.expire.mem_usage();
@@ -1078,7 +1069,6 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const
 
 void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
   FetchedItemsRestorer fetched_restorer(&fetched_items_);
-  util::fb2::LockGuard lk(local_mu_);
   CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()});
   it.GetInnerIt().SetVersion(NextVersion());
 }
@@ -1180,7 +1170,7 @@ void DbSlice::ExpireAllIfNeeded() {
 
     ExpireTable::Cursor cursor;
     do {
-      cursor = Traverse(&db.expire, cursor, cb);
+      cursor = db.expire.Traverse(cursor, cb);
    } while (cursor);
  }
 }
@@ -1191,6 +1181,7 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
 }
 
 void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) {
   FetchedItemsRestorer fetched_restorer(&fetched_items_);
+  std::unique_lock lk(block_counter_);
   uint64_t bucket_version = it.GetVersion();
   // change_cb_ is ordered by version.
@@ -1214,7 +1205,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
 
 //! Unregisters the callback.
 void DbSlice::UnregisterOnChange(uint64_t id) {
-  util::fb2::LockGuard lk(local_mu_);
+  block_counter_.Wait();
   auto it = find_if(change_cb_.begin(), change_cb_.end(),
                     [id](const auto& cb) { return cb.first == id; });
   CHECK(it != change_cb_.end());
@@ -1375,13 +1366,11 @@ void DbSlice::CreateDb(DbIndex db_ind) {
 void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key,
                                  ConnectionState::ExecInfo* exec_info) {
   // Because we might insert while another fiber is preempted
-  util::fb2::LockGuard lk(local_mu_);
   db_arr_[db_indx]->watched_keys[key].push_back(exec_info);
 }
 
 void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) {
   // Because we might remove while another fiber is preempted and miss a notification
-  util::fb2::LockGuard lk(local_mu_);
   for (const auto& [db_indx, key] : exec_info->watched_keys) {
     auto& watched_keys = db_arr_[db_indx]->watched_keys;
     if (auto it = watched_keys.find(key); it != watched_keys.end()) {
@@ -1557,6 +1546,7 @@ void DbSlice::OnCbFinish() {
 }
 
 void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const {
+  std::unique_lock lk(block_counter_);
   if (change_cb_.empty())
     return;
 
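
The db_slice.cc pattern above in one sentence: CallChangeCallbacks brackets every callback invocation with std::unique_lock on block_counter_ (LocalBlockingCounter satisfies the BasicLockable requirements, so the RAII guard simply increments and decrements mutating_), and UnregisterOnChange switched from mutual exclusion to block_counter_.Wait(), draining in-flight callbacks before touching change_cb_. Below is a runnable thread-based model of that counter, with a real mutex standing in for the fiber scheduler's implicit atomicity; it is illustrative, not the patched class.

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    // Thread-safe model of LocalBlockingCounter; the original is fiber-local.
    class CounterModel {
     public:
      void lock() {  // BasicLockable: usable with std::unique_lock
        std::lock_guard g(mu_);
        ++mutating_;
      }
      void unlock() {
        std::lock_guard g(mu_);
        if (--mutating_ == 0) cv_.notify_all();
      }
      void Wait() {  // drain: block until no mutation is in flight
        std::unique_lock lk(mu_);
        cv_.wait(lk, [this] { return mutating_ == 0; });
      }

     private:
      std::mutex mu_;
      std::condition_variable cv_;
      size_t mutating_ = 0;
    };

    int main() {
      CounterModel c;
      std::thread callback([&] {
        std::unique_lock lk(c);  // "callback in flight", as in CallChangeCallbacks
        // ... the real callback may suspend here while serializing a big value ...
      });
      c.Wait();  // as in UnregisterOnChange: proceed only once callbacks drained
      callback.join();
      std::cout << "all callbacks drained\n";
    }
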
diff --git a/src/server/db_slice.h b/src/server/db_slice.h
index 16799530a95a..e522c75e1eed 100644
--- a/src/server/db_slice.h
+++ b/src/server/db_slice.h
@@ -305,34 +305,33 @@ class DbSlice {
     AddOrFindResult& operator=(ItAndUpdater&& o);
   };
 
-  OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key)
-      ABSL_LOCKS_EXCLUDED(local_mu_);
+  OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key);
 
   // Same as AddOrSkip, but overwrites in case entry exists.
   OpResult<AddOrFindResult> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
-                                        uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);
+                                        uint64_t expire_at_ms);
 
   // Adds a new entry. Requires: key does not exist in this slice.
   // Returns the iterator to the newly added entry.
   // Returns OpStatus::OUT_OF_MEMORY if bad_alloc is thrown
   OpResult<ItAndUpdater> AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
-                                uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);
+                                uint64_t expire_at_ms);
 
   // Update entry expiration. Returns the expiration timepoint in abs milliseconds, or -1 if the
   // entry already expired and was deleted.
   facade::OpResult<int64_t> UpdateExpire(const Context& cntx, Iterator prime_it, ExpIterator exp_it,
-                                         const ExpireParams& params) ABSL_LOCKS_EXCLUDED(local_mu_);
+                                         const ExpireParams& params);
 
   // Adds expiry information.
-  void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);
+  void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at);
 
   // Removes the corresponding expiry information if it exists.
   // Returns true if expiry existed (and was removed).
-  bool RemoveExpire(DbIndex db_ind, Iterator main_it) ABSL_LOCKS_EXCLUDED(local_mu_);
+  bool RemoveExpire(DbIndex db_ind, Iterator main_it);
 
   // Either adds or removes (if at == 0) expiry. Returns true if a change was made.
   // Does not change expiry if at != 0 and expiry already exists.
-  bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);
+  bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at);
 
   void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag);
   uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const;
@@ -343,12 +342,12 @@ class DbSlice {
   // Delete a key referred by its iterator.
   void PerformDeletion(Iterator del_it, DbTable* table);
 
-  bool Del(Context cntx, Iterator it) ABSL_LOCKS_EXCLUDED(local_mu_);
+  bool Del(Context cntx, Iterator it);
 
   constexpr static DbIndex kDbAll = 0xFFFF;
 
   // Flushes db_ind or all databases if kDbAll is passed
-  void FlushDb(DbIndex db_ind) ABSL_LOCKS_EXCLUDED(local_mu_);
+  void FlushDb(DbIndex db_ind);
 
   // Flushes the data of given slot ranges.
   void FlushSlots(cluster::SlotRanges slot_ranges);
@@ -439,7 +438,7 @@ class DbSlice {
   void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound);
 
   //! Unregisters the callback.
-  void UnregisterOnChange(uint64_t id) ABSL_LOCKS_EXCLUDED(local_mu_);
+  void UnregisterOnChange(uint64_t id);
 
   struct DeleteExpiredStats {
     uint32_t deleted = 0;  // number of deleted items due to expiry (less than traversed).
@@ -496,25 +495,14 @@ class DbSlice {
     client_tracking_map_[key].insert(conn_ref);
   }
 
-  // Provides access to the internal lock of db_slice for flows that serialize
-  // entries with preemption and need to synchronize with Traverse below which
-  // acquires the same lock.
-  ThreadLocalMutex& GetSerializationMutex() {
-    return local_mu_;
-  }
-
-  // Wrapper around DashTable::Traverse that allows preemptions
-  template <typename Cb, typename DashTable>
-  PrimeTable::Cursor Traverse(DashTable* pt, PrimeTable::Cursor cursor, Cb&& cb)
-      ABSL_LOCKS_EXCLUDED(local_mu_) {
-    util::fb2::LockGuard lk(local_mu_);
-    return pt->Traverse(cursor, std::forward<Cb>(cb));
-  }
-
   // Does not check for unsupported events. Callers must parse the string and reject it
   // if it's not empty and not EX.
   void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events);
 
+  bool HasBlockingCounterMutating() const {
+    return block_counter_.HasMutating();
+  }
+
  private:
   void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
   void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
@@ -571,8 +559,11 @@ class DbSlice {
 
   void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const;
 
-  // Used to provide exclusive access while Traversing segments
-  mutable ThreadLocalMutex local_mu_;
+  // We need this because registered callbacks might yield, and while they do we want
+  // to avoid running Heartbeat or flushing the db. This counter protects us against
+  // such cases.
+  mutable LocalBlockingCounter block_counter_;
+
   ShardId shard_id_;
   uint8_t caching_mode_ : 1;
 
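
With GetSerializationMutex() and the locking Traverse() wrapper removed from the interface, callers now drive DashTable::Traverse directly and keep the usual cursor loop; exclusion is enforced only on the paths that can actually preempt. A toy model of that loop shape follows (the one-bucket-per-call, cursor-wraps-to-zero contract is an assumption made for illustration, not a copy of the real DashTable):

    #include <cstddef>
    #include <functional>
    #include <iostream>
    #include <vector>

    // Toy bucketed table; Traverse visits one bucket per call.
    struct ToyTable {
      std::vector<std::vector<int>> buckets;
      size_t Traverse(size_t cursor, const std::function<void(int)>& cb) const {
        for (int v : buckets[cursor]) cb(v);
        return (cursor + 1) % buckets.size();  // 0 means the scan wrapped around
      }
    };

    int main() {
      ToyTable table{{{1, 2}, {3}, {4, 5}}};
      size_t cursor = 0;
      do {  // same loop shape as FlushSlotsFb / ExpireAllIfNeeded after the patch
        cursor = table.Traverse(cursor, [](int v) { std::cout << v << ' '; });
      } while (cursor != 0);
      std::cout << '\n';
    }
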
diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 7f3d8579c320..2260e4bfdade 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -281,7 +281,7 @@ void DoBuildObjHist(EngineShard* shard, ConnectionContext* cntx, ObjHistMap* obj
       continue;
     PrimeTable::Cursor cursor;
     do {
-      cursor = db_slice.Traverse(&dbt->prime, cursor, [&](PrimeIterator it) {
+      cursor = dbt->prime.Traverse(cursor, [&](PrimeIterator it) {
         unsigned obj_type = it->second.ObjType();
         auto& hist_ptr = (*obj_hist_map)[obj_type];
         if (!hist_ptr) {
diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc
index 53de0b720eaa..6ca3661c2e8d 100644
--- a/src/server/engine_shard.cc
+++ b/src/server/engine_shard.cc
@@ -329,7 +329,7 @@ bool EngineShard::DoDefrag() {
   uint64_t attempts = 0;
   do {
-    cur = slice.Traverse(prime_table, cur, [&](PrimeIterator it) {
+    cur = prime_table->Traverse(cur, [&](PrimeIterator it) {
      // For each value, check whether we should move it because it
      // sits on an underutilized page of memory, and if so, do it.
      bool did = it->second.DefragIfNeeded(threshold);
@@ -661,13 +661,17 @@ void EngineShard::Heartbeat() {
 
   CacheStats();
 
+  // TODO: iterate over all namespaces
+  DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
+  // Skip the heartbeat if we are serializing a big value.
+  if (db_slice.HasBlockingCounterMutating()) {
+    return;
+  }
+
   if (!IsReplica()) {  // Never run expiry/evictions on replica.
     RetireExpiredAndEvict();
   }
 
-  // TODO: iterate over all namespaces
-  DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
-
   // Offset CoolMemoryUsage when considering background offloading.
   // TODO: Another approach could be to align this with how we handle
   // FreeMemWithEvictionStep, i.e. act if memory_budget is below the limit.
@@ -693,15 +697,6 @@ void EngineShard::Heartbeat() {
 
 void EngineShard::RetireExpiredAndEvict() {
   // TODO: iterate over all namespaces
   DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
-  // Some of the functions below might acquire the same lock again so we need to unlock it
-  // asap. We won't yield before we relock the mutex again, so the code below is atomic
-  // in respect to preemptions of big values. An example of that is the call to
-  // DeleteExpiredStep() below, which eventually calls ExpireIfNeeded()
-  // and within that the call to RecordExpiry() will trigger the registered
-  // callback OnJournalEntry which locks the exact same mutex.
-  // We need to lock below and immediately release because there should be no other fiber
-  // that is serializing a big value.
-  { std::unique_lock lk(db_slice.GetSerializationMutex()); }
   constexpr double kTtlDeleteLimit = 200;
   constexpr double kRedLimitFactor = 0.1;
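
Heartbeat() now defers instead of blocking: when any change callback is mid-serialization, the tick returns early and the next periodic tick retries, so no fiber parks inside Heartbeat waiting on a lock. A minimal model of that skip-do-not-block policy (names are illustrative, not from the patch):

    #include <cstddef>
    #include <iostream>

    size_t mutating = 0;  // stands in for LocalBlockingCounter::HasMutating()

    void HeartbeatTick() {
      if (mutating > 0) {
        std::cout << "tick skipped: big-value serialization in flight\n";
        return;  // cheap to skip; the timer fires again shortly
      }
      std::cout << "tick: run expiry/eviction\n";
    }

    int main() {
      HeartbeatTick();  // runs
      ++mutating;       // a callback starts serializing a big value
      HeartbeatTick();  // skipped
      --mutating;       // serialization done
      HeartbeatTick();  // runs again
    }
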
diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc
index 390d70793a7d..a3d8dc3dd677 100644
--- a/src/server/generic_family.cc
+++ b/src/server/generic_family.cc
@@ -584,12 +584,8 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
   PrimeTable::Cursor cur = *cursor;
   auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index);
   string scratch;
-  do {
-    cur = db_slice.Traverse(prime_table, cur, [&](PrimeIterator it) {
-      cnt += ScanCb(op_args, it, scan_opts, &scratch, vec);
-    });
-  } while (cur && cnt < scan_opts.limit);
-
+  cur = prime_table->Traverse(
+      cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); });
   VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value();
   *cursor = cur.value();
 }
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 6d4bb7f81af9..86a7af5ededc 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -212,8 +212,8 @@ void RestoreStreamer::Run() {
   do {
     if (fiber_cancelled_)
       return;
-
-    cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) {
+    cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
+      ConditionGuard guard(&bucket_ser_);
       db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
                                                DbSlice::Iterator::FromPrime(it), snapshot_version_);
       WriteBucket(it);
@@ -302,6 +302,8 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
 
 void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
   DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";
+  ConditionGuard guard(&bucket_ser_);
+
   PrimeTable* table = db_slice_->GetTables(0).first;
 
   if (const PrimeTable::bucket_iterator* bit = req.update()) {
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index c625b60c5157..c983bce58d6a 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -106,6 +106,8 @@ class RestoreStreamer : public JournalStreamer {
   cluster::SlotSet my_slots_;
   bool fiber_cancelled_ = false;
   bool snapshot_finished_ = false;
+
+  ConditionFlag bucket_ser_;
 };
 
 }  // namespace dfly
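
In RestoreStreamer, the traversal callback and OnDbChange now guard the same bucket_ser_ flag, so a write arriving while WriteBucket is preempted mid-bucket waits until that bucket is fully serialized. A two-thread model of that exclusion, with threads and sleeps standing in for fibers and preemption (illustrative only):

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    std::mutex mu;
    std::condition_variable cv;
    bool serializing = false;  // models ConditionFlag::flag

    void GuardedSection(const char* who, int work_ms) {
      std::unique_lock lk(mu);
      cv.wait(lk, [] { return !serializing; });  // ConditionGuard constructor
      serializing = true;
      lk.unlock();
      std::cout << who << " enters\n";  // bucket serialized here, possibly preempting
      std::this_thread::sleep_for(std::chrono::milliseconds(work_ms));
      lk.lock();
      serializing = false;  // ConditionGuard destructor
      cv.notify_one();
      std::cout << who << " leaves\n";
    }

    int main() {
      std::thread traverse(GuardedSection, "WriteBucket", 50);
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      std::thread on_change(GuardedSection, "OnDbChange", 0);  // must wait its turn
      traverse.join();
      on_change.join();
    }
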
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 745233552526..00b8beb2a558 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -168,7 +168,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
       return;
 
     PrimeTable::Cursor next =
-        db_slice_->Traverse(pt, cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
+        pt->Traverse(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
     cursor = next;
     PushSerialized(false);
 
@@ -243,6 +243,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
 }
 
 bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
+  ConditionGuard guard(&bucket_ser_);
   ++stats_.savecb_calls;
 
   auto check = [&](auto v) {
@@ -372,6 +373,8 @@ bool SliceSnapshot::PushSerialized(bool force) {
 }
 
 void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
+  ConditionGuard guard(&bucket_ser_);
+
   PrimeTable* table = db_slice_->GetTables(db_index).first;
 
   const PrimeTable::bucket_iterator* bit = req.update();
@@ -396,7 +399,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
   // To enable journal flushing to sync after a non-auto journal command is executed, we call
   // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
   // additional journal change to serialize, it simply invokes PushSerialized.
-  std::unique_lock lk(db_slice_->GetSerializationMutex());
+  ConditionGuard guard(&bucket_ser_);
   if (item.opcode != journal::Op::NOOP) {
     serializer_->WriteJournalEntry(item.data);
   }
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index 38ad86c889ad..339a963c9204 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -169,6 +169,8 @@ class SliceSnapshot {
     size_t keys_total = 0;
   } stats_;
 
+  ConditionFlag bucket_ser_;
+
   std::function<void(std::string)> on_push_;
   std::function<void()> on_snapshot_finish_;
 };
diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py
index d7dd7405ee26..8550114ac9ea 100644
--- a/tests/dragonfly/instance.py
+++ b/tests/dragonfly/instance.py
@@ -124,6 +124,9 @@ def __init__(self, params: DflyParams, args):
         if threads > 1:
             self.args["num_shards"] = threads - 1
 
+        # Force a 1-byte chunk limit so that every value is serialized as a big value.
+        self.args["serialization_max_chunk_size"] = 1
+
     def __del__(self):
         assert self.proc == None
 
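
On the test-harness change: once the serializer has accumulated serialization_max_chunk_size bytes for an entry, it flushes early, which is a potential preemption point, so pinning the limit to 1 byte pushes every value through the big-value path and exercises the new ConditionGuard and LocalBlockingCounter machinery on each bucket. A rough model of that flush-on-threshold rule (assumed semantics for illustration, not the real serializer):

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    size_t serialization_max_chunk_size = 1;  // as pinned by the test harness above

    // Rough model: flush (a potential preemption point) whenever the pending
    // bytes reach the configured chunk limit.
    void SerializeEntry(const std::vector<std::string>& slices) {
      size_t pending = 0;
      for (const std::string& s : slices) {
        pending += s.size();
        if (pending >= serialization_max_chunk_size) {
          std::cout << "flush " << pending << " bytes (fiber may preempt here)\n";
          pending = 0;
        }
      }
    }

    int main() {
      SerializeEntry({"a", "bc", "defg"});  // with limit 1, every slice flushes
    }
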