diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index b77c44c0b727..9abe9cf08bc5 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -37,7 +37,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { } ~SliceSlotMigration() { - streamer_.Cancel(); + Cancel(); cntx_.JoinErrorHandler(); } @@ -81,6 +81,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { } void Cancel() { + cntx_.Cancel(); streamer_.Cancel(); } diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 07c5e51ba55d..31f08c9e5917 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -274,13 +274,13 @@ bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func) { bool finished = true; if (pv.Encoding() == kEncodingListPack) { - uint8_t intbuf[LP_INTBUF_SIZE]; + uint8_t k_intbuf[LP_INTBUF_SIZE], v_intbuf[LP_INTBUF_SIZE]; uint8_t* lp = (uint8_t*)pv.RObjPtr(); uint8_t* fptr = lpFirst(lp); while (fptr) { - string_view key = LpGetView(fptr, intbuf); + string_view key = LpGetView(fptr, k_intbuf); fptr = lpNext(lp, fptr); - string_view val = LpGetView(fptr, intbuf); + string_view val = LpGetView(fptr, v_intbuf); fptr = lpNext(lp, fptr); if (!func(ContainerEntry{key.data(), key.size()}, ContainerEntry{val.data(), val.size()})) { finished = false; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index d6654d3aa9eb..c224d25dc95e 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -41,7 +41,9 @@ JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx) } JournalStreamer::~JournalStreamer() { - DCHECK_EQ(in_flight_bytes_, 0u); + if (!cntx_->IsCancelled()) { + DCHECK_EQ(in_flight_bytes_, 0u); + } VLOG(1) << "~JournalStreamer"; } @@ -79,7 +81,9 @@ void JournalStreamer::Cancel() { VLOG(1) << "JournalStreamer::Cancel"; waker_.notifyAll(); journal_->UnregisterOnChange(journal_cb_id_); - WaitForInflightToComplete(); + if (!cntx_->IsCancelled()) { + WaitForInflightToComplete(); + } } size_t JournalStreamer::GetTotalBufferCapacities() const { @@ -215,8 +219,15 @@ void RestoreStreamer::Run() { return; cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) { + if (fiber_cancelled_) // Could be cancelled any time as Traverse may preempt + return; + db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/, DbSlice::Iterator::FromPrime(it), snapshot_version_); + + if (fiber_cancelled_) // Could have been cancelled in above call too + return; + WriteBucket(it); }); diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index a2b8d7bf937e..c6b7ffbb1882 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -112,6 +112,8 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm cmd->Fill(&tmp_keylist_); auto args = absl::MakeSpan(tmp_keylist_); + if (args.empty()) + return SquashResult::NOT_SQUASHED; auto keys = DetermineKeys(cmd->Cid(), args); if (!keys.ok()) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index d75e1c74addb..27fce9d43399 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1290,7 +1290,7 @@ async def test_migration_with_key_ttl(df_factory): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) -async def test_network_disconnect_during_migration(df_factory, df_seeder_factory): +async def test_network_disconnect_during_migration(df_factory): instances = [ df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) ] @@ -1328,7 +1328,7 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory await proxy.start() - await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", 20) + await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", 60) nodes[0].migrations = [] nodes[0].slots = [] nodes[1].slots = [(0, 16383)] @@ -1336,6 +1336,7 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) assert (await StaticSeeder.capture(nodes[1].client)) == start_capture + await proxy.close() @pytest.mark.parametrize(