Skip to content

Commit

Permalink
chore: remove DbSlice mutex and add ConditionFlag in SliceSnapshot
Browse files Browse the repository at this point in the history
Signed-off-by: kostas <[email protected]>
  • Loading branch information
kostasrim committed Nov 6, 2024
1 parent ae3faf5 commit 94955b6
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 68 deletions.
51 changes: 51 additions & 0 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,4 +355,55 @@ template <typename Mutex> class ABSL_SCOPED_LOCKABLE SharedLock {

extern size_t serialization_max_chunk_size;

struct ConditionFlag {
util::fb2::CondVarAny cond_var;
bool flag = false;
};

// Helper class used to guarantee atomicity between serialization of buckets
class ConditionGuard {
public:
explicit ConditionGuard(ConditionFlag* enclosing) : enclosing_(enclosing) {
util::fb2::NoOpLock noop_lk_;
enclosing_->cond_var.wait(noop_lk_, [this]() { return !enclosing_->flag; });
enclosing_->flag = true;
}

~ConditionGuard() {
enclosing_->flag = false;
enclosing_->cond_var.notify_one();
}

private:
ConditionFlag* enclosing_;
};

class LocalBlockingCounter {
public:
void lock() {
++mutating_;
}

void unlock() {
DCHECK(mutating_ > 0);
--mutating_;
if (mutating_ == 0) {
cond_var_.notify_one();
}
}

void Wait() {
util::fb2::NoOpLock noop_lk_;
cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; });
}

bool HasMutating() const {
return mutating_ > 0;
}

private:
util::fb2::CondVarAny cond_var_;
size_t mutating_ = 0;
};

} // namespace dfly
22 changes: 6 additions & 16 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
if (caching_mode_ && IsValid(res.it)) {
if (!change_cb_.empty()) {
FetchedItemsRestorer fetched_restorer(&fetched_items_);
util::fb2::LockGuard lk(local_mu_);
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
CallChangeCallbacks(cntx.db_index, key, bit);
};
Expand Down Expand Up @@ -574,7 +573,6 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;

FetchedItemsRestorer fetched_restorer(&fetched_items_);
util::fb2::LockGuard lk(local_mu_);

// It's a new entry.
CallChangeCallbacks(cntx.db_index, key, {key});
Expand Down Expand Up @@ -690,8 +688,6 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
}

bool DbSlice::Del(Context cntx, Iterator it) {
util::fb2::LockGuard lk(local_mu_);

if (!IsValid(it)) {
return false;
}
Expand Down Expand Up @@ -758,7 +754,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
PrimeTable::Cursor cursor;
uint64_t i = 0;
do {
PrimeTable::Cursor next = Traverse(pt, cursor, del_entry_cb);
PrimeTable::Cursor next = pt->Traverse(cursor, del_entry_cb);
++i;
cursor = next;
if (i % 100 == 0) {
Expand Down Expand Up @@ -815,10 +811,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
}

void DbSlice::FlushDb(DbIndex db_ind) {
// We should not flush if serialization of a big value is in progress because this
// could lead to UB or assertion failures (while DashTable::Traverse is iterating over
// a logical bucket).
util::fb2::LockGuard lk(local_mu_);
std::unique_lock<LocalBlockingCounter> lk(block_counter_);
// clear client tracking map.
client_tracking_map_.clear();

Expand All @@ -840,7 +833,6 @@ void DbSlice::FlushDb(DbIndex db_ind) {
}

void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
util::fb2::LockGuard lk(local_mu_);
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.
auto& db = *db_arr_[db_ind];
size_t table_before = db.expire.mem_usage();
Expand All @@ -850,7 +842,6 @@ void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
}

bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) {
util::fb2::LockGuard lk(local_mu_);
if (main_it->second.HasExpire()) {
auto& db = *db_arr_[db_ind];
size_t table_before = db.expire.mem_usage();
Expand Down Expand Up @@ -1078,7 +1069,6 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const

void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
FetchedItemsRestorer fetched_restorer(&fetched_items_);
util::fb2::LockGuard lk(local_mu_);
CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()});
it.GetInnerIt().SetVersion(NextVersion());
}
Expand Down Expand Up @@ -1180,7 +1170,7 @@ void DbSlice::ExpireAllIfNeeded() {

ExpireTable::Cursor cursor;
do {
cursor = Traverse(&db.expire, cursor, cb);
cursor = db.expire.Traverse(cursor, cb);
} while (cursor);
}
}
Expand All @@ -1191,6 +1181,7 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {

void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) {
FetchedItemsRestorer fetched_restorer(&fetched_items_);
std::unique_lock<LocalBlockingCounter> lk(block_counter_);

uint64_t bucket_version = it.GetVersion();
// change_cb_ is ordered by version.
Expand All @@ -1214,7 +1205,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_

//! Unregisters the callback.
void DbSlice::UnregisterOnChange(uint64_t id) {
util::fb2::LockGuard lk(local_mu_);
block_counter_.Wait();
auto it = find_if(change_cb_.begin(), change_cb_.end(),
[id](const auto& cb) { return cb.first == id; });
CHECK(it != change_cb_.end());
Expand Down Expand Up @@ -1375,13 +1366,11 @@ void DbSlice::CreateDb(DbIndex db_ind) {
void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key,
ConnectionState::ExecInfo* exec_info) {
// Because we might insert while another fiber is preempted
util::fb2::LockGuard lk(local_mu_);
db_arr_[db_indx]->watched_keys[key].push_back(exec_info);
}

void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) {
// Because we might remove while another fiber is preempted and miss a notification
util::fb2::LockGuard lk(local_mu_);
for (const auto& [db_indx, key] : exec_info->watched_keys) {
auto& watched_keys = db_arr_[db_indx]->watched_keys;
if (auto it = watched_keys.find(key); it != watched_keys.end()) {
Expand Down Expand Up @@ -1557,6 +1546,7 @@ void DbSlice::OnCbFinish() {
}

void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const {
std::unique_lock<LocalBlockingCounter> lk(block_counter_);
if (change_cb_.empty())
return;

Expand Down
47 changes: 19 additions & 28 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,34 +305,33 @@ class DbSlice {
AddOrFindResult& operator=(ItAndUpdater&& o);
};

OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key)
ABSL_LOCKS_EXCLUDED(local_mu_);
OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key);

// Same as AddOrSkip, but overwrites in case entry exists.
OpResult<AddOrFindResult> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);
uint64_t expire_at_ms);

// Adds a new entry. Requires: key does not exist in this slice.
// Returns the iterator to the newly added entry.
// Returns OpStatus::OUT_OF_MEMORY if bad_alloc is thrown
OpResult<ItAndUpdater> AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);
uint64_t expire_at_ms);

// Update entry expiration. Return epxiration timepoint in abs milliseconds, or -1 if the entry
// already expired and was deleted;
facade::OpResult<int64_t> UpdateExpire(const Context& cntx, Iterator prime_it, ExpIterator exp_it,
const ExpireParams& params) ABSL_LOCKS_EXCLUDED(local_mu_);
const ExpireParams& params);

// Adds expiry information.
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at);

// Removes the corresponing expiry information if exists.
// Returns true if expiry existed (and removed).
bool RemoveExpire(DbIndex db_ind, Iterator main_it) ABSL_LOCKS_EXCLUDED(local_mu_);
bool RemoveExpire(DbIndex db_ind, Iterator main_it);

// Either adds or removes (if at == 0) expiry. Returns true if a change was made.
// Does not change expiry if at != 0 and expiry already exists.
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at);

void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag);
uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const;
Expand All @@ -343,12 +342,12 @@ class DbSlice {
// Delete a key referred by its iterator.
void PerformDeletion(Iterator del_it, DbTable* table);

bool Del(Context cntx, Iterator it) ABSL_LOCKS_EXCLUDED(local_mu_);
bool Del(Context cntx, Iterator it);

constexpr static DbIndex kDbAll = 0xFFFF;

// Flushes db_ind or all databases if kDbAll is passed
void FlushDb(DbIndex db_ind) ABSL_LOCKS_EXCLUDED(local_mu_);
void FlushDb(DbIndex db_ind);

// Flushes the data of given slot ranges.
void FlushSlots(cluster::SlotRanges slot_ranges);
Expand Down Expand Up @@ -439,7 +438,7 @@ class DbSlice {
void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound);

//! Unregisters the callback.
void UnregisterOnChange(uint64_t id) ABSL_LOCKS_EXCLUDED(local_mu_);
void UnregisterOnChange(uint64_t id);

struct DeleteExpiredStats {
uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed).
Expand Down Expand Up @@ -496,25 +495,14 @@ class DbSlice {
client_tracking_map_[key].insert(conn_ref);
}

// Provides access to the internal lock of db_slice for flows that serialize
// entries with preemption and need to synchronize with Traverse below which
// acquires the same lock.
ThreadLocalMutex& GetSerializationMutex() {
return local_mu_;
}

// Wrapper around DashTable::Traverse that allows preemptions
template <typename Cb, typename DashTable>
PrimeTable::Cursor Traverse(DashTable* pt, PrimeTable::Cursor cursor, Cb&& cb)
ABSL_LOCKS_EXCLUDED(local_mu_) {
util::fb2::LockGuard lk(local_mu_);
return pt->Traverse(cursor, std::forward<Cb>(cb));
}

// Does not check for non supported events. Callers must parse the string and reject it
// if it's not empty and not EX.
void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events);

bool HasBlockingCounterMutating() const {
return block_counter_.HasMutating();
}

private:
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
Expand Down Expand Up @@ -571,8 +559,11 @@ class DbSlice {

void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const;

// Used to provide exclusive access while Traversing segments
mutable ThreadLocalMutex local_mu_;
// We need this because registered callbacks might yield and when they do so we want
// to avoid Heartbeat or Flushing the db.
// This counter protects us against this case.
mutable LocalBlockingCounter block_counter_;

ShardId shard_id_;
uint8_t caching_mode_ : 1;

Expand Down
2 changes: 1 addition & 1 deletion src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ void DoBuildObjHist(EngineShard* shard, ConnectionContext* cntx, ObjHistMap* obj
continue;
PrimeTable::Cursor cursor;
do {
cursor = db_slice.Traverse(&dbt->prime, cursor, [&](PrimeIterator it) {
cursor = dbt->prime.Traverse(cursor, [&](PrimeIterator it) {
unsigned obj_type = it->second.ObjType();
auto& hist_ptr = (*obj_hist_map)[obj_type];
if (!hist_ptr) {
Expand Down
21 changes: 8 additions & 13 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ bool EngineShard::DoDefrag() {
uint64_t attempts = 0;

do {
cur = slice.Traverse(prime_table, cur, [&](PrimeIterator it) {
cur = prime_table->Traverse(cur, [&](PrimeIterator it) {
// for each value check whether we should move it because it
// seats on underutilized page of memory, and if so, do it.
bool did = it->second.DefragIfNeeded(threshold);
Expand Down Expand Up @@ -661,13 +661,17 @@ void EngineShard::Heartbeat() {

CacheStats();

// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
// Skip heartbeat if we are serializing a big value
if (db_slice.HasBlockingCounterMutating()) {
return;
}

if (!IsReplica()) { // Never run expiry/evictions on replica.
RetireExpiredAndEvict();
}

// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());

// Offset CoolMemoryUsage when consider background offloading.
// TODO: Another approach could be is to align the approach similarly to how we do with
// FreeMemWithEvictionStep, i.e. if memory_budget is below the limit.
Expand All @@ -693,15 +697,6 @@ void EngineShard::Heartbeat() {
void EngineShard::RetireExpiredAndEvict() {
// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
// Some of the functions below might acquire the same lock again so we need to unlock it
// asap. We won't yield before we relock the mutex again, so the code below is atomic
// in respect to preemptions of big values. An example of that is the call to
// DeleteExpiredStep() below, which eventually calls ExpireIfNeeded()
// and within that the call to RecordExpiry() will trigger the registered
// callback OnJournalEntry which locks the exact same mutex.
// We need to lock below and immediately release because there should be no other fiber
// that is serializing a big value.
{ std::unique_lock lk(db_slice.GetSerializationMutex()); }
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;

Expand Down
8 changes: 2 additions & 6 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -584,12 +584,8 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
PrimeTable::Cursor cur = *cursor;
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index);
string scratch;
do {
cur = db_slice.Traverse(prime_table, cur, [&](PrimeIterator it) {
cnt += ScanCb(op_args, it, scan_opts, &scratch, vec);
});
} while (cur && cnt < scan_opts.limit);

cur = prime_table->Traverse(
cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); });
VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value();
*cursor = cur.value();
}
Expand Down
6 changes: 4 additions & 2 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ void RestoreStreamer::Run() {
do {
if (fiber_cancelled_)
return;

cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) {
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
ConditionGuard guard(&bucket_ser_);
db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
DbSlice::Iterator::FromPrime(it), snapshot_version_);
WriteBucket(it);
Expand Down Expand Up @@ -302,6 +302,8 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

ConditionGuard guard(&bucket_ser_);

PrimeTable* table = db_slice_->GetTables(0).first;

if (const PrimeTable::bucket_iterator* bit = req.update()) {
Expand Down
2 changes: 2 additions & 0 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class RestoreStreamer : public JournalStreamer {
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;

ConditionFlag bucket_ser_;
};

} // namespace dfly
Loading

0 comments on commit 94955b6

Please sign in to comment.