diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 4ba162dfe1139e..338d619604f372 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -152,10 +152,10 @@ private void checkToDelCluster(Map remoteClusterIdToPB, Set virtualClusters)
             // in fe mem, but not in meta server
             if (!msVirtualClusters.contains(computeGroup.getId())) {
                 LOG.info("virtual compute group {} will be removed.", computeGroup.getName());
+                MetricRepo.unregisterCloudMetrics(computeGroup.getId(), computeGroup.getName(),
+                        Collections.emptyList());
                 cloudSystemInfoService.removeComputeGroup(computeGroup.getId(), computeGroup.getName());
                 // cancel invalid job
                 if (!computeGroup.getPolicy().getCacheWarmupJobIds().isEmpty()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 62ab6e3b9ec86a..9f30f4a0590ff4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -406,6 +406,7 @@ public void updateCloudClusterMapNoLock(List toAdd, List toDel
                 // ATTN: Empty clusters are treated as dropped clusters.
                 if (be.isEmpty()) {
                     LOG.info("del clusterId {} and clusterName {} due to be nodes eq 0", clusterId, clusterName);
+                    MetricRepo.unregisterCloudMetrics(clusterId, clusterName, toDel);
                     boolean succ = clusterNameToId.remove(clusterName, clusterId);
                     // remove from computeGroupIdToComputeGroup
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java
index 440e00330b481f..5b348c73b1593a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java
@@ -38,4 +38,7 @@ public Map getMetrics() {
         return nameToMetric;
     }
+    public void remove(String name) {
+        nameToMetric.remove(name);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 2b611e8699c246..eb5fa69c9f0255 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -1593,4 +1593,125 @@ public static void updateClusterCloudBalanceNum(String clusterName, String clust
         counter.setLabels(labels);
         MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
     }
+
+    /**
+     * Unregisters the per-cluster cloud metrics of the cluster identified by {@code clusterId}.
+     *
+     * For every metric family the cached entry is dropped from its {@code CloudMetrics} map and the
+     * series labelled with {@code cluster_id}/{@code cluster_name} is removed from
+     * {@code DORIS_METRIC_REGISTER}; histograms whose key contains {@code clusterId} are removed from
+     * {@code METRIC_REGISTER}. Does nothing when metrics are not initialized, when not running in cloud
+     * mode, or when {@code clusterId} is null or empty. Any failure is logged and not rethrown.
+     *
+     * @param clusterId   id of the cluster whose metrics are removed
+     * @param clusterName name of that cluster, used for the {@code cluster_name} label
+     * @param backends    backends whose per-address alive gauges are removed as well
+     */
+    public static void unregisterCloudMetrics(String clusterId, String clusterName, List<Backend> backends) {
+        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterId)) {
+            return;
+        }
+        LOG.debug("unregister cloud metrics for cluster {}", clusterId);
+        try {
+            List<MetricLabel> labels = new ArrayList<>();
+            labels.add(new MetricLabel("cluster_id", clusterId));
+            labels.add(new MetricLabel("cluster_name", clusterName));
+
+            LongCounterMetric requestAllCounter = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(requestAllCounter.getName(), labels);
+
+            LongCounterMetric queryAllCounter = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryAllCounter.getName(), labels);
+
+            LongCounterMetric queryErrCounter = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryErrCounter.getName(), labels);
+
+            LongCounterMetric warmUpJobExecCounter = CloudMetrics.CLUSTER_WARM_UP_JOB_EXEC_COUNT.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_WARM_UP_JOB_EXEC_COUNT.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobExecCounter.getName(), labels);
+
+            LongCounterMetric warmUpJobRequestedTablets =
+                    CloudMetrics.CLUSTER_WARM_UP_JOB_REQUESTED_TABLETS.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_WARM_UP_JOB_REQUESTED_TABLETS.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobRequestedTablets.getName(), labels);
+
+            LongCounterMetric warmUpJobFinishedTablets =
+                    CloudMetrics.CLUSTER_WARM_UP_JOB_FINISHED_TABLETS.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_WARM_UP_JOB_FINISHED_TABLETS.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobFinishedTablets.getName(), labels);
+
+            GaugeMetricImpl requestPerSecondGauge = CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE
+                    .getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(requestPerSecondGauge.getName(), labels);
+
+            GaugeMetricImpl queryPerSecondGauge = CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE
+                    .getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryPerSecondGauge.getName(), labels);
+
+            GaugeMetricImpl queryErrRateGauge = CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryErrRateGauge.getName(), labels);
+
+            LongCounterMetric clusterCloudPartitionBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_PARTITION_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_PARTITION_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudPartitionBalanceNum.getName(), labels);
+
+            LongCounterMetric clusterCloudTableBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_TABLE_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_TABLE_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudTableBalanceNum.getName(), labels);
+
+            LongCounterMetric clusterCloudGlobalBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudGlobalBalanceNum.getName(), labels);
+
+            LongCounterMetric clusterCloudUpgradeBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudUpgradeBalanceNum.getName(), labels);
+
+            LongCounterMetric clusterCloudWarmUpBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudWarmUpBalanceNum.getName(), labels);
+
+            // NOTE(review): this is a substring match, so a cluster id contained in another cluster's
+            // id would match that cluster's histograms too - confirm the key format rules this out.
+            METRIC_REGISTER.getHistograms().keySet().stream()
+                    .filter(k -> k.contains(clusterId))
+                    .forEach(METRIC_REGISTER::remove);
+
+            for (Backend backend : backends) {
+                List<MetricLabel> backendLabels = new ArrayList<>();
+                backendLabels.add(new MetricLabel("cluster_id", clusterId));
+                backendLabels.add(new MetricLabel("cluster_name", clusterName));
+                backendLabels.add(new MetricLabel("address", backend.getAddress()));
+                String key = clusterId + "_" + backend.getAddress();
+                GaugeMetricImpl metric = CloudMetrics.CLUSTER_BACKEND_ALIVE.getOrAdd(key);
+                // Also drop the cached per-backend gauge, as is done for every other family above;
+                // otherwise the entry for this key stays in CLUSTER_BACKEND_ALIVE after the cluster is gone.
+                CloudMetrics.CLUSTER_BACKEND_ALIVE.remove(key);
+                MetricRepo.DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(metric.getName(), backendLabels);
+            }
+
+            GaugeMetricImpl backendAliveTotal = CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(backendAliveTotal.getName(), labels);
+
+        } catch (Throwable t) {
+            LOG.warn("unregister cloud metrics for cluster {} failed", clusterId, t);
+        }
+    }
 }
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
new file mode 100644
index 00000000000000..91410e246c1c81
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonOutput
+
+suite('test_drop_cluster_clean_metrics', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=2',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1',
+        'rehash_tablet_after_be_dead_seconds=3600',
+        'enable_cloud_warm_up_for_rebalance=false'
+    ]
+    options.beConfigs += [
+        'report_tablet_interval_seconds=1',
+        'schedule_sync_tablets_interval_s=18000',
+        'disable_auto_compaction=true',
+        'sys_log_verbose_modules=*'
+    ]
+    options.setFeNum(2)
+    options.setBeNum(2)
+    options.cloudMode = true
+    options.enableDebugPoints()
+    // Sends request_body to the meta service drop_cluster endpoint; check_func receives the response.
+    def drop_cluster_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/drop_cluster?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+    // Returns true if the text served at http://ip:port/metrics contains the literal string name.
+    def getFEMetrics = {ip, port, name ->
+        def url = "http://${ip}:${port}/metrics"
+        logger.info("getFEMetrics1, url: ${url}, name: ${name}")
+        def metrics = new URL(url).text
+
+        def metricLinePattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name))
+
+        def matcher = metricLinePattern.matcher(metrics)
+        boolean found = false
+        while (matcher.find()) {
+            found = true
+            logger.info("getFEMetrics MATCH FOUND: ${matcher.group(0)}")
+        }
+
+        if (found) {
+            return true
+        } else {
+            def snippet = metrics.length() > 2000 ? metrics.substring(0, 2000) + "..." : metrics
+            logger.info("getFEMetrics NO MATCH for name=${name}, metrics snippet:\n${snippet}")
+            return false
+        }
+    }
+    // Scenario: metrics exist for the initial cluster, vanish after it is dropped, appear for a new one.
+    def testCase = { ->
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        def fe = cluster.getOneFollowerFe();
+        sleep(3000) // wait for metrics ready
+        def metrics1 = """cluster_id="compute_cluster_id","""
+        assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics1))
+
+        // drop compute cluster
+        def beClusterMap = [cluster_id:"compute_cluster_id"]
+        def instance = [instance_id: "default_instance_id", cluster: beClusterMap]
+        // JsonOutput.toJson is a static method, so call it on the class instead of an instance
+        def dropFeClusterBody = JsonOutput.toJson(instance)
+        drop_cluster_api.call(msHttpPort, dropFeClusterBody) {
+            respCode, body ->
+                log.info("drop fe cluster http cli result: ${body} ${respCode}".toString())
+                def json = parseJson(body)
+        }
+        sleep(3000) // wait for metrics cleaned
+        assertFalse(getFEMetrics(fe.host, fe.httpPort, metrics1))
+
+        cluster.addBackend(2, "new_cluster")
+
+        sleep(3000) // wait for metrics of the new cluster to be registered
+        def metrics2 = """cluster_id="new_cluster_id","""
+        assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics2))
+    }
+
+    docker(options) {
+        testCase()
+    }
+}
\ No newline at end of file