Skip to content

Commit

Permalink
feat: yield when serialization is in progress (#3220)
Browse files Browse the repository at this point in the history
* allow preemption when we serialize buckets
* add condition variable to protect interleaved preemptions

Signed-off-by: kostas <[email protected]>
  • Loading branch information
kostasrim authored Jul 11, 2024
1 parent f9ded47 commit bf2e5fd
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 22 deletions.
23 changes: 23 additions & 0 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,27 @@ class UniquePicksGenerator : public PicksGenerator {
absl::BitGen bitgen_{};
};

struct ConditionFlag {
util::fb2::CondVarAny cond_var;
bool flag = false;
};

// Helper class used to guarantee atomicity between serialization of buckets
class ConditionGuard {
public:
explicit ConditionGuard(ConditionFlag* enclosing) : enclosing_(enclosing) {
util::fb2::NoOpLock noop_lk_;
enclosing_->cond_var.wait(noop_lk_, [this]() { return !enclosing_->flag; });
enclosing_->flag = true;
}

~ConditionGuard() {
enclosing_->flag = false;
enclosing_->cond_var.notify_one();
}

private:
ConditionFlag* enclosing_;
};

} // namespace dfly
12 changes: 6 additions & 6 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

Expand All @@ -9,6 +9,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "server/cluster/cluster_defs.h"
#include "util/fibers/synchronization.h"

using namespace facade;

Expand Down Expand Up @@ -211,6 +212,8 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {

bool written = false;
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
ConditionGuard guard(&bucket_ser_);

db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
DbSlice::Iterator::FromPrime(it), snapshot_version_);
if (WriteBucket(it)) {
Expand Down Expand Up @@ -280,10 +283,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {
}

bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
// Can't switch fibers because that could invalidate iterator or cause bucket splits which may
// move keys between buckets.
FiberAtomicGuard fg;

bool written = false;

if (it.GetVersion() < snapshot_version_) {
Expand Down Expand Up @@ -312,7 +311,8 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

FiberAtomicGuard fg;
ConditionGuard guard(&bucket_ser_);

PrimeTable* table = db_slice_->GetTables(0).first;

if (const PrimeTable::bucket_iterator* bit = req.update()) {
Expand Down
5 changes: 4 additions & 1 deletion src/server/journal/streamer.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include "server/common.h"
#include "server/db_slice.h"
#include "server/journal/journal.h"
#include "server/journal/serializer.h"
Expand Down Expand Up @@ -102,6 +103,8 @@ class RestoreStreamer : public JournalStreamer {
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;

ConditionFlag bucket_ser_;
};

} // namespace dfly
39 changes: 25 additions & 14 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

Expand All @@ -8,6 +8,8 @@
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>

#include <mutex>

#include "base/logging.h"
#include "core/heap_size.h"
#include "server/db_slice.h"
Expand All @@ -16,6 +18,7 @@
#include "server/rdb_extensions.h"
#include "server/rdb_save.h"
#include "server/tiered_storage.h"
#include "util/fibers/synchronization.h"

namespace dfly {

Expand Down Expand Up @@ -235,30 +238,35 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
}

bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
ConditionGuard guard(&bucket_ser_);

++stats_.savecb_calls;

auto check = [&](auto v) {
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
++stats_.skipped;
return false;
}
return true;
};

uint64_t v = it.GetVersion();
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
++stats_.skipped;
if (!check(v)) {
return false;
}

db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it),
snapshot_version_);

stats_.loop_serialized += SerializeBucket(current_db_, it);

return false;
}

unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) {
// Must be atomic because after after we call it.snapshot_version_ we're starting
// to send incremental updates instead of serializing the whole bucket: We must not
// send the update until the initial SerializeBucket is called.
// Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
// bucket.
FiberAtomicGuard fg;
DCHECK_LT(it.GetVersion(), snapshot_version_);

// traverse physical bucket and write it into string file.
Expand All @@ -268,6 +276,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite

while (!it.is_done()) {
++result;
// might yield
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
++it;
}
Expand Down Expand Up @@ -330,10 +339,12 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
}

void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
FiberAtomicGuard fg;
ConditionGuard guard(&bucket_ser_);

PrimeTable* table = db_slice_->GetTables(db_index).first;
const PrimeTable::bucket_iterator* bit = req.update();

if (const PrimeTable::bucket_iterator* bit = req.update()) {
if (bit) {
if (bit->GetVersion() < snapshot_version_) {
stats_.side_saved += SerializeBucket(db_index, *bit);
}
Expand Down
5 changes: 4 additions & 1 deletion src/server/snapshot.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

Expand All @@ -10,6 +10,7 @@
#include "base/pod_array.h"
#include "core/size_tracking_channel.h"
#include "io/file.h"
#include "server/common.h"
#include "server/db_slice.h"
#include "server/rdb_save.h"
#include "server/table.h"
Expand Down Expand Up @@ -171,6 +172,8 @@ class SliceSnapshot {
size_t savecb_calls = 0;
size_t keys_total = 0;
} stats_;

ConditionFlag bucket_ser_;
};

} // namespace dfly

0 comments on commit bf2e5fd

Please sign in to comment.