diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java index 163ee8fced..65e1577ece 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java @@ -21,6 +21,8 @@ import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; +import javax.annotation.Nullable; + import java.util.Map; /** The metric group for clients. */ @@ -29,10 +31,16 @@ public class ClientMetricGroup extends AbstractMetricGroup { private static final String NAME = "client"; private final String clientId; + private final @Nullable String clusterId; public ClientMetricGroup(MetricRegistry registry, String clientId) { + this(registry, null, clientId); + } + + public ClientMetricGroup(MetricRegistry registry, @Nullable String clusterId, String clientId) { super(registry, new String[] {NAME}, null); this.clientId = clientId; + this.clusterId = clusterId; } @Override @@ -43,6 +51,11 @@ protected String getGroupName(CharacterFilter filter) { @Override protected final void putVariables(Map variables) { variables.put("client_id", clientId); + if (clusterId != null) { + variables.put("cluster_id", clusterId); + } else { + variables.put("cluster_id", ""); + } } public MetricRegistry getMetricRegistry() { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 9f8db80007..61c52edbbc 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -202,7 +202,10 @@ protected void startServices() throws Exception { // rpc client to sent request to the tablet server where the leader replica is located // to fetch log. this.clientMetricGroup = - new ClientMetricGroup(metricRegistry, SERVER_NAME + "-" + serverId); + new ClientMetricGroup( + metricRegistry, + ServerMetricUtils.validateAndGetClusterId(conf), + SERVER_NAME + "-" + serverId); this.rpcClient = RpcClient.create(conf, clientMetricGroup, true); CoordinatorGateway coordinatorGateway =