@@ -152,10 +152,10 @@ private void checkToDelCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<S
// del clusterName
String delClusterName = cloudSystemInfoService.getClusterNameByClusterId(delId);
if (delClusterName.isEmpty()) {
LOG.warn("can't get delClusterName, clusterId: {}, plz check", delId);
return;
}
// del clusterID
MetricRepo.unregisterCloudMetrics(delId, delClusterName, toDel);
cloudSystemInfoService.dropCluster(delId, delClusterName);
}
);
@@ -469,6 +469,8 @@ private void removeObsoleteVirtualGroups(List<Cloud.ClusterPB> virtualClusters)
// in fe mem, but not in meta server
if (!msVirtualClusters.contains(computeGroup.getId())) {
LOG.info("virtual compute group {} will be removed.", computeGroup.getName());
MetricRepo.unregisterCloudMetrics(computeGroup.getId(), computeGroup.getName(),
Collections.emptyList());
cloudSystemInfoService.removeComputeGroup(computeGroup.getId(), computeGroup.getName());
// cancel invalid job
if (!computeGroup.getPolicy().getCacheWarmupJobIds().isEmpty()) {
@@ -406,6 +406,7 @@ public void updateCloudClusterMapNoLock(List<Backend> toAdd, List<Backend> toDel
// ATTN: Empty clusters are treated as dropped clusters.
if (be.isEmpty()) {
LOG.info("del clusterId {} and clusterName {} due to be nodes eq 0", clusterId, clusterName);
MetricRepo.unregisterCloudMetrics(clusterId, clusterName, toDel);
boolean succ = clusterNameToId.remove(clusterName, clusterId);

// remove from computeGroupIdToComputeGroup
@@ -38,4 +38,7 @@ public Map<String, M> getMetrics() {
return nameToMetric;
}

public void remove(String name) {
nameToMetric.remove(name);
}
}
103 changes: 103 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -1593,4 +1593,107 @@ public static void updateClusterCloudBalanceNum(String clusterName, String clust
counter.setLabels(labels);
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
}

public static void unregisterCloudMetrics(String clusterId, String clusterName, List<Backend> backends) {
if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterId)) {
return;
}
LOG.debug("unregister cloud metrics for cluster {}", clusterId);
try {
List<MetricLabel> labels = new ArrayList<>();
labels.add(new MetricLabel("cluster_id", clusterId));
labels.add(new MetricLabel("cluster_name", clusterName));

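// remove the per-cluster counters and gauges from both the CloudMetrics maps and the FE metric register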
LongCounterMetric requestAllCounter = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getOrAdd(clusterId);
CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.remove(clusterId);
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(requestAllCounter.getName(), labels);

LongCounterMetric queryAllCounter = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getOrAdd(clusterId);
CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.remove(clusterId);
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryAllCounter.getName(), labels);

LongCounterMetric queryErrCounter = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getOrAdd(clusterId);
CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.remove(clusterId);
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryErrCounter.getName(), labels);

LongCounterMetric warmUpJobExecCounter = CloudMetrics.CLUSTER_WARM_UP_JOB_EXEC_COUNT.getOrAdd(clusterId);
CloudMetrics.CLUSTER_WARM_UP_JOB_EXEC_COUNT.remove(clusterId);
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobExecCounter.getName(), labels);

LongCounterMetric warmUpJobRequestedTablets =
CloudMetrics.CLUSTER_WARM_UP_JOB_REQUESTED_TABLETS.getOrAdd(clusterId);
CloudMetrics.CLUSTER_WARM_UP_JOB_REQUESTED_TABLETS.remove(clusterId);
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobRequestedTablets.getName(), labels);

LongCounterMetric warmUpJobFinishedTablets =
CloudMetrics.CLUSTER_WARM_UP_JOB_FINISHED_TABLETS.getOrAdd(clusterId);
CloudMetrics.CLUSTER_WARM_UP_JOB_FINISHED_TABLETS.remove(clusterId);
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobFinishedTablets.getName(), labels);

GaugeMetricImpl<Double> requestPerSecondGauge = CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE
.getOrAdd(clusterId);
CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE.remove(clusterId);
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(requestPerSecondGauge.getName(), labels);

GaugeMetricImpl<Double> queryPerSecondGauge = CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE
.getOrAdd(clusterId);
CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE.remove(clusterId);
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryPerSecondGauge.getName(), labels);

GaugeMetricImpl<Double> queryErrRateGauge = CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.getOrAdd(clusterId);
CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.remove(clusterId);
DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryErrRateGauge.getName(), labels);

LongCounterMetric clusterCloudPartitionBalanceNum = CloudMetrics
.CLUSTER_CLOUD_PARTITION_BALANCE_NUM.getOrAdd(clusterId);
CloudMetrics.CLUSTER_CLOUD_PARTITION_BALANCE_NUM.remove(clusterId);
MetricRepo.DORIS_METRIC_REGISTER
.removeMetricsByNameAndLabels(clusterCloudPartitionBalanceNum.getName(), labels);

LongCounterMetric clusterCloudTableBalanceNum = CloudMetrics
.CLUSTER_CLOUD_TABLE_BALANCE_NUM.getOrAdd(clusterId);
CloudMetrics.CLUSTER_CLOUD_TABLE_BALANCE_NUM.remove(clusterId);
MetricRepo.DORIS_METRIC_REGISTER
.removeMetricsByNameAndLabels(clusterCloudTableBalanceNum.getName(), labels);

LongCounterMetric clusterCloudGlobalBalanceNum = CloudMetrics
.CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.getOrAdd(clusterId);
CloudMetrics.CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.remove(clusterId);
MetricRepo.DORIS_METRIC_REGISTER
.removeMetricsByNameAndLabels(clusterCloudGlobalBalanceNum.getName(), labels);

LongCounterMetric clusterCloudUpgradeBalanceNum = CloudMetrics
.CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.getOrAdd(clusterId);
CloudMetrics.CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.remove(clusterId);
MetricRepo.DORIS_METRIC_REGISTER
.removeMetricsByNameAndLabels(clusterCloudUpgradeBalanceNum.getName(), labels);

LongCounterMetric clusterCloudWarmUpBalanceNum = CloudMetrics
.CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.getOrAdd(clusterId);
CloudMetrics.CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.remove(clusterId);
MetricRepo.DORIS_METRIC_REGISTER
.removeMetricsByNameAndLabels(clusterCloudWarmUpBalanceNum.getName(), labels);

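// histograms live in the separate METRIC_REGISTER; drop every one whose key embeds this cluster id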
METRIC_REGISTER.getHistograms().keySet().stream()
.filter(k -> k.contains(clusterId))
.forEach(METRIC_REGISTER::remove);

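// per-backend alive gauges carry an extra address label, so unregister them one backend at a time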
for (Backend backend : backends) {
List<MetricLabel> backendLabels = new ArrayList<>();
backendLabels.add(new MetricLabel("cluster_id", clusterId));
backendLabels.add(new MetricLabel("cluster_name", clusterName));
backendLabels.add(new MetricLabel("address", backend.getAddress()));
String key = clusterId + "_" + backend.getAddress();
GaugeMetricImpl<Integer> metric = CloudMetrics.CLUSTER_BACKEND_ALIVE.getOrAdd(key);
MetricRepo.DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(metric.getName(), backendLabels);
}

GaugeMetricImpl<Integer> backendAliveTotal = CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.getOrAdd(clusterId);
CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.remove(clusterId);
MetricRepo.DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(backendAliveTotal.getName(), labels);

} catch (Throwable t) {
LOG.warn("unregister cloud metrics for cluster {} failed", clusterId, t);
}
}
}
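
The new method repeats the same three-step teardown for every metric family: getOrAdd to resolve the registered name, remove from the per-cluster map, then removeMetricsByNameAndLabels against the FE register. A minimal sketch of how a follow-up could fold that repetition into one helper, assuming the per-cluster collections are the AutoMappedMetric type that gains remove(String) in this change and that Metric is the generic base class exposing getName() (the helper name itself is illustrative):

private static <M extends Metric<?>> void removeClusterMetric(
        AutoMappedMetric<M> metrics, String clusterId, List<MetricLabel> labels) {
    // Resolve (or lazily create) the per-cluster instance so its registered name is known,
    // drop it from the per-cluster map, then unregister it from the FE metric register.
    M metric = metrics.getOrAdd(clusterId);
    metrics.remove(clusterId);
    DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(metric.getName(), labels);
}

// Usage inside unregisterCloudMetrics, e.g.:
// removeClusterMetric(CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER, clusterId, labels);
// removeClusterMetric(CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE, clusterId, labels);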
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.ClusterOptions
import groovy.json.JsonOutput

suite('test_drop_cluster_clean_metrics', 'docker') {
if (!isCloudMode()) {
return;
}
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
'cloud_tablet_rebalancer_interval_second=2',
'sys_log_verbose_modules=org',
'heartbeat_interval_second=1',
'rehash_tablet_after_be_dead_seconds=3600',
'enable_cloud_warm_up_for_rebalance=false'
]
options.beConfigs += [
'report_tablet_interval_seconds=1',
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*'
]
options.setFeNum(2)
options.setBeNum(2)
options.cloudMode = true
options.enableDebugPoints()

def drop_cluster_api = { msHttpPort, request_body, check_func ->
httpTest {
endpoint msHttpPort
uri "/MetaService/http/drop_cluster?token=$token"
body request_body
check check_func
}
}

def getFEMetrics = {ip, port, name ->
def url = "http://${ip}:${port}/metrics"
logger.info("getFEMetrics1, url: ${url}, name: ${name}")
def metrics = new URL(url).text

def metricLinePattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name))

def matcher = metricLinePattern.matcher(metrics)
boolean found = false
while (matcher.find()) {
found = true
logger.info("getFEMetrics MATCH FOUND: ${matcher.group(0)}")
}

if (found) {
return true
} else {
def snippet = metrics.length() > 2000 ? metrics.substring(0, 2000) + "..." : metrics
logger.info("getFEMetrics NO MATCH for name=${name}, metrics snippet:\n${snippet}")
return false
}
}

def testCase = { ->
def ms = cluster.getAllMetaservices().get(0)
def msHttpPort = ms.host + ":" + ms.httpPort
def fe = cluster.getOneFollowerFe();
sleep(3000) // wait for metrics ready
def metrics1 = """cluster_id="compute_cluster_id","""
assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics1))

// drop compute cluster
def beClusterMap = [cluster_id:"compute_cluster_id"]
def instance = [instance_id: "default_instance_id", cluster: beClusterMap]
def jsonOutput = new JsonOutput()
def dropFeClusterBody = jsonOutput.toJson(instance)
drop_cluster_api.call(msHttpPort, dropFeClusterBody) {
respCode, body ->
log.info("drop fe cluster http cli result: ${body} ${respCode}".toString())
def json = parseJson(body)
}
sleep(3000) // wait for metrics cleaned
assertFalse(getFEMetrics(fe.host, fe.httpPort, metrics1))

cluster.addBackend(2, "new_cluster")

sleep(3000) // wait for metrics cleaned
def metrics2 = """cluster_id="new_cluster_id","""
assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics2))
}

docker(options) {
testCase()
}
}