From 03b35d694762a4b5412e0cc2bb003058397485f9 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich Date: Thu, 26 Dec 2024 19:02:56 +0400 Subject: [PATCH] chore(snapshot): Small cleanup in Snapshot code Signed-off-by: Stepan Bagritsevich --- src/server/rdb_save.h | 1 + src/server/snapshot.cc | 38 ++++++++++++++++++++++---------------- src/server/snapshot.h | 11 ++++------- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 723bccd3172f..7f3bb703b1dd 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -225,6 +225,7 @@ class RdbSerializer : public SerializerBase { // Must be called in the thread to which `it` belongs. // Returns the serialized rdb_type or the error. // expire_ms = 0 means no expiry. + // This function might preempt if flush_fun_ is used. io::Result SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms, uint32_t mc_flags, DbIndex dbid); diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index eb208e00f420..6c5cc52e5490 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -38,8 +38,11 @@ constexpr size_t kMinBlobSize = 32_KB; SliceSnapshot::SliceSnapshot(CompressionMode compression_mode, DbSlice* slice, SnapshotDataConsumerInterface* consumer, Context* cntx) - : db_slice_(slice), compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) { - db_array_ = slice->databases(); + : db_slice_(slice), + db_array_(slice->databases()), + compression_mode_(compression_mode), + consumer_(consumer), + cntx_(cntx) { tl_slice_snapshots.insert(this); } @@ -163,7 +166,6 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) { uint64_t last_yield = 0; PrimeTable* pt = &db_array_[db_indx]->prime; - current_db_ = db_indx; VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx; do { @@ -171,8 +173,8 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) { return; } - PrimeTable::Cursor next = - pt->TraverseBuckets(cursor, [this](auto it) { return BucketSaveCb(it); }); + PrimeTable::Cursor next = pt->TraverseBuckets( + cursor, [this, &db_indx](auto it) { return BucketSaveCb(db_indx, it); }); cursor = next; PushSerialized(false); @@ -248,7 +250,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) { } } -bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { +bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it) { std::lock_guard guard(big_value_mu_); ++stats_.savecb_calls; @@ -267,7 +269,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { return false; } - db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it), + db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it), snapshot_version_); auto* blocking_counter = db_slice_->BlockingCounter(); @@ -276,7 +278,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { // zero. std::lock_guard blocking_counter_guard(*blocking_counter); - stats_.loop_serialized += SerializeBucket(current_db_, it); + stats_.loop_serialized += SerializeBucket(db_index, it); return false; } @@ -292,20 +294,19 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite while (!it.is_done()) { ++result; // might preempt due to big value serialization. - SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get()); + SerializeEntry(db_index, it->first, it->second); ++it; } serialize_bucket_running_ = false; return result; } -void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv, - optional expire, RdbSerializer* serializer) { +void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv) { if (pv.IsExternal() && pv.IsCool()) - return SerializeEntry(db_indx, pk, pv.GetCool().record->value, expire, serializer); + return SerializeEntry(db_indx, pk, pv.GetCool().record->value); - time_t expire_time = expire.value_or(0); - if (!expire && pv.HasExpire()) { + time_t expire_time = 0; + if (pv.HasExpire()) { auto eit = db_array_[db_indx]->expire.Find(pk); expire_time = db_slice_->ExpireTime(eit); } @@ -322,7 +323,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr {db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time, mc_flags}); ++type_freq_map_[RDB_TYPE_STRING]; } else { - io::Result res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx); + io::Result res = serializer_->SaveEntry(pk, pv, expire_time, mc_flags, db_indx); CHECK(res); ++type_freq_map_[*res]; } @@ -330,7 +331,12 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) { io::StringFile sfile; - serializer_->FlushToSink(&sfile, flush_state); + + error_code ec = serializer_->FlushToSink(&sfile, flush_state); + if (ec) { + VLOG(2) << "Failed to flush to sink: " << ec.message(); + return 0; + } size_t serialized = sfile.val.size(); if (serialized == 0) diff --git a/src/server/snapshot.h b/src/server/snapshot.h index da4be49bd147..250e61bc63bc 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -98,15 +98,14 @@ class SliceSnapshot { void SwitchIncrementalFb(LSN lsn); // Called on traversing cursor by IterateBucketsFb. - bool BucketSaveCb(PrimeTable::bucket_iterator it); + bool BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it); // Serialize single bucket. // Returns number of serialized entries, updates bucket version to snapshot version. unsigned SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator bucket_it); // Serialize entry into passed serializer. - void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv, - std::optional expire, RdbSerializer* serializer); + void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv); // DbChange listener void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); @@ -150,9 +149,7 @@ class SliceSnapshot { }; DbSlice* db_slice_; - DbTableArray db_array_; - - DbIndex current_db_; + const DbTableArray db_array_; std::unique_ptr serializer_; std::vector delayed_entries_; // collected during atomic bucket traversal @@ -161,7 +158,7 @@ class SliceSnapshot { bool serialize_bucket_running_ = false; util::fb2::Fiber snapshot_fb_; // IterateEntriesFb util::fb2::CondVarAny seq_cond_; - CompressionMode compression_mode_; + const CompressionMode compression_mode_; RdbTypeFreqMap type_freq_map_; // version upper bound for entries that should be saved (not included).