Skip to content

Commit

Permalink
fix: race when bumping items while loading a snapshot
Browse files Browse the repository at this point in the history
Signed-off-by: kostas <[email protected]>
  • Loading branch information
kostasrim committed Feb 5, 2025
1 parent 6d1c22b commit d8554a0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
8 changes: 1 addition & 7 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2034,9 +2034,6 @@ error_code RdbLoader::Load(io::Source* src) {

auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); });

shard_set->AwaitRunningOnShardQueue([](EngineShard* es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true);
});
while (!stop_early_.load(memory_order_relaxed)) {
if (pause_) {
ThisFiber::SleepFor(100ms);
Expand Down Expand Up @@ -2226,10 +2223,7 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) {
FlushShardAsync(i);

// Send sentinel callbacks to ensure that all previous messages have been processed.
shard_set->Add(i, [bc]() mutable {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false);
bc->Dec();
});
shard_set->Add(i, [bc]() mutable { bc->Dec(); });
}
bc->Wait(); // wait for sentinels to report.

Expand Down
15 changes: 15 additions & 0 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,21 @@ error_code Replica::InitiateDflySync() {
// Lock to prevent the error handler from running instantly
// while the flows are in a mixed state.
lock_guard lk{flows_op_mu_};
absl::Cleanup clean = [this]() {
shard_set->AwaitRunningOnShardQueue([](EngineShard* es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false);
});
};

// See issue #4554
// We really need two dispatches here because StartSyncFlow executes on different shards.
// If the sync fiber on one of the threads starts, loads the snapshots and starts flushing
// the loaded data to the shards asynchronously via LoadItemsBuffer() while the other thread
// has not even spawned the Sync flow fiber, it will start bumping items because
// SetLoadInProgress will still be false.
shard_set->AwaitRunningOnShardQueue([](EngineShard* es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true);
});
shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));

size_t num_full_flows =
Expand Down
10 changes: 10 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,11 @@ std::optional<fb2::Future<GenericError>> ServerFamily::Load(string_view load_pat
return immediate(expand_result.error());
}

// See issue #4554
shard_set->AwaitRunningOnShardQueue([](EngineShard* es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true);
});

auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
if (new_state != GlobalState::LOADING) {
LOG(WARNING) << new_state << " in progress, ignored";
Expand Down Expand Up @@ -1155,6 +1160,11 @@ std::optional<fb2::Future<GenericError>> ServerFamily::Load(string_view load_pat

service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
future.Resolve(*(aggregated_result->first_error));
// See issue #4554
// Once we are done we need to clean the state
shard_set->AwaitRunningOnShardQueue([](EngineShard* es) {
namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false);
});
};
pool.GetNextProactor()->Dispatch(std::move(load_join_func));

Expand Down

0 comments on commit d8554a0

Please sign in to comment.