Skip to content

Commit

Permalink
chore: Hide replica info in real cluster if --managed_service_info (#…
Browse files Browse the repository at this point in the history
…4241)

So far we only handled emulated cluster. This PR adds real cluster
support.

Related to #4173
  • Loading branch information
chakaz authored Dec 2, 2024
1 parent 935ae86 commit b0d633f
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 4 deletions.
29 changes: 25 additions & 4 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,27 @@ constexpr string_view kClusterDisabled =

thread_local shared_ptr<ClusterConfig> tl_cluster_config;

ClusterShardInfos GetConfigForStats(ConnectionContext* cntx) {
CHECK(!IsClusterEmulated());
CHECK(tl_cluster_config != nullptr);

auto config = tl_cluster_config->GetConfig();
if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
return config;
}

// We can't mutate `config` so we copy it over
std::vector<ClusterShardInfo> infos;
infos.reserve(config.size());

for (auto& node : config) {
infos.push_back(node);
infos.rbegin()->replicas.clear();
}

return ClusterShardInfos{std::move(infos)};
}

} // namespace

ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(server_family) {
Expand Down Expand Up @@ -210,7 +231,7 @@ void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext*
if (IsClusterEmulated()) {
return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterShardsImpl(tl_cluster_config->GetConfig(), builder);
return ClusterShardsImpl(GetConfigForStats(cntx), builder);
} else {
return builder->SendError(kClusterNotConfigured);
}
Expand Down Expand Up @@ -255,7 +276,7 @@ void ClusterFamily::ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* c
if (IsClusterEmulated()) {
return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterSlotsImpl(tl_cluster_config->GetConfig(), builder);
return ClusterSlotsImpl(GetConfigForStats(cntx), builder);
} else {
return builder->SendError(kClusterNotConfigured);
}
Expand Down Expand Up @@ -311,7 +332,7 @@ void ClusterFamily::ClusterNodes(SinkReplyBuilder* builder, ConnectionContext* c
if (IsClusterEmulated()) {
return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterNodesImpl(tl_cluster_config->GetConfig(), id_, builder);
return ClusterNodesImpl(GetConfigForStats(cntx), id_, builder);
} else {
return builder->SendError(kClusterNotConfigured);
}
Expand Down Expand Up @@ -375,7 +396,7 @@ void ClusterFamily::ClusterInfo(SinkReplyBuilder* builder, ConnectionContext* cn
if (IsClusterEmulated()) {
return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterInfoImpl(tl_cluster_config->GetConfig(), builder);
return ClusterInfoImpl(GetConfigForStats(cntx), builder);
} else {
return ClusterInfoImpl({}, builder);
}
Expand Down
1 change: 1 addition & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("replication_timeout");
config_registry.RegisterMutable("table_growth_margin");
config_registry.RegisterMutable("tcp_keepalive");
config_registry.RegisterMutable("managed_service_info");

config_registry.RegisterMutable(
"notify_keyspace_events", [pool = &pp_](const absl::CommandLineFlag& flag) {
Expand Down
135 changes: 135 additions & 0 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import copy
import re
import json
import redis
Expand Down Expand Up @@ -376,6 +377,140 @@ async def test_emulated_cluster_with_replicas(df_factory):
}


@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_managed_service_info(df_factory):
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 100)
replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 101)

df_factory.start_all([master, replica])

c_master = master.client()
c_master_admin = master.admin_client()
master_id = await c_master.execute_command("CLUSTER MYID")

c_replica = replica.client()
c_replica_admin = replica.admin_client()
replica_id = await c_replica.execute_command("CLUSTER MYID")

# Connect replicas to master
rc = await c_replica_admin.execute_command(f"REPLICAOF localhost {master.port}")
assert rc == "OK"
await wait_available_async(c_replica)

nodes = [await create_node_info(master)]
nodes[0].slots = [(0, 16383)]
nodes[0].replicas = [await create_node_info(replica)]
await push_config(json.dumps(generate_config(nodes)), [master.client(), replica.client()])

expected_hidden_cluster_slots = [
[
0,
16383,
[
"127.0.0.1",
master.port,
master_id,
],
],
]
expected_full_cluster_slots = copy.deepcopy(expected_hidden_cluster_slots)
expected_full_cluster_slots[0].append(
[
"127.0.0.1",
replica.port,
replica_id,
]
)
assert await c_master.execute_command("CLUSTER SLOTS") == expected_full_cluster_slots
assert await c_master_admin.execute_command("CLUSTER SLOTS") == expected_full_cluster_slots

expected_hidden_cluster_nodes = {
f"127.0.0.1:{master.port}": {
"connected": True,
"epoch": "0",
"flags": "myself,master",
"last_ping_sent": "0",
"last_pong_rcvd": "0",
"master_id": "-",
"migrations": [],
"node_id": master_id,
"slots": [["0", "16383"]],
},
}
expected_full_cluster_nodes = copy.deepcopy(expected_hidden_cluster_nodes)
expected_full_cluster_nodes[f"127.0.0.1:{replica.port}"] = {
"connected": True,
"epoch": "0",
"flags": "slave",
"last_ping_sent": "0",
"last_pong_rcvd": "0",
"master_id": master_id,
"migrations": [],
"node_id": replica_id,
"slots": [],
}
assert await c_master.execute_command("CLUSTER NODES") == expected_full_cluster_nodes
assert await c_master_admin.execute_command("CLUSTER NODES") == expected_full_cluster_nodes

expected_hidden_cluster_shards = [
[
"slots",
[0, 16383],
"nodes",
[
[
"id",
master_id,
"endpoint",
"127.0.0.1",
"ip",
"127.0.0.1",
"port",
master.port,
"role",
"master",
"replication-offset",
0,
"health",
"online",
],
],
],
]
expected_full_cluster_shards = copy.deepcopy(expected_hidden_cluster_shards)
expected_full_cluster_shards[0][3].append(
[
"id",
replica_id,
"endpoint",
"127.0.0.1",
"ip",
"127.0.0.1",
"port",
replica.port,
"role",
"replica",
"replication-offset",
0,
"health",
"online",
]
)
assert await c_master.execute_command("CLUSTER SHARDS") == expected_full_cluster_shards
assert await c_master_admin.execute_command("CLUSTER SHARDS") == expected_full_cluster_shards

await c_master.execute_command("config set managed_service_info true")

assert await c_master.execute_command("CLUSTER SLOTS") == expected_hidden_cluster_slots
assert await c_master_admin.execute_command("CLUSTER SLOTS") == expected_full_cluster_slots

assert await c_master.execute_command("CLUSTER NODES") == expected_hidden_cluster_nodes
assert await c_master_admin.execute_command("CLUSTER NODES") == expected_full_cluster_nodes

assert await c_master.execute_command("CLUSTER SHARDS") == expected_hidden_cluster_shards
assert await c_master_admin.execute_command("CLUSTER SHARDS") == expected_full_cluster_shards


@dfly_args({"cluster_mode": "emulated"})
async def test_cluster_info(async_client):
res = await async_client.execute_command("CLUSTER INFO")
Expand Down

0 comments on commit b0d633f

Please sign in to comment.