Skip to content

Commit 7dc0e19

Browse files
committed
[fix](cloud) Fix the residual metrics of cluster after drop the cluster in cloud
1 parent 94d7f2e commit 7dc0e19

File tree

6 files changed

+188
-1
lines changed

6 files changed

+188
-1
lines changed

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,10 @@ private void checkToDelCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<S
152152
// del clusterName
153153
String delClusterName = cloudSystemInfoService.getClusterNameByClusterId(delId);
154154
if (delClusterName.isEmpty()) {
155-
LOG.warn("can't get delClusterName, clusterId: {}, plz check", delId);
156155
return;
157156
}
158157
// del clusterID
158+
MetricRepo.unregisterCloudMetrics(delId, delClusterName, toDel);
159159
cloudSystemInfoService.dropCluster(delId, delClusterName);
160160
}
161161
);

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,8 @@ private void removeObsoleteVirtualGroups(List<Cloud.ClusterPB> virtualClusters)
472472
// in fe mem, but not in meta server
473473
if (!msVirtualClusters.contains(computeGroup.getId())) {
474474
LOG.info("virtual compute group {} will be removed.", computeGroup.getName());
475+
MetricRepo.unregisterCloudMetrics(computeGroup.getId(), computeGroup.getName(),
476+
Collections.emptyList());
475477
cloudSystemInfoService.removeComputeGroup(computeGroup.getId(), computeGroup.getName());
476478
// cancel invalid job
477479
if (!computeGroup.getPolicy().getCacheWarmupJobIds().isEmpty()) {

fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,7 @@ public void updateCloudClusterMapNoLock(List<Backend> toAdd, List<Backend> toDel
406406
// ATTN: Empty clusters are treated as dropped clusters.
407407
if (be.isEmpty()) {
408408
LOG.info("del clusterId {} and clusterName {} due to be nodes eq 0", clusterId, clusterName);
409+
MetricRepo.unregisterCloudMetrics(clusterId, clusterName, toDel);
409410
boolean succ = clusterNameToId.remove(clusterName, clusterId);
410411

411412
// remove from computeGroupIdToComputeGroup

fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,7 @@ public Map<String, M> getMetrics() {
3838
return nameToMetric;
3939
}
4040

41+
public void remove(String name) {
42+
nameToMetric.remove(name);
43+
}
4144
}

fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,4 +1532,77 @@ public static void updateClusterQueryLatency(String clusterName, long elapseMs)
15321532
String key = clusterId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + clusterName;
15331533
CloudMetrics.CLUSTER_QUERY_LATENCY_HISTO.getOrAdd(key).update(elapseMs);
15341534
}
1535+
1536+
public static void unregisterCloudMetrics(String clusterId, String clusterName, List<Backend> backends) {
1537+
if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterId)) {
1538+
return;
1539+
}
1540+
LOG.debug("unregister cloud metrics for cluster {}", clusterId);
1541+
try {
1542+
List<MetricLabel> labels = new ArrayList<>();
1543+
labels.add(new MetricLabel("cluster_id", clusterId));
1544+
labels.add(new MetricLabel("cluster_name", clusterName));
1545+
1546+
LongCounterMetric requestAllCounter = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getOrAdd(clusterId);
1547+
CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.remove(clusterId);
1548+
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(requestAllCounter.getName(), labels);
1549+
1550+
LongCounterMetric queryAllCounter = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getOrAdd(clusterId);
1551+
CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.remove(clusterId);
1552+
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryAllCounter.getName(), labels);
1553+
1554+
LongCounterMetric queryErrCounter = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getOrAdd(clusterId);
1555+
CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.remove(clusterId);
1556+
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryErrCounter.getName(), labels);
1557+
1558+
LongCounterMetric warmUpJobExecCounter = CloudMetrics.CLUSTER_WARM_UP_JOB_EXEC_COUNT.getOrAdd(clusterId);
1559+
CloudMetrics.CLUSTER_WARM_UP_JOB_EXEC_COUNT.remove(clusterId);
1560+
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobExecCounter.getName(), labels);
1561+
1562+
LongCounterMetric warmUpJobRequestedTablets =
1563+
CloudMetrics.CLUSTER_WARM_UP_JOB_REQUESTED_TABLETS.getOrAdd(clusterId);
1564+
CloudMetrics.CLUSTER_WARM_UP_JOB_REQUESTED_TABLETS.remove(clusterId);
1565+
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobRequestedTablets.getName(), labels);
1566+
1567+
LongCounterMetric warmUpJobFinishedTablets =
1568+
CloudMetrics.CLUSTER_WARM_UP_JOB_FINISHED_TABLETS.getOrAdd(clusterId);
1569+
CloudMetrics.CLUSTER_WARM_UP_JOB_FINISHED_TABLETS.remove(clusterId);
1570+
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobFinishedTablets.getName(), labels);
1571+
1572+
GaugeMetricImpl<Double> requestPerSecondGauge = CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE
1573+
.getOrAdd(clusterId);
1574+
CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE.remove(clusterId);
1575+
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(requestPerSecondGauge.getName(), labels);
1576+
1577+
GaugeMetricImpl<Double> queryPerSecondGauge = CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE
1578+
.getOrAdd(clusterId);
1579+
CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE.remove(clusterId);
1580+
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryPerSecondGauge.getName(), labels);
1581+
1582+
GaugeMetricImpl<Double> queryErrRateGauge = CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.getOrAdd(clusterId);
1583+
CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.remove(clusterId);
1584+
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryErrRateGauge.getName(), labels);
1585+
1586+
METRIC_REGISTER.getHistograms().keySet().stream()
1587+
.filter(k -> k.contains(clusterId))
1588+
.forEach(METRIC_REGISTER::remove);
1589+
1590+
for (Backend backend : backends) {
1591+
List<MetricLabel> backendLabels = new ArrayList<>();
1592+
backendLabels.add(new MetricLabel("cluster_id", clusterId));
1593+
backendLabels.add(new MetricLabel("cluster_name", clusterName));
1594+
backendLabels.add(new MetricLabel("address", backend.getAddress()));
1595+
String key = clusterId + "_" + backend.getAddress();
1596+
GaugeMetricImpl<Integer> metric = CloudMetrics.CLUSTER_BACKEND_ALIVE.getOrAdd(key);
1597+
MetricRepo.DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(metric.getName(), backendLabels);
1598+
}
1599+
1600+
GaugeMetricImpl<Integer> backendAliveTotal = CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.getOrAdd(clusterId);
1601+
CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.remove(clusterId);
1602+
MetricRepo.DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(backendAliveTotal.getName(), labels);
1603+
1604+
} catch (Throwable t) {
1605+
LOG.warn("unregister cloud metrics for cluster {} failed", clusterId, t);
1606+
}
1607+
}
15351608
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
import groovy.json.JsonOutput
20+
21+
// Regression suite: asserts that the FE /metrics page shows the labels of the compute
// cluster, stops showing them after the cluster is dropped through the meta service
// drop_cluster HTTP API, and shows the labels of a cluster added afterwards.
suite('test_drop_cluster_clean_metrics', 'docker') {
22+
// Nothing to verify outside cloud mode.
if (!isCloudMode()) {
23+
return;
24+
}
25+
def options = new ClusterOptions()
26+
// NOTE(review): the 1-2 second intervals are presumably chosen so the FE notices cluster
// changes within the 3 second sleeps used below — confirm.
options.feConfigs += [
27+
'cloud_cluster_check_interval_second=1',
28+
'cloud_tablet_rebalancer_interval_second=2',
29+
'sys_log_verbose_modules=org',
30+
'heartbeat_interval_second=1',
31+
'rehash_tablet_after_be_dead_seconds=3600',
32+
'enable_cloud_warm_up_for_rebalance=false'
33+
]
34+
options.beConfigs += [
35+
'report_tablet_interval_seconds=1',
36+
'schedule_sync_tablets_interval_s=18000',
37+
'disable_auto_compaction=true',
38+
'sys_log_verbose_modules=*'
39+
]
40+
options.setFeNum(2)
41+
options.setBeNum(2)
42+
options.cloudMode = true
43+
options.enableDebugPoints()
44+
45+
// Sends request_body to the meta service drop_cluster endpoint at msHttpPort and passes
// the response to check_func. `token` is not defined in this suite; it is resolved from
// the enclosing scope.
def drop_cluster_api = { msHttpPort, request_body, check_func ->
46+
httpTest {
47+
endpoint msHttpPort
48+
uri "/MetaService/http/drop_cluster?token=$token"
49+
body request_body
50+
check check_func
51+
}
52+
}
53+
54+
// Fetches http://ip:port/metrics and returns true when the literal text `name` occurs
// anywhere in the page. Every match is logged; when there is none, the first 2000
// characters of the page are logged to help debugging.
def getFEMetrics = {ip, port, name ->
55+
def url = "http://${ip}:${port}/metrics"
56+
logger.info("getFEMetrics1, url: ${url}, name: ${name}")
57+
def metrics = new URL(url).text
58+
59+
// Pattern.quote makes this a literal match, so characters such as '"' or '.' in `name`
// are not interpreted as regex syntax.
def metricLinePattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name))
60+
61+
def matcher = metricLinePattern.matcher(metrics)
62+
boolean found = false
63+
while (matcher.find()) {
64+
found = true
65+
logger.info("getFEMetrics MATCH FOUND: ${matcher.group(0)}")
66+
}
67+
68+
if (found) {
69+
return true
70+
} else {
71+
def snippet = metrics.length() > 2000 ? metrics.substring(0, 2000) + "..." : metrics
72+
logger.info("getFEMetrics NO MATCH for name=${name}, metrics snippet:\n${snippet}")
73+
return false
74+
}
75+
}
76+
77+
// Test flow: metrics present -> drop cluster -> metrics gone -> add new cluster ->
// new cluster's metrics present. All checks read the metrics page of one follower FE.
def testCase = { ->
78+
def ms = cluster.getAllMetaservices().get(0)
79+
def msHttpPort = ms.host + ":" + ms.httpPort
80+
def fe = cluster.getOneFollowerFe();
81+
sleep(3000) // wait for metrics ready
82+
// Label fragment expected on the metrics page while the compute cluster exists.
def metrics1 = """cluster_id="compute_cluster_id","""
83+
assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics1))
84+
85+
// drop compute cluster
86+
def beClusterMap = [cluster_id:"compute_cluster_id"]
87+
def instance = [instance_id: "default_instance_id", cluster: beClusterMap]
88+
def jsonOutput = new JsonOutput()
89+
def dropFeClusterBody = jsonOutput.toJson(instance)
90+
drop_cluster_api.call(msHttpPort, dropFeClusterBody) {
91+
respCode, body ->
92+
log.info("drop fe cluster http cli result: ${body} ${respCode}".toString())
93+
// NOTE(review): the parsed result is unused and nothing in the response is asserted;
// presumably this only checks that the body parses as JSON — confirm.
def json = parseJson(body)
94+
}
95+
sleep(3000) // wait for metrics cleaned
96+
assertFalse(getFEMetrics(fe.host, fe.httpPort, metrics1))
97+
98+
cluster.addBackend(2, "new_cluster")
99+
100+
sleep(3000) // wait for the new cluster's metrics to be registered
101+
def metrics2 = """cluster_id="new_cluster_id","""
102+
assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics2))
103+
}
104+
105+
docker(options) {
106+
testCase()
107+
}
108+
}

0 commit comments

Comments
 (0)