Skip to content

Commit

Permalink
Ensure forced failure handling for tasks from previous master node du…
Browse files Browse the repository at this point in the history
…ring failover or restart

When the Master node restarts or switches to a new primary, the new Master must take over task scheduling. In this scenario, tasks running on the previous Master may remain in an uncertain state (e.g., suspended or incomplete). To ensure system consistency and accurate task states, the new Master should enforce failure handling for these tasks after its initialization to clean up any residual task states.
  • Loading branch information
CalvinKirs committed Dec 16, 2024
1 parent d1e2998 commit d23467e
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
task.setJobId(getJobId());
task.setCreateTimeMs(System.currentTimeMillis());
task.setStatus(TaskStatus.PENDING);
try {
task.initialize();
} catch (JobException e) {
tasks.remove(task);
log.warn("task initialize failed, job id is {}, task id is {}", jobId, task.getTaskId(), e);
}
});
getRunningTasks().addAll(tasks);
this.startTimeMs = System.currentTimeMillis();
Expand Down Expand Up @@ -467,17 +473,20 @@ public void updateTaskStatusAfterRestart() {
if (CollectionUtils.isEmpty(tasks)) {
return;
}
List<T> runningTasks = tasks.stream().filter(task -> task.getStatus().equals(TaskStatus.RUNNING) || task.getStatus().equals(TaskStatus.PENDING))
List<T> runningTasks = tasks.stream().filter(task -> task.getStatus().equals(TaskStatus.RUNNING)
|| task.getStatus().equals(TaskStatus.PENDING))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(runningTasks)) {
return;
}

runningTasks.forEach(task -> {
try {
task.onFail("task failed because of restart");
task.onFail("Task has been marked as failed because the Master node restarted"
+ " or switched during failover. Previous Master node's state could not be recovered.");
} catch (JobException e) {
log.warn("task failed because of restart, job id is {}, task id is {}",
log.warn("Failed to mark task as failed during Master node failover. "
+ "Job ID: {}, Task ID: {}, Reason: {}",
jobId, task.getTaskId(), e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -133,11 +134,16 @@ public List<MTMVTask> createTasks(TaskType taskType, MTMVTaskContext taskContext
task.setTaskType(taskType);
ArrayList<MTMVTask> tasks = new ArrayList<>();
tasks.add(task);
super.initTasks(tasks, taskType);
initTasks(tasks, taskType);
LOG.info("finish create mtmv task, task: {}", task);
return tasks;
}

@Override
public void initTasks(Collection<? extends MTMVTask> tasks, TaskType taskType) {
super.initTasks(tasks, taskType);
}

/**
* if user trigger, return true
* else, only can have 2 task. because every task can refresh all data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public void run() throws JobException {
}

private void exec(Set<String> refreshPartitionNames,
Map<TableIf, String> tableWithPartKey)
Map<TableIf, String> tableWithPartKey)
throws Exception {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
StatementContext statementContext = new StatementContext();
Expand Down Expand Up @@ -287,18 +287,22 @@ protected synchronized void executeCancelLogic() {
after();
}

@Override
public void initialize() throws JobException {
try {
mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
} catch (Exception e) {
LOG.warn("get mtmv failed, dbId: {}, mtmvId: {}", dbId, mtmvId, e);
throw new JobException(e.getMessage(), e);
}
}

@Override
public void before() throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug("mtmv task before, taskId: {}", super.getTaskId());
}
super.before();
try {
mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
} catch (UserException e) {
LOG.warn("before task failed:", e);
throw new JobException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,22 @@ private static long getNextTaskId() {

@Override
public void onFail() throws JobException {
status = TaskStatus.FAILED;
if (!isCallable()) {
return;
try {
status = TaskStatus.FAILED;
if (!isCallable()) {
return;
}
setFinishTimeMs(System.currentTimeMillis());
Env.getCurrentEnv().getJobManager().getJob(jobId).onTaskFail(this);
} finally {
closeOrReleaseResources();
}
Env.getCurrentEnv().getJobManager().getJob(jobId).onTaskFail(this);
}

@Override
public void onFail(String errMsg) throws JobException {
if (TaskStatus.CANCELED.equals(status)) {
return;
}
status = TaskStatus.FAILED;
setFinishTimeMs(System.currentTimeMillis());
setErrMsg(errMsg);
if (!isCallable()) {
return;
}
Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
job.onTaskFail(this);
onFail();
}

private boolean isCallable() {
Expand Down Expand Up @@ -155,6 +151,13 @@ public void cancel() throws JobException {
*/
protected abstract void executeCancelLogic() throws Exception;

@Override
public void initialize() throws JobException {
this.jobId = getJobId();
this.createTimeMs = System.currentTimeMillis();
this.status = TaskStatus.PENDING;
}

@Override
public void before() throws JobException {
status = TaskStatus.RUNNING;
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
*/
public interface Task {

/**
* This method is called immediately after the task is created.
* Implementations can use this method to perform necessary initialization.
*/
void initialize() throws JobException;

/**
* This method is called before the task is executed.
* Implementations can use this method to perform any necessary setup or initialization.
Expand Down

0 comments on commit d23467e

Please sign in to comment.