fix: Fix test_network_disconnect_during_migration test (#4224)
There are actually a few failures fixed in this PR, only one of which is a test bug:

* `db_slice_->Traverse()` can yield, so `fiber_cancelled_` may change value mid-traversal
* When a migration is cancelled, it may never finish `WaitForInflightToComplete()`, because its remaining `in_flight_bytes_` will never reach the destination due to the cancellation
* `IterateMap()` with numeric keys/values overwrote the key's buffer with the value's contents, as both shared a single scratch buffer

Fixes #4207
chakaz authored Dec 2, 2024
1 parent dcee9a9 commit 779bba7
Showing 5 changed files with 23 additions and 8 deletions.
3 changes: 2 additions & 1 deletion src/server/cluster/outgoing_slot_migration.cc
@@ -37,7 +37,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
   }
 
   ~SliceSlotMigration() {
-    streamer_.Cancel();
+    Cancel();
     cntx_.JoinErrorHandler();
   }
 
@@ -81,6 +81,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
   }
 
   void Cancel() {
+    cntx_.Cancel();
     streamer_.Cancel();
   }
 
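The order inside Cancel() matters: cancelling cntx_ before the streamer is what lets JournalStreamer::Cancel() (changed in src/server/journal/streamer.cc below) skip a wait for in-flight bytes that can never complete once the connection is gone. A minimal stand-alone sketch of that interaction, assuming simplified Context/Streamer stand-ins with std::condition_variable in place of Dragonfly's fiber primitives:

// Minimal stand-ins, not the real Dragonfly classes: the streamer's Cancel()
// only skips waiting for in-flight bytes when it can observe that the
// context was already cancelled.
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>

class Context {
 public:
  void Cancel() { cancelled_ = true; }
  bool IsCancelled() const { return cancelled_; }

 private:
  std::atomic<bool> cancelled_{false};
};

class Streamer {
 public:
  explicit Streamer(Context* cntx) : cntx_(cntx) {}

  void Cancel() {
    std::unique_lock lk(mu_);
    if (!cntx_->IsCancelled()) {
      // If the peer is gone, in_flight_bytes_ never drains; without the
      // IsCancelled() check this wait blocks forever.
      cv_.wait(lk, [this] { return in_flight_bytes_ == 0; });
    }
  }

 private:
  Context* cntx_;
  std::mutex mu_;
  std::condition_variable cv_;
  size_t in_flight_bytes_ = 1;  // pretend one write was never acknowledged
};

int main() {
  Context cntx;
  Streamer streamer(&cntx);
  cntx.Cancel();      // what the fixed Cancel() does first
  streamer.Cancel();  // now returns immediately instead of hanging
}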
6 changes: 3 additions & 3 deletions src/server/container_utils.cc
@@ -274,13 +274,13 @@ bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func) {
   bool finished = true;
 
   if (pv.Encoding() == kEncodingListPack) {
-    uint8_t intbuf[LP_INTBUF_SIZE];
+    uint8_t k_intbuf[LP_INTBUF_SIZE], v_intbuf[LP_INTBUF_SIZE];
     uint8_t* lp = (uint8_t*)pv.RObjPtr();
     uint8_t* fptr = lpFirst(lp);
     while (fptr) {
-      string_view key = LpGetView(fptr, intbuf);
+      string_view key = LpGetView(fptr, k_intbuf);
       fptr = lpNext(lp, fptr);
-      string_view val = LpGetView(fptr, intbuf);
+      string_view val = LpGetView(fptr, v_intbuf);
       fptr = lpNext(lp, fptr);
       if (!func(ContainerEntry{key.data(), key.size()}, ContainerEntry{val.data(), val.size()})) {
         finished = false;
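For numeric listpack entries, LpGetView() decodes the integer into the caller-supplied scratch buffer and returns a string_view into it, so reusing one buffer for key and value rewrote the bytes the key's view pointed at. A sketch of the failure mode, with a hypothetical GetView() standing in for LpGetView():

// Hypothetical stand-in for LpGetView(): numeric entries are printed into
// the caller's scratch buffer, and the returned view points into it.
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <string_view>

std::string_view GetView(long number, char* scratch, size_t cap) {
  int n = std::snprintf(scratch, cap, "%ld", number);
  return {scratch, static_cast<size_t>(n)};
}

int main() {
  // Before the fix: one shared buffer -- decoding the value clobbers the key.
  char buf[32];
  std::string_view key = GetView(111, buf, sizeof(buf));
  std::string_view val = GetView(222, buf, sizeof(buf));
  assert(key == "222" && val == "222");  // the key now reads as the value

  // After the fix: separate buffers, as with k_intbuf/v_intbuf above.
  char k_buf[32], v_buf[32];
  key = GetView(111, k_buf, sizeof(k_buf));
  val = GetView(222, v_buf, sizeof(v_buf));
  assert(key == "111" && val == "222");
}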
15 changes: 13 additions & 2 deletions src/server/journal/streamer.cc
@@ -41,7 +41,9 @@ JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx)
 }
 
 JournalStreamer::~JournalStreamer() {
-  DCHECK_EQ(in_flight_bytes_, 0u);
+  if (!cntx_->IsCancelled()) {
+    DCHECK_EQ(in_flight_bytes_, 0u);
+  }
   VLOG(1) << "~JournalStreamer";
 }
 

@@ -79,7 +81,9 @@ void JournalStreamer::Cancel() {
   VLOG(1) << "JournalStreamer::Cancel";
   waker_.notifyAll();
   journal_->UnregisterOnChange(journal_cb_id_);
-  WaitForInflightToComplete();
+  if (!cntx_->IsCancelled()) {
+    WaitForInflightToComplete();
+  }
 }
 
 size_t JournalStreamer::GetTotalBufferCapacities() const {
@@ -215,8 +219,15 @@ void RestoreStreamer::Run() {
       return;
 
     cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) {
+      if (fiber_cancelled_)  // Could be cancelled any time as Traverse may preempt
+        return;
+
       db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
                                                DbSlice::Iterator::FromPrime(it), snapshot_version_);
+
+      if (fiber_cancelled_)  // Could have been cancelled in above call too
+        return;
+
       WriteBucket(it);
     });
 
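Both added checks exist because the callback runs on a fiber that can be suspended: Traverse() may preempt between buckets, and FlushChangeToEarlierCallbacks() may yield too, so a fiber_cancelled_ value read before either point can be stale by the time WriteBucket() runs. A stand-alone sketch of the re-check pattern, using plain threads and a sleep as a stand-in for a fiber preemption point:

// Re-check a cancellation flag after every potential suspension point; the
// value observed before yielding may no longer hold afterwards.
#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> cancelled{false};

void MaybeYield() {
  // Stand-in for a preemption point such as Traverse() or
  // FlushChangeToEarlierCallbacks() suspending the fiber.
  std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

void VisitBucket(int /*bucket*/) {
  if (cancelled) return;  // check before doing any work

  MaybeYield();           // cancellation may happen while suspended here

  if (cancelled) return;  // the earlier check is stale by now; check again

  // ... the WriteBucket() equivalent would run here ...
}

int main() {
  std::thread canceller([] {
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    cancelled = true;
  });
  for (int b = 0; b < 1000 && !cancelled; ++b) VisitBucket(b);
  canceller.join();
}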
2 changes: 2 additions & 0 deletions src/server/multi_command_squasher.cc
@@ -112,6 +112,8 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
 
   cmd->Fill(&tmp_keylist_);
   auto args = absl::MakeSpan(tmp_keylist_);
+  if (args.empty())
+    return SquashResult::NOT_SQUASHED;
 
   auto keys = DetermineKeys(cmd->Cid(), args);
   if (!keys.ok())
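The early return keeps DetermineKeys() from being asked about a command whose argument span is empty after Fill(); such a command simply falls back to the regular, non-squashed path. A toy sketch of the guard (hypothetical names, not the real Dragonfly types):

#include <string>
#include <vector>

enum class SquashResult { SQUASHED, NOT_SQUASHED };

// Hypothetical key extraction: meaningless (and unsafe to index) for an
// empty argument list, which is why TrySquash() now bails out first.
bool DetermineFirstKey(const std::vector<std::string>& args, std::string* key) {
  if (args.empty()) return false;
  *key = args[0];
  return true;
}

SquashResult TrySquash(const std::vector<std::string>& args) {
  if (args.empty())
    return SquashResult::NOT_SQUASHED;  // fall through to the regular path

  std::string key;
  if (!DetermineFirstKey(args, &key))
    return SquashResult::NOT_SQUASHED;
  return SquashResult::SQUASHED;
}

int main() {
  return TrySquash({}) == SquashResult::NOT_SQUASHED ? 0 : 1;
}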
5 changes: 3 additions & 2 deletions tests/dragonfly/cluster_test.py
@@ -1290,7 +1290,7 @@ async def test_migration_with_key_ttl(df_factory):
 
 
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_network_disconnect_during_migration(df_factory, df_seeder_factory):
+async def test_network_disconnect_during_migration(df_factory):
     instances = [
         df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
     ]
@@ -1328,14 +1328,15 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory
 
     await proxy.start()
 
-    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", 20)
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", 60)
     nodes[0].migrations = []
     nodes[0].slots = []
     nodes[1].slots = [(0, 16383)]
     logging.debug("remove finished migrations")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     assert (await StaticSeeder.capture(nodes[1].client)) == start_capture
+    await proxy.close()
 
 
 @pytest.mark.parametrize(
