diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 5fb6eb7945ee76b..23f853709ffeb1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -41,7 +41,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -116,7 +115,7 @@ public List queryAllTasks() { if (CollectionUtils.isEmpty(runningTasks)) { return queryTasks(); } - + List historyTasks = queryTasks(); if (CollectionUtils.isNotEmpty(historyTasks)) { tasks.addAll(historyTasks); @@ -127,7 +126,7 @@ public List queryAllTasks() { tasks.add(task); } }); - Comparator taskComparator = Comparator.comparingLong(AbstractTask::getCreateTimeMs); + Comparator taskComparator = Comparator.comparingLong(T::getCreateTimeMs).reversed(); tasks.sort(taskComparator); return tasks; } @@ -156,7 +155,7 @@ public void checkJobParams() { checkJobParamsInternal(); } - public void updateJobStatus(JobStatus newJobStatus) { + public void updateJobStatus(JobStatus newJobStatus) throws JobException { if (null == newJobStatus) { throw new IllegalArgumentException("jobStatus cannot be null"); } @@ -172,6 +171,9 @@ public void updateJobStatus(JobStatus newJobStatus) { throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", jobStatus.name(), this.jobStatus.name())); } + if (JobStatus.PAUSED.equals(newJobStatus)) { + cancelAllTasks(); + } jobStatus = newJobStatus; } @@ -186,19 +188,19 @@ public static AbstractJob readFields(DataInput in) throws IOException { } @Override - public void onTaskFail(T task) { + public void onTaskFail(T task) throws JobException { updateJobStatusIfEnd(); runningTasks.remove(task); } @Override - public void onTaskSuccess(T task) { + public void onTaskSuccess(T task) throws JobException { updateJobStatusIfEnd(); runningTasks.remove(task); } - private void updateJobStatusIfEnd() { + private void updateJobStatusIfEnd() throws JobException { JobExecuteType executeType = getJobConfig().getExecuteType(); if (executeType.equals(JobExecuteType.MANUAL)) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java index fef447d160f1899..73e172b5b274142 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java @@ -102,14 +102,14 @@ public interface Job { * * @param task The failed task. */ - void onTaskFail(T task); + void onTaskFail(T task) throws JobException; /** * Notifies the job when a task execution is successful. * * @param task The successful task. */ - void onTaskSuccess(T task); + void onTaskSuccess(T task) throws JobException; /** * get the job's show info, which is used to sql show the job information diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index f508c20de0ebb3e..c9192b5f2b522ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -17,8 +17,6 @@ package org.apache.doris.job.executor; -import lombok.extern.log4j.Log4j2; -import org.apache.commons.collections.CollectionUtils; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; @@ -28,6 +26,8 @@ import org.apache.doris.job.task.AbstractTask; import com.lmax.disruptor.WorkHandler; +import lombok.extern.log4j.Log4j2; +import org.apache.commons.collections.CollectionUtils; import java.util.List; import java.util.Map; @@ -50,16 +50,18 @@ public DispatchTaskHandler(Map> disruptorMap) { @Override public void onEvent(TimerJobEvent event) { try { - log.info("dispatch timer job, job id is {}, job name is {}", event.getJob().getJobId(), event.getJob().getJobName()); + log.info("dispatch timer job, job id is {}, job name is {}", event.getJob().getJobId(), + event.getJob().getJobName()); if (null == event.getJob()) { log.info("job is null,may be job is deleted, ignore"); return; } if (event.getJob().isReadyForScheduling() && event.getJob().getJobStatus() == JobStatus.RUNNING) { List tasks = event.getJob().createTasks(TaskType.SCHEDULED); - if(CollectionUtils.isEmpty(tasks)) { - log.warn("job is ready for scheduling, but create task is empty, skip scheduler, job id is {}," + - " job name is {}", event.getJob().getJobId(), event.getJob().getJobName()); + if (CollectionUtils.isEmpty(tasks)) { + log.warn("job is ready for scheduling, but create task is empty, skip scheduler," + + "job id is {}," + " job name is {}", event.getJob().getJobId(), + event.getJob().getJobName()); return; } JobType jobType = event.getJob().getJobType(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java index d2c740cd55e4055..aab205435b65e7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java @@ -17,14 +17,12 @@ package org.apache.doris.job.executor; -import lombok.extern.log4j.Log4j2; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.disruptor.TaskDisruptor; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import jline.internal.Log; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; @Log4j2 public class TimerJobSchedulerTask> implements TimerTask { @@ -43,7 +41,7 @@ public void run(Timeout timeout) { try { dispatchDisruptor.publishEvent(this.job); } catch (Exception e) { - Log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e); + log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index d1a429238799c08..033e31f3311381e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -69,7 +69,7 @@ public class InsertJob extends AbstractJob { @Override public List createTasks(TaskType taskType) { - if(CollectionUtils.isNotEmpty(getRunningTasks())){ + if (CollectionUtils.isNotEmpty(getRunningTasks())) { return new ArrayList<>(); } InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser()); @@ -176,7 +176,7 @@ public void onTaskFail(InsertTask task) { } @Override - public void onTaskSuccess(InsertTask task) { + public void onTaskSuccess(InsertTask task) throws JobException { super.onTaskSuccess(task); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 92208932a69be00..6b5294187f0f2b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -115,6 +115,9 @@ public InsertTask(String labelName, String currentDb, String sql, UserIdentity u @Override public void run() throws JobException { try { + if (isCanceled.get()) { + return; + } command.run(ctx, stmtExecutor); } catch (Exception e) { throw new JobException(e); @@ -187,11 +190,11 @@ public List getShowInfo() { jobInfo.add(loadJob.getUserInfo().getQualifiedUser()); return jobInfo; } - + // if task not start, load job is null,return pending task show info - private List getPendingTaskShowInfo(){ + private List getPendingTaskShowInfo() { List datas = new ArrayList<>(); - + datas.add(String.valueOf(getTaskId())); datas.add(getJobId() + "_" + getTaskId()); datas.add(getStatus().name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 4182d3acc902c5a..a6142b7cf3932e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -17,7 +17,6 @@ package org.apache.doris.job.manager; -import lombok.extern.log4j.Log4j2; import org.apache.doris.catalog.Env; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; @@ -30,7 +29,7 @@ import org.apache.doris.job.scheduler.JobScheduler; import org.apache.doris.job.task.AbstractTask; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import java.io.DataInput; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java index cd52740ff1b7fe2..877cb306916f5d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java @@ -19,7 +19,6 @@ import lombok.experimental.UtilityClass; import lombok.extern.log4j.Log4j2; -import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index f95df788b8df58c..a9dde5901abb51e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -17,7 +17,6 @@ package org.apache.doris.job.scheduler; -import lombok.extern.log4j.Log4j2; import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.base.AbstractJob; @@ -31,7 +30,7 @@ import org.apache.doris.job.task.AbstractTask; import io.netty.util.HashedWheelTimer; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import org.apache.commons.collections.CollectionUtils; import java.io.Closeable; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 0a04655a5ca8a6d..181cec555e26d09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -17,7 +17,6 @@ package org.apache.doris.job.task; -import lombok.extern.log4j.Log4j2; import org.apache.doris.catalog.Env; import org.apache.doris.job.base.Job; import org.apache.doris.job.common.TaskStatus; @@ -26,7 +25,7 @@ import com.google.gson.annotations.SerializedName; import lombok.Data; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; @Data @Log4j2 @@ -50,7 +49,7 @@ public abstract class AbstractTask implements Task { private TaskType taskType; @Override - public void onFail(String msg) { + public void onFail(String msg) throws JobException { status = TaskStatus.FAILD; if (!isCallable()) { return; @@ -84,6 +83,9 @@ private boolean isCallable() { @Override public void onSuccess() throws JobException { + if (TaskStatus.CANCEL.equals(status)) { + return; + } status = TaskStatus.SUCCESS; setFinishTimeMs(System.currentTimeMillis()); if (!isCallable()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java index 3f61ce60c700f73..126aa04e40c0c99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java @@ -53,7 +53,7 @@ public interface Task { * * @param msg The error message associated with the failure. */ - void onFail(String msg); + void onFail(String msg) throws JobException; /** * This method is called when the task executes successfully.