chore(snapshot): Small cleanup in Snapshot code #4377

Merged
src/server/rdb_save.h: 1 change (1 addition, 0 deletions)
@@ -225,6 +225,7 @@ class RdbSerializer : public SerializerBase {
// Must be called in the thread to which `it` belongs.
// Returns the serialized rdb_type or the error.
// expire_ms = 0 means no expiry.
+ // This function might preempt if flush_fun_ is used.
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms,
uint32_t mc_flags, DbIndex dbid);

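The new comment matters mainly for callers: if `SaveEntry` can yield the fiber while a flush callback is installed, call sites that need to stay atomic with respect to concurrent change callbacks guard the whole operation with a mutex, as the `big_value_mu_` guard in `SliceSnapshot::BucketSaveCb` below does. A hypothetical sketch of that pattern, not Dragonfly's actual API (the names `PreemptingSerializer`, `SerializeBucketGuarded` are made up):

```cpp
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical serializer whose Save() may suspend the calling fiber when a
// flush callback is installed, mirroring the comment added to SaveEntry above.
class PreemptingSerializer {
 public:
  explicit PreemptingSerializer(std::function<void()> flush_fun = {})
      : flush_fun_(std::move(flush_fun)) {}

  void Save() {
    // ... encode the entry into an internal buffer ...
    if (flush_fun_) {
      flush_fun_();  // may block or yield; other fibers can run here
    }
  }

 private:
  std::function<void()> flush_fun_;
};

// Call-site pattern: hold a lock across the whole bucket so that concurrent
// change callbacks observe either the pre- or post-serialization state
// (analogous to the big_value_mu_ guard taken in BucketSaveCb below).
void SerializeBucketGuarded(PreemptingSerializer& s, std::mutex& bucket_mu) {
  std::lock_guard guard(bucket_mu);
  s.Save();
}
```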
src/server/snapshot.cc: 31 changes (16 additions, 15 deletions)
@@ -38,8 +38,11 @@ constexpr size_t kMinBlobSize = 32_KB;

SliceSnapshot::SliceSnapshot(CompressionMode compression_mode, DbSlice* slice,
SnapshotDataConsumerInterface* consumer, Context* cntx)
- : db_slice_(slice), compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) {
- db_array_ = slice->databases();
+ : db_slice_(slice),
+   db_array_(slice->databases()),
+   compression_mode_(compression_mode),
+   consumer_(consumer),
+   cntx_(cntx) {
tl_slice_snapshots.insert(this);
}
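The constructor change replaces an in-body assignment to `db_array_` with direct member initialization, which is what later allows the field to become `const` in snapshot.h. A standalone illustration of the same pattern, using made-up types rather than the real `DbSlice`/`DbTableArray`:

```cpp
#include <vector>

struct Slice {
  std::vector<int> databases() const { return {1, 2, 3}; }
};

class SnapshotBefore {
 public:
  explicit SnapshotBefore(Slice* slice) : slice_(slice) {
    db_array_ = slice->databases();  // default-construct, then assign in the body
  }

 private:
  Slice* slice_;
  std::vector<int> db_array_;  // cannot be const: it is assigned after construction
};

class SnapshotAfter {
 public:
  explicit SnapshotAfter(Slice* slice)
      : slice_(slice), db_array_(slice->databases()) {}  // initialize once, directly

 private:
  Slice* slice_;
  const std::vector<int> db_array_;  // set in the initializer list, never reassigned
};
```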

@@ -163,16 +166,15 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {

uint64_t last_yield = 0;
PrimeTable* pt = &db_array_[db_indx]->prime;
- current_db_ = db_indx;

VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
do {
if (cntx_->IsCancelled()) {
return;
}

- PrimeTable::Cursor next =
-     pt->TraverseBuckets(cursor, [this](auto it) { return BucketSaveCb(it); });
+ PrimeTable::Cursor next = pt->TraverseBuckets(
+     cursor, [this, &db_indx](auto it) { return BucketSaveCb(db_indx, it); });
cursor = next;
PushSerialized(false);

@@ -248,7 +250,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
}
}

- bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
+ bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it) {
std::lock_guard guard(big_value_mu_);

++stats_.savecb_calls;
@@ -267,7 +269,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
return false;
}

- db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it),
+ db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
snapshot_version_);

auto* blocking_counter = db_slice_->BlockingCounter();
@@ -276,7 +278,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
// zero.
std::lock_guard blocking_counter_guard(*blocking_counter);

- stats_.loop_serialized += SerializeBucket(current_db_, it);
+ stats_.loop_serialized += SerializeBucket(db_index, it);

return false;
}
@@ -292,20 +294,19 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
while (!it.is_done()) {
++result;
// might preempt due to big value serialization.
- SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
+ SerializeEntry(db_index, it->first, it->second);
++it;
}
serialize_bucket_running_ = false;
return result;
}

- void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
-     optional<uint64_t> expire, RdbSerializer* serializer) {
+ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv) {
if (pv.IsExternal() && pv.IsCool())
- return SerializeEntry(db_indx, pk, pv.GetCool().record->value, expire, serializer);
+ return SerializeEntry(db_indx, pk, pv.GetCool().record->value);

- time_t expire_time = expire.value_or(0);
- if (!expire && pv.HasExpire()) {
+ time_t expire_time = 0;
+ if (pv.HasExpire()) {
auto eit = db_array_[db_indx]->expire.Find(pk);
expire_time = db_slice_->ExpireTime(eit);
}
@@ -322,7 +323,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
{db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time, mc_flags});
++type_freq_map_[RDB_TYPE_STRING];
} else {
- io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
+ io::Result<uint8_t> res = serializer_->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
CHECK(res);
++type_freq_map_[*res];
}
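`SerializeEntry` also loses its `expire` and `serializer` parameters: the call sites always passed `nullopt` and `serializer_.get()`, so the expiry is now looked up internally every time and the member serializer is used directly. A generic before/after sketch of trimming such pass-through parameters, with a hypothetical `Entry` type:

```cpp
#include <cstdint>
#include <optional>

struct Entry {
  bool has_expire = false;
  uint64_t expire_ms = 0;
};

// Before: every caller passed std::nullopt, so the parameter only obscured the logic.
uint64_t ExpireTimeOf(const Entry& e, std::optional<uint64_t> expire) {
  uint64_t expire_time = expire.value_or(0);
  if (!expire && e.has_expire) {
    expire_time = e.expire_ms;
  }
  return expire_time;
}

// After: derive it from the entry alone; the dead parameter disappears.
uint64_t ExpireTimeOf(const Entry& e) {
  uint64_t expire_time = 0;
  if (e.has_expire) {
    expire_time = e.expire_ms;
  }
  return expire_time;
}
```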
src/server/snapshot.h: 11 changes (4 additions, 7 deletions)
@@ -98,15 +98,14 @@ class SliceSnapshot {
void SwitchIncrementalFb(LSN lsn);

// Called on traversing cursor by IterateBucketsFb.
- bool BucketSaveCb(PrimeTable::bucket_iterator it);
+ bool BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it);

// Serialize single bucket.
// Returns number of serialized entries, updates bucket version to snapshot version.
unsigned SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator bucket_it);

// Serialize entry into passed serializer.
- void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv,
-     std::optional<uint64_t> expire, RdbSerializer* serializer);
+ void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv);

// DbChange listener
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
@@ -150,9 +149,7 @@ class SliceSnapshot {
};

DbSlice* db_slice_;
- DbTableArray db_array_;
-
- DbIndex current_db_;
+ const DbTableArray db_array_;

std::unique_ptr<RdbSerializer> serializer_;
std::vector<DelayedEntry> delayed_entries_; // collected during atomic bucket traversal
@@ -161,7 +158,7 @@
bool serialize_bucket_running_ = false;
util::fb2::Fiber snapshot_fb_; // IterateEntriesFb
util::fb2::CondVarAny seq_cond_;
- CompressionMode compression_mode_;
+ const CompressionMode compression_mode_;
RdbTypeFreqMap type_freq_map_;

// version upper bound for entries that should be saved (not included).
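Finally, `db_array_` and `compression_mode_` become `const` data members in snapshot.h: both are set once in the constructor's initializer list and never reassigned, so the compiler now enforces that invariant. A minimal sketch of the idea with illustrative names:

```cpp
enum class CompressionMode { kNone, kSingleEntry };

class SnapshotFields {
 public:
  explicit SnapshotFields(CompressionMode mode) : compression_mode_(mode) {}

  void SomeMethod() {
    // compression_mode_ = CompressionMode::kNone;  // no longer compiles: member is const
  }

 private:
  const CompressionMode compression_mode_;  // fixed for the object's lifetime
};
```

The trade-off is that `const` members make a class non-copy-assignable, which only matters if instances are ever assigned; presumably that is not the case for `SliceSnapshot`.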