diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h index 0eff25d977bc..da89ec5c1a47 100644 --- a/src/server/cluster/cluster_defs.h +++ b/src/server/cluster/cluster_defs.h @@ -114,7 +114,6 @@ using ClusterShardInfos = std::vector; // MigrationState constants are ordered in state changing order enum class MigrationState : uint8_t { - C_NO_STATE, C_CONNECTING, C_SYNC, C_ERROR, diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 514fc73c8344..2ff729c90aca 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -679,8 +679,6 @@ void ClusterFamily::StartSlotMigrations(std::vector migrations) { static string_view StateToStr(MigrationState state) { switch (state) { - case MigrationState::C_NO_STATE: - return "NO_STATE"sv; case MigrationState::C_CONNECTING: return "CONNECTING"sv; case MigrationState::C_SYNC: @@ -708,15 +706,22 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* } } - vector reply; + struct Reply { + string_view direction; + string node_id; + string_view state; + size_t keys_number; + string error; + }; + vector reply; reply.reserve(incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size()); - auto append_answer = [&reply](string_view direction, string_view node_id, string_view filter, - MigrationState state, size_t keys_number, string_view error) { + auto append_answer = [&reply](string_view direction, string node_id, string_view filter, + MigrationState state, size_t keys_number, string error) { if (filter.empty() || filter == node_id) { error = error.empty() ? "0" : error; - reply.push_back(absl::StrCat(direction, " ", node_id, " ", StateToStr(state), - " keys:", keys_number, " errors:", error)); + reply.emplace_back( + Reply{direction, std::move(node_id), StateToStr(state), keys_number, std::move(error)}); } }; @@ -730,10 +735,14 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* m->GetKeyCount(), m->GetErrorStr()); } - if (reply.empty()) { - rb->SendSimpleString(StateToStr(MigrationState::C_NO_STATE)); - } else { - rb->SendStringArr(reply); + rb->StartArray(reply.size()); + for (const auto& r : reply) { + rb->StartArray(5); + rb->SendBulkString(r.direction); + rb->SendBulkString(r.node_id); + rb->SendBulkString(r.state); + rb->SendLong(r.keys_number); + rb->SendBulkString(r.error); } } diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h index 116e6b5ae59f..02ad3c2021c8 100644 --- a/src/server/cluster/incoming_slot_migration.h +++ b/src/server/cluster/incoming_slot_migration.h @@ -64,7 +64,7 @@ class IncomingSlotMigration { Service& service_; std::vector> shard_flows_; SlotRanges slots_; - std::atomic state_ = MigrationState::C_NO_STATE; + std::atomic state_ = MigrationState::C_CONNECTING; Context cntx_; mutable util::fb2::Mutex error_mu_; dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_); diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 8bdaa970c283..a9ecc67a6590 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -142,7 +142,6 @@ void OutgoingMigration::Finish(bool is_error) { case MigrationState::C_FINISHED: return; // Already finished, nothing else to do - case MigrationState::C_NO_STATE: case MigrationState::C_CONNECTING: should_cancel_flows = false; break; diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index bd1487d76459..d0bf6929d6b0 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -90,7 +90,7 @@ class OutgoingMigration : private ProtocolClient { util::fb2::Fiber main_sync_fb_; mutable util::fb2::Mutex state_mu_; - MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_NO_STATE; + MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_CONNECTING; boost::intrusive_ptr tx_; diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index d2b88461f655..6fd63fe2c95c 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -153,16 +153,14 @@ async def wait_for_status(admin_client, node_id, status, timeout=10): ) async for states, breaker in tick_timer(get_status, timeout=timeout): - if type(states) != list: - states = [states] with breaker: - assert all(status in state for state in states), states + assert len(states) != 0 and all(status == state[2] for state in states), states async def check_for_no_state_status(admin_clients): for client in admin_clients: state = await client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") - if state != "NO_STATE": + if len(state) != 0: logging.debug(f"SLOT-MIGRATION-STATUS is {state}, instead of NO_STATE") assert False @@ -1059,7 +1057,7 @@ async def test_config_consistency(df_factory: DflyInstanceFactory): await asyncio.sleep(0.2) await wait_for_status(nodes[0].admin_client, nodes[1].id, "CONNECTING") - await wait_for_status(nodes[1].admin_client, nodes[0].id, "NO_STATE") + await check_for_no_state_status([nodes[1].admin_client]) logging.debug("Push migration config to target node") await push_config(json.dumps(generate_config(nodes)), [nodes[1].admin_client]) @@ -1113,13 +1111,11 @@ async def test_cluster_flushall_during_migration( await nodes[0].client.execute_command("flushall") + status1 = await nodes[1].admin_client.execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id + ) assert ( - "FINISHED" - not in ( - await nodes[1].admin_client.execute_command( - "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id - ) - )[0] + len(status1) == 0 or "FINISHED" not in status1[0] ), "Weak test case - finished migration too early" await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED") @@ -1193,16 +1189,17 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt key = "KEY" + str(i) assert await nodes[0 if (key_slot(key) // 3000) == 0 else 1].client.set(key, "value") - assert ( - await nodes[0].admin_client.execute_command( - "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id - ) - )[0].startswith(f"out {nodes[1].id} FINISHED keys:7") - assert ( - await nodes[1].admin_client.execute_command( - "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id - ) - )[0].startswith(f"in {nodes[0].id} FINISHED keys:7") + status = await nodes[0].admin_client.execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id + ) + status[0].pop() + assert status[0] == ["out", nodes[1].id, "FINISHED", 7] + + status = await nodes[1].admin_client.execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id + ) + status[0].pop() + assert status[0] == ["in", nodes[0].id, "FINISHED", 7] nodes[0].migrations = [] nodes[0].slots = [(0, 2999)] @@ -1418,13 +1415,9 @@ async def all_finished(): res = True for node in nodes: states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") - if states != "NO_STATE": - logging.debug(states) + logging.debug(states) for state in states: - parsed_state = re.search("([a-z]+) ([a-z0-9]+) ([A-Z]+)", state) - if parsed_state == None: - continue - direction, node_id, st = parsed_state.group(1, 2, 3) + direction, node_id, st, _, _ = state if direction == "out": if st == "FINISHED": m_id = [id for id, x in enumerate(node.migrations) if x.node_id == node_id][