Skip to content

Commit f49b444

Browse files
[opt](agent-task) Add a daemon thread to clean up agent tasks on dead BEs
1 parent e29025e commit f49b444

File tree

5 files changed

+143
-1
lines changed

5 files changed

+143
-1
lines changed

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3651,4 +3651,6 @@ public static int metaServiceRpcRetryTimes() {
36513651

36523652
@ConfField(mutable = true)
36533653
public static String aws_credentials_provider_version = "v2";
3654+
3655+
/**
 * Interval, in milliseconds, handed to the AgentTaskCleanupDaemon constructor
 * as the period of its agent task health check. Defaults to five minutes.
 *
 * <p>Annotated with {@code @ConfField} like the other options of this class so
 * that the value can be supplied through the FE configuration. It is left
 * non-mutable because the daemon reads it once, when it is constructed.
 */
@ConfField
public static long agent_task_health_check_intervals_ms = 5 * 60 * 1000L; // 5 min
36543656
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.task;
19+
20+
import org.apache.doris.catalog.Env;
21+
import org.apache.doris.common.Config;
22+
import org.apache.doris.common.Status;
23+
import org.apache.doris.common.util.MasterDaemon;
24+
import org.apache.doris.system.Backend;
25+
import org.apache.doris.system.SystemInfoService;
26+
import org.apache.doris.thrift.TStatusCode;
27+
28+
public class AgentTaskCleanupDaemon extends MasterDaemon {
29+
public AgentTaskCleanupDaemon() {
30+
super("agent-task-cleanup", Config.agent_task_health_check_intervals_ms);
31+
}
32+
33+
@Override
34+
protected void runAfterCatalogReady() {
35+
SystemInfoService infoService = Env.getCurrentSystemInfo();
36+
infoService.getAllClusterBackends(false)
37+
.stream()
38+
.filter(backend -> !backend.isAlive())
39+
.map(Backend::getId)
40+
.forEach(this::removeInactiveBeAgentTasks);
41+
}
42+
43+
private void removeInactiveBeAgentTasks(Long beId) {
44+
AgentTaskQueue.removeTask(beId, (agentTask -> {
45+
long tabletId = agentTask.getTabletId();
46+
String errMsg = "BE down, agent task is aborted. BE=" + beId + ", tablet=" + tabletId;
47+
if (agentTask instanceof PushTask) {
48+
PushTask task = ((PushTask) agentTask);
49+
task.countDownLatchWithStatus(beId, tabletId, new Status(TStatusCode.ABORTED, errMsg));
50+
}
51+
agentTask.setFinished(true);
52+
agentTask.setErrorCode(TStatusCode.ABORTED);
53+
agentTask.setErrorMsg(errMsg);
54+
}));
55+
}
56+
}

fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Set;
36+
import java.util.function.Consumer;
3637

3738
/**
3839
* Task queue
@@ -96,6 +97,16 @@ public static synchronized void removeTask(long backendId, TTaskType type, long
9697
--taskNum;
9798
}
9899

100+
public static synchronized void removeTask(long backendId, Consumer<AgentTask> onTaskRemoved) {
101+
Map<TTaskType, Map<Long, AgentTask>> tasks = AgentTaskQueue.tasks.row(backendId);
102+
tasks.forEach((type, taskSet) -> {
103+
taskSet.forEach((signature, task) -> {
104+
removeTask(backendId, type, signature);
105+
onTaskRemoved.accept(task);
106+
});
107+
});
108+
}
109+
99110
/*
100111
* we cannot define a push task with only 'backendId', 'signature' and 'TTaskType'
101112
* add version and TPushType to help

fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class PushTask extends AgentTask {
5858
private TPushType pushType;
5959
private List<Predicate> conditions;
6060
// for synchronous delete
61-
private MarkedCountDownLatch latch;
61+
private MarkedCountDownLatch<Long, Long> latch;
6262

6363
// lzop decompress or not
6464
private boolean needDecompress;
@@ -212,6 +212,18 @@ public void countDownLatch(long backendId, long tabletId) {
212212
}
213213
}
214214

215+
public void countDownLatchWithStatus(long backendId, long tabletId, Status st) {
216+
if (this.latch == null) {
217+
return;
218+
}
219+
if (latch.markedCountDownWithStatus(backendId, tabletId, st)) {
220+
if (LOG.isDebugEnabled()) {
221+
LOG.debug("pushTask current latch count with status: {}. backend: {}, tablet:{}, st::{}",
222+
latch.getCount(), backendId, tabletId, st);
223+
}
224+
}
225+
}
226+
215227
// call this always means one of tasks is failed. count down to zero to finish entire task
216228
public void countDownToZero(TStatusCode code, String errMsg) {
217229
if (this.latch != null) {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import org.apache.doris.regression.suite.ClusterOptions
2+
3+
// Docker regression suite: runs ALTER TABLE ... MODIFY COLUMN schema changes
// while backends are stopped in the middle of the job.
suite("test_sc_stuck_when_be_down", "docker") {
4+
// Cluster under test: non-cloud mode, 3 BEs, 2 FEs, debug points enabled, and
// the FE option agent_task_health_check_intervals_ms set to 5000.
def options = new ClusterOptions()
5+
options.cloudMode = false
6+
options.beNum = 3
7+
options.feNum = 2
8+
options.enableDebugPoints()
9+
options.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
10+
11+
docker(options) {
12+
GetDebugPoint().clearDebugPointsForAllBEs()
13+
14+
// Test table: 24 hash buckets, 3 replicas, filled with 1024 rows.
def tblName = "test_sc_stuck_when_be_down"
15+
sql """ DROP TABLE IF EXISTS ${tblName} """
16+
sql """
17+
CREATE TABLE IF NOT EXISTS ${tblName} (
18+
`k` int NOT NULL,
19+
`v0` int NOT NULL,
20+
`v1` int NOT NULL
21+
)
22+
DUPLICATE KEY(`k`)
23+
DISTRIBUTED BY HASH(`k`) BUCKETS 24
24+
PROPERTIES (
25+
"replication_allocation" = "tag.location.default: 3"
26+
)
27+
"""
28+
sql """ INSERT INTO ${tblName} SELECT number, number, number from numbers("number" = "1024") """
29+
30+
// Scenario 1: enable the debug point on every BE (presumably it blocks the
// alter on the BE side - confirm), start the schema change on v0, stop
// backend 1, clear the debug points, then wait for the schema change to be
// reported as done.
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
31+
try {
32+
sql """ ALTER TABLE ${tblName} MODIFY COLUMN v0 VARCHAR(100) """
33+
sleep(3000)
34+
cluster.stopBackends(1)
35+
GetDebugPoint().clearDebugPointsForAllBEs()
36+
waitForSchemaChangeDone {
37+
sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
38+
// NOTE(review): presumably a timeout in seconds - confirm against waitForSchemaChangeDone.
time 600
39+
}
40+
} finally {
41+
// Always clear the debug points, even when the wait above fails.
GetDebugPoint().clearDebugPointsForAllBEs()
42+
}
43+
44+
// Scenario 2: same flow for column v1, but backends 1 and 2 are both
// stopped. assertTrue(false) is reached only when waitForSchemaChangeDone
// returns normally, so this scenario expects the wait to throw.
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
45+
try {
46+
sql """ ALTER TABLE ${tblName} MODIFY COLUMN v1 VARCHAR(100) """
47+
sleep(3000)
48+
cluster.stopBackends(1, 2)
49+
GetDebugPoint().clearDebugPointsForAllBEs()
50+
waitForSchemaChangeDone {
51+
sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
52+
time 600
53+
}
54+
// NOTE(review): presumably raises an AssertionError, which the
// catch (Exception ...) below would not swallow - confirm.
assertTrue(false)
55+
} catch (Exception ignore) {
56+
// An exception from the wait above is the expected outcome here; ignore it.
57+
} finally {
58+
GetDebugPoint().clearDebugPointsForAllBEs()
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)