From d1e2998cf315e5b478f02bfc86d9c8b410497aef Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Fri, 13 Dec 2024 16:26:51 +0800 Subject: [PATCH] restart --- .../apache/doris/job/base/AbstractJob.java | 19 +++++++++++++++---- .../apache/doris/job/manager/JobManager.java | 5 +++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 24e6bbc7fe3b55..a05d781579308c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -461,15 +461,26 @@ public void onReplayCreate() throws JobException { public void onReplayEnd(AbstractJob replayJob) throws JobException { log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay delete scheduler job").build()); } - - private void updateTaskStatusAfterRestart() { + + public void updateTaskStatusAfterRestart() { + List tasks = queryAllTasks(); + if (CollectionUtils.isEmpty(tasks)) { + return; + } + List runningTasks = tasks.stream().filter(task -> task.getStatus().equals(TaskStatus.RUNNING) || task.getStatus().equals(TaskStatus.PENDING)) + .collect(Collectors.toList()); if (CollectionUtils.isEmpty(runningTasks)) { return; } + runningTasks.forEach(task -> { - if (task.getStatus().equals(TaskStatus.RUNNING)) { - task.setStatus(TaskStatus.PENDING); + try { + task.onFail("task failed because of restart"); + } catch (JobException e) { + log.warn("task failed because of restart, job id is {}, task id is {}", + jobId, task.getTaskId(), e); } }); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 3ce2c4cc1928c2..8600d017f5b9e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -89,14 +89,15 @@ private void writeUnlock() { public void start() { jobScheduler = new JobScheduler(jobMap); + clearTaskStatusWhenFeRestart(); jobScheduler.start(); } - private void clearTaskStatusWhenFeRestart() { + public void clearTaskStatusWhenFeRestart() { List runningJobs = jobMap.values().stream() .filter(job -> job.getJobStatus().equals(JobStatus.RUNNING)).collect(Collectors.toList()); for (T job : runningJobs) { - + job.updateTaskStatusAfterRestart(); } }