fix: race when bumping items while loading a snapshot #4564

Open · wants to merge 5 commits into base: main
4 changes: 2 additions & 2 deletions src/server/db_slice.cc
@@ -283,7 +283,6 @@ DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner)
cache_mode_(cache_mode),
owner_(owner),
client_tracking_map_(owner->memory_resource()) {
load_in_progress_ = false;
Collaborator:
Please remove the CHECK at db_slice.cc:783 (`Check failed: fetched_items_.empty()`)
and replace it with DFATAL.

Contributor (author):

Oh, I forgot this; you mentioned it in the standup.

db_arr_.emplace_back();
CreateDb(0);
expire_base_[0] = expire_base_[1] = 0;
@@ -795,7 +794,8 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks);
}

CHECK(fetched_items_.empty());
LOG_IF(DFATAL, !fetched_items_.empty())
<< "Some operation might have bumped items outside of a transaction";

auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
flush_db_arr.clear();
12 changes: 8 additions & 4 deletions src/server/db_slice.h
@@ -473,11 +473,15 @@ class DbSlice {

bool IsCacheMode() const {
// During loading time we never bump elements.
return cache_mode_ && !load_in_progress_;
return cache_mode_ && (load_ref_count_ == 0);
}

void SetLoadInProgress(bool in_progress) {
load_in_progress_ = in_progress;
void IncrLoadInProgress() {
++load_ref_count_;
}

void DecrLoadInProgress() {
--load_ref_count_;
}

// Test hook to inspect last locked keys.
@@ -585,7 +589,6 @@

ShardId shard_id_;
uint8_t cache_mode_ : 1;
uint8_t load_in_progress_ : 1;

EngineShard* owner_;

@@ -598,6 +601,7 @@
size_t soft_budget_limit_ = 0;
size_t table_memory_ = 0;
uint64_t entries_count_ = 0;
unsigned load_ref_count_ = 0;

mutable SliceEvents events_; // we may change this even for const operations.

24 changes: 17 additions & 7 deletions src/server/rdb_load.cc
@@ -2034,9 +2034,11 @@ error_code RdbLoader::Load(io::Source* src) {

auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); });

shard_set->AwaitRunningOnShardQueue([](EngineShard* es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true);
});
// Increment the local shard's counter, if this fiber runs on a shard
if (EngineShard* es = EngineShard::tlocal(); es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress();
}

while (!stop_early_.load(memory_order_relaxed)) {
if (pause_) {
ThisFiber::SleepFor(100ms);
@@ -2226,12 +2228,13 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) {
FlushShardAsync(i);

// Send sentinel callbacks to ensure that all previous messages have been processed.
shard_set->Add(i, [bc]() mutable {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false);
bc->Dec();
});
shard_set->Add(i, [bc]() mutable { bc->Dec(); });
}
bc->Wait(); // wait for sentinels to report.
// Decrement the local shard's counter, if it exists
if (EngineShard* es = EngineShard::tlocal(); es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().DecrLoadInProgress();
}

absl::Duration dur = absl::Now() - start_time;
load_time_ = double(absl::ToInt64Milliseconds(dur)) / 1000;
@@ -2515,7 +2518,12 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
return;

auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] {
// Before we start loading, increment LoadInProgress.
// This is required because FlushShardAsync dispatches to multiple shards, and those
// shards may not yet have had their load-in-progress counter incremented.
namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress();
this->LoadItemsBuffer(indx, ib);
namespaces->GetDefaultNamespace().GetCurrentDbSlice().DecrLoadInProgress();

// Block, if tiered storage is active, but can't keep up
while (EngineShard::tlocal()->ShouldThrottleForTiering()) {
@@ -2554,6 +2562,8 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()};
DbSlice& db_slice = db_cntx.GetDbSlice(es->shard_id());

DCHECK(!db_slice.IsCacheMode());
Contributor (author):

Maybe CHECK instead?

Collaborator:

no need


auto error_msg = [](const auto* item, auto db_ind) {
return absl::StrCat("Found empty key: ", item->key, " in DB ", db_ind, " rdb_type ",
item->val.rdb_type);
15 changes: 15 additions & 0 deletions src/server/rdb_test.cc
@@ -735,4 +735,19 @@ TEST_F(RdbTest, HugeKeyIssue4497) {
EXPECT_EQ(Run({"flushall"}), "OK");
}

TEST_F(RdbTest, HugeKeyIssue4554) {
Contributor Author (@kostasrim, Feb 6, 2025):

This fails without my changes.

I wanted to reproduce this via a replication test, since the issue only involves replication, but that proved more difficult than expected for reasons hard to explain in a few lines. I am fairly confident this PR addresses all the problems, but I would like a binary as soon as possible to send out and verify that the case on the issue matches what we see here.

SetTestFlag("cache_mode", "true");
// We need to stress one flow/shard such that the others finish early. Lock on hashtags allows
// that.
SetTestFlag("lock_on_hashtags", "true");
ResetService();

EXPECT_EQ(
Run({"debug", "populate", "20", "{tmp}", "20", "rand", "type", "set", "elements", "10000"}),
"OK");
EXPECT_EQ(Run({"save", "df", "hugekey"}), "OK");
EXPECT_EQ(Run({"dfly", "load", "hugekey-summary.dfs"}), "OK");
EXPECT_EQ(Run({"flushall"}), "OK");
}

} // namespace dfly
1 change: 1 addition & 0 deletions src/server/replica.cc
@@ -542,6 +542,7 @@ error_code Replica::InitiateDflySync() {
// Lock to prevent the error handler from running instantly
// while the flows are in a mixed state.
lock_guard lk{flows_op_mu_};

shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));

size_t num_full_flows =