From d10cce6038a367a3d75d22e802fd802b7b3c59b9 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 13 Nov 2023 18:31:10 +0800 Subject: [PATCH] [Improve](Job)Refactor JOB --- .../apache/doris/analysis/ShowJobStmt.java | 2 +- .../apache/doris/job/base/AbstractJob.java | 57 +++++++++++++++---- .../java/org/apache/doris/job/base/Job.java | 13 +++-- .../job/base/JobExecutionConfiguration.java | 5 +- .../apache/doris/job/common/TaskStatus.java | 1 + .../org/apache/doris/job/common/TaskType.java | 5 +- .../doris/job/disruptor/TimerJobEvent.java | 4 +- .../executor/DefaultTaskExecutorHandler.java | 7 ++- .../job/executor/DispatchTaskHandler.java | 8 ++- .../job/extensions/insert/InsertJob.java | 41 +++++++------ .../job/extensions/insert/InsertTask.java | 13 ++--- .../apache/doris/job/manager/JobManager.java | 47 ++++++++++----- .../manager/TaskDisruptorGroupManager.java | 8 +-- .../doris/job/scheduler/JobScheduler.java | 9 +-- .../apache/doris/job/task/AbstractTask.java | 44 +++++++++++--- .../java/org/apache/doris/qe/DdlExecutor.java | 9 +-- .../org/apache/doris/qe/ShowExecutor.java | 30 +++------- 17 files changed, 193 insertions(+), 110 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java index 85dd5c7f658d0f..dc4e96e05f01bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java @@ -50,7 +50,7 @@ public class ShowJobStmt extends ShowStmt { .add("ExecuteType") .add("RecurringStrategy") .add("Status") - .add("lastExecuteTaskStatus") + .add("ExecuteSql") .add("CreateTime") .add("Comment") .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 8c6dee38cb8439..77f847fc8423a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -20,8 +20,10 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.insert.InsertJob; import org.apache.doris.job.task.AbstractTask; @@ -63,7 +65,10 @@ public abstract class AbstractJob implements Job, Wri @SerializedName(value = "createTimeMs") private Long createTimeMs; - private List runningTasks = new ArrayList<>(); + @SerializedName(value = "executeSql") + String executeSql; + + private List runningTasks = new ArrayList<>(); @Override public void cancel() throws JobException { @@ -71,6 +76,16 @@ public void cancel() throws JobException { return; } runningTasks.forEach(Task::cancel); + + } + + public void initTasks(List tasks) { + tasks.forEach(task -> { + task.setJobId(jobId); + task.setTaskId(Env.getCurrentEnv().getNextId()); + task.setCreateTimeMs(System.currentTimeMillis()); + task.setStatus(TaskStatus.PENDING); + }); } public void checkJobParams() { @@ -89,17 +104,18 @@ public void updateJobStatus(JobStatus newJobStatus) { throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", jobStatus.name(), this.jobStatus.name())); } - // check other status - } - - public void resumeJob() { - if (jobStatus != JobStatus.PAUSED) { - throw new IllegalArgumentException(String.format("Can't resume job %s status to the %s status", + if (newJobStatus.equals(JobStatus.RUNNING) && !jobStatus.equals(JobStatus.PAUSED)) { + throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", jobStatus.name(), this.jobStatus.name())); } - jobStatus = JobStatus.RUNNING; + if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) { + throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", + jobStatus.name(), this.jobStatus.name())); + } + jobStatus = newJobStatus; } + protected abstract void checkJobParamsInternal(); public static AbstractJob readFields(DataInput in) throws IOException { @@ -117,13 +133,14 @@ public static AbstractJob readFields(DataInput in) throws IOException { } @Override - public void onTaskFail(long taskId) { + public void onTaskFail(T task) { updateJobStatusIfEnd(); } @Override - public void onTaskSuccess(long taskId) { + public void onTaskSuccess(T task) { updateJobStatusIfEnd(); + runningTasks.remove(task); } @@ -139,8 +156,10 @@ private void updateJobStatusIfEnd() { Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(jobStatus); break; case RECURRING: - if (null != getJobConfig().getTimerDefinition().getEndTimeMs() - && getJobConfig().getTimerDefinition().getEndTimeMs() < System.currentTimeMillis()) { + TimerDefinition timerDefinition = getJobConfig().getTimerDefinition(); + if (null != timerDefinition.getEndTimeMs() + && timerDefinition.getEndTimeMs() < System.currentTimeMillis() + + timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval())) { jobStatus = JobStatus.FINISHED; Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(jobStatus); } @@ -149,4 +168,18 @@ && getJobConfig().getTimerDefinition().getEndTimeMs() < System.currentTimeMillis break; } } + + public List getCommonShowInfo() { + List commonShowInfo = new ArrayList<>(); + commonShowInfo.add(String.valueOf(jobId)); + commonShowInfo.add(jobName); + commonShowInfo.add(createUser); + commonShowInfo.add(jobConfig.getExecuteType().name()); + commonShowInfo.add(jobConfig.convertRecurringStrategyToString()); + commonShowInfo.add(jobStatus.name()); + commonShowInfo.add(executeSql); + commonShowInfo.add(TimeUtils.longToTimeString(createTimeMs)); + commonShowInfo.add(comment); + return commonShowInfo; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java index 7895eb8d8b16d4..36b2d417384176 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java @@ -18,6 +18,7 @@ package org.apache.doris.job.base; import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.qe.ShowResultSetMetaData; @@ -33,9 +34,9 @@ */ public interface Job { - List createTasks(); + List createTasks(TaskType taskType); - void cancel() throws JobException; + void cancel(T task) throws JobException; boolean isReadyForScheduling(); @@ -48,12 +49,12 @@ public interface Job { List queryTasks(); - void onTaskFail(long taskId); + void cancel() throws JobException; - void onTaskSuccess(long taskId); + void onTaskFail(T task); - void onTaskCancel(long taskId); + void onTaskSuccess(T task); - void afterTaskRun(long taskId); + void onTaskCancel(T task); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 5879cb53e08bc3..8422a632185f04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -166,7 +166,8 @@ private List getExecutionDelaySeconds(long windowStartTimeMs, long windowE // Calculate the trigger time list for (long triggerTime = firstTriggerTime; triggerTime <= windowEndTimeMs; triggerTime += intervalMs) { - if (triggerTime >= currentTimeMs) { + if (triggerTime >= currentTimeMs && (null == timerDefinition.getEndTimeMs() + || triggerTime < timerDefinition.getEndTimeMs())) { timestamps.add(queryDelayTimeSecond(currentTimeMs, triggerTime)); } } @@ -174,7 +175,7 @@ private List getExecutionDelaySeconds(long windowStartTimeMs, long windowE return timestamps; } - private String convertRecurringStrategyToString() { + public String convertRecurringStrategyToString() { switch (executeType) { case ONE_TIME: return "AT " + TimeUtils.longToTimeString(timerDefinition.getStartTimeMs()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java index fefdd1eec26e0e..b4040d31e08c31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java @@ -18,6 +18,7 @@ package org.apache.doris.job.common; public enum TaskStatus { + PENDING, CANCEL, RUNNING, SUCCESS, diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java index 0aab448674ad00..3e37b99004c7a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java @@ -19,7 +19,6 @@ public enum TaskType { - SCHDULER, - MANUAL, - INSTANT, + SCHEDULER, + MANUAL; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java index 65654c225feafb..218fefd04199f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java @@ -23,12 +23,12 @@ import lombok.Data; @Data -public class TimerJobEvent { +public class TimerJobEvent> { private T job; - public static EventFactory> factory() { + public static > EventFactory> factory() { return TimerJobEvent::new; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java index f0e0726912ebad..103a11fcb7c1bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java @@ -44,6 +44,10 @@ public void onEvent(ExecuteTaskEvent executeTaskEvent) { log.warn("task is null, ignore,maybe task has been canceled"); return; } + if (task.isCancelled()) { + log.info("task is canceled, ignore"); + return; + } if (null == executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum() || executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum() <= 0) { try { @@ -51,13 +55,12 @@ public void onEvent(ExecuteTaskEvent executeTaskEvent) { return; } catch (Exception e) { log.warn("execute task error, task id is {}", task.getTaskId(), e); - } } - int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum(); Semaphore semaphore = null; // get token try { + int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum(); semaphore = TaskTokenManager.tryAcquire(task.getJobId(), maxConcurrentTaskNum); task.runTask(); } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index 75611c924ae5d3..81fa08f00c48ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -18,7 +18,9 @@ package org.apache.doris.job.executor; import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskType; import org.apache.doris.job.disruptor.TaskDisruptor; import org.apache.doris.job.disruptor.TimerJobEvent; import org.apache.doris.job.task.AbstractTask; @@ -41,14 +43,14 @@ public DispatchTaskHandler(Map> disruptorMap) { @Override - public void onEvent(TimerJobEvent event) throws Exception { + public void onEvent(TimerJobEvent event) { try { if (null == event.getJob()) { log.info("job is null,may be job is deleted, ignore"); return; } - if (event.getJob().isReadyForScheduling()) { - List tasks = event.getJob().createTasks(); + if (event.getJob().isReadyForScheduling() && event.getJob().getJobStatus() == JobStatus.RUNNING) { + List tasks = event.getJob().createTasks(TaskType.SCHEDULER); JobType jobType = event.getJob().getJobType(); for (AbstractTask task : tasks) { disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index b11257e5670d48..47964f0474007b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -21,6 +21,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ShowResultSetMetaData; @@ -31,29 +32,38 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; @Data -public class InsertJob extends AbstractJob { +public class InsertJob extends AbstractJob { @SerializedName(value = "labelPrefix") String labelPrefix; - @SerializedName(value = "executeSql") - String executeSql; + @Override - public List createTasks() { + public List createTasks(TaskType taskType) { InsertTask task = new InsertTask(null, null, null, null, null); task.setJobId(getJobId()); + task.setTaskType(taskType); task.setTaskId(Env.getCurrentEnv().getNextId()); - getRunningTasks().add(task); - return getRunningTasks(); + ArrayList tasks = new ArrayList<>(); + tasks.add(task); + super.initTasks(tasks); + getRunningTasks().addAll(tasks); + return tasks; + } + + @Override + public void cancel(InsertTask task) throws JobException { + super.cancel(); } @Override public void cancel() throws JobException { - + super.cancel(); } @Override @@ -72,7 +82,7 @@ public static InsertJob readFields(DataInput in) throws IOException { } @Override - public List queryTasks() { + public List queryTasks() { return null; } @@ -92,24 +102,21 @@ public ShowResultSetMetaData getTaskMetaData() { } @Override - public void onTaskFail(long taskId) { - + public void onTaskFail(InsertTask task) { + getRunningTasks().remove(task); } @Override - public void onTaskSuccess(long taskId) { - + public void onTaskSuccess(InsertTask task) { + getRunningTasks().remove(task); } @Override - public void onTaskCancel(long taskId) { + public void onTaskCancel(InsertTask task) { + getRunningTasks().remove(task); } - @Override - public void afterTaskRun(long taskId) { - - } @Override public void write(DataOutput out) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 8fec85e8bba376..08a74994fc1541 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -17,9 +17,7 @@ package org.apache.doris.job.extensions.insert; -import org.apache.doris.catalog.Env; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.job.base.Job; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.load.FailMsg; import org.apache.doris.load.loadv2.LoadJob; @@ -42,7 +40,7 @@ public class InsertTask extends AbstractTask { @Override public void before() { - + super.before(); } public InsertTask(String labelName, InsertIntoTableCommand command, LoadJob.LoadStatistic statistic, @@ -62,20 +60,17 @@ public void run() { @Override public void onFail() { - Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); - job.onTaskFail(getTaskId()); + super.onFail(); } @Override public void onSuccess() { - Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); - job.onTaskSuccess(getTaskId()); + super.onSuccess(); } @Override public void cancel() { - Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); - job.onTaskCancel(getTaskId()); + super.cancel(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 28b99b809bc589..22d0ee8adb8115 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -29,12 +29,14 @@ import org.apache.doris.job.task.AbstractTask; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; @Slf4j public class JobManager> implements Writable { @@ -51,7 +53,7 @@ public void start() { public Long registerJob(T job) throws JobException { //job.checkJobParams(); - checkJobNameExist(job.getJobName(), job.getJobType()); + checkJobNameExist(job.getJobName(), job.getJobType(), job.getCurrentDbName()); long id = Env.getCurrentEnv().getNextId(); job.setJobId(id); Env.getCurrentEnv().getEditLog().logCreateJob(job); @@ -63,8 +65,9 @@ public Long registerJob(T job) throws JobException { } - private void checkJobNameExist(String jobName, JobType type) throws JobException { - if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName) && a.getJobType().equals(type))) { + private void checkJobNameExist(String jobName, JobType type, String currentDbName) throws JobException { + if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName) && a.getJobType().equals(type) + && (null == a.getCurrentDbName() || a.getCurrentDbName().equals(currentDbName)))) { throw new JobException("job name exist,jobName:" + jobName); } } @@ -80,7 +83,7 @@ public void unregisterJob(Long jobId) throws JobException { public void unregisterJob(String currentDbName, String jobName) throws JobException { for (T a : jobMap.values()) { if (a.getJobName().equals(jobName) && (null != a.getCurrentDbName() - && a.getCurrentDbName().equals(currentDbName))) { + && a.getCurrentDbName().equals(currentDbName)) && a.getJobType().equals(JobType.INSERT)) { try { unregisterJob(a.getJobId()); } catch (JobException e) { @@ -97,11 +100,17 @@ public void alterJobStatus(Long jobId, JobStatus status) throws JobException { Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId)); } - public void resumeJob(Long jobId) throws JobException { - checkJobExist(jobId); - replayUpdateJob(jobMap.get(jobId)); - jobMap.get(jobId).resumeJob(); - jobScheduler.scheduleOneJob(jobMap.get(jobId)); + public void alterJobStatus(String currentDbName, String jobName, JobStatus jobStatus) throws JobException { + for (T a : jobMap.values()) { + if (a.getJobName().equals(jobName) && (null != a.getCurrentDbName() + && a.getCurrentDbName().equals(currentDbName)) && JobType.INSERT.equals(a.getJobType())) { + try { + alterJobStatus(a.getJobId(), jobStatus); + } catch (JobException e) { + throw new JobException("unregister job error,jobName:" + jobName); + } + } + } } private void checkJobExist(Long jobId) throws JobException { @@ -115,6 +124,21 @@ public List queryJobs(JobType type) { .collect(java.util.stream.Collectors.toList()); } + public List queryJobs(String currentDb, String jobName) { + //only query insert job,we just provide insert job + return jobMap.values().stream().filter(a -> checkItsMatch(currentDb, jobName, a)) + .collect(Collectors.toList()); + } + + private boolean checkItsMatch(String currentDb, String jobName, T job) { + if (StringUtils.isBlank(jobName)) { + return job.getJobType().equals(JobType.INSERT) && (null != job.getCurrentDbName() + && job.getCurrentDbName().equals(currentDb)); + } + return job.getJobType().equals(JobType.INSERT) && (null != job.getCurrentDbName() + && job.getCurrentDbName().equals(currentDb)) && job.getJobName().equals(jobName); + } + public List queryTasks(Long jobId) throws JobException { checkJobExist(jobId); return jobMap.get(jobId).queryTasks(); @@ -125,7 +149,6 @@ public void replayCreateJob(T job) { return; } jobMap.putIfAbsent(job.getJobId(), job); - //jobScheduler.scheduleOneJob(job); log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) .add("msg", "replay create scheduler job").build()); } @@ -135,9 +158,6 @@ public void replayCreateJob(T job) { **/ public void replayUpdateJob(T job) { jobMap.put(job.getJobId(), job); - if (JobStatus.RUNNING.equals(job.getJobStatus())) { - //jobScheduler.scheduleOneJob(job); - } log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) .add("msg", "replay update scheduler job").build()); } @@ -189,4 +209,5 @@ public void readFields(DataInput in) throws IOException { public T getJob(Long jobId) { return jobMap.get(jobId); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index 5c80356513f5c1..15939fd2f91c75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -54,14 +54,14 @@ public void init() { } private void registerDispatchDisruptor() { - EventFactory> dispatchEventFactory = TimerJobEvent.factory(); + EventFactory>> dispatchEventFactory = TimerJobEvent.factory(); ThreadFactory dispatchThreadFactory = new CustomThreadFactory("dispatch-task"); WorkHandler[] dispatchTaskExecutorHandlers = new WorkHandler[5]; for (int i = 0; i < 5; i++) { dispatchTaskExecutorHandlers[i] = new DispatchTaskHandler(this.disruptorMap); } - EventTranslatorVararg> eventTranslator = - (event, sequence, args) -> event.setJob((AbstractJob) args[0]); + EventTranslatorVararg>> eventTranslator = + (event, sequence, args) -> event.setJob((AbstractJob) args[0]); this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, 1024, dispatchThreadFactory, new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator); } @@ -83,7 +83,7 @@ private void registerInsertDisruptor() { disruptorMap.put(JobType.INSERT, insertDisruptor); } - public void dispatchTimerJob(AbstractJob job) { + public void dispatchTimerJob(AbstractJob job) { dispatchDisruptor.publishEvent(job); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 9b35d9e5f0dcdf..f7fdd943767a85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -22,6 +22,7 @@ import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.TaskType; import org.apache.doris.job.disruptor.TaskDisruptor; import org.apache.doris.job.executor.TimerJobSchedulerTask; import org.apache.doris.job.manager.TaskDisruptorGroupManager; @@ -51,7 +52,7 @@ public class JobScheduler> implements Closeable { private long latestBatchSchedulerTimerTaskTimeMs = 0L; - private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600; + private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 60; private final Map jobMap; @@ -71,7 +72,7 @@ public void start() { taskDisruptorGroupManager = new TaskDisruptorGroupManager(); taskDisruptorGroupManager.init(); this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor(); - latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis() + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; + latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); batchSchedulerTimerJob(); cycleSystemSchedulerTasks(); } @@ -130,7 +131,7 @@ private void cycleTimerJobScheduler(T job) { private void schedulerImmediateJob(T job) { - List tasks = job.createTasks(); + List tasks = job.createTasks(TaskType.MANUAL); if (CollectionUtils.isEmpty(tasks)) { return; } @@ -143,7 +144,7 @@ private void triggerJob(T job) { if (!job.isReadyForScheduling()) { return; } - List tasks = job.createTasks(); + List tasks = job.createTasks(TaskType.MANUAL); tasks.forEach(task -> taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), job.getJobConfig())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 9283114b9f53d8..7fc666339af895 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -45,32 +45,59 @@ public abstract class AbstractTask implements Task { @Override public void onFail(String msg) { - Env.getCurrentEnv().getJobManager().getJob(jobId).onTaskFail(taskId); + if (!isCallable()) { + return; + } + Env.getCurrentEnv().getJobManager().getJob(jobId).onTaskFail(this); status = TaskStatus.FAILD; } @Override public void onFail() { - Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); - job.onTaskFail(getTaskId()); setFinishTimeMs(System.currentTimeMillis()); + if (!isCallable()) { + return; + } + Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); + job.onTaskFail(this); + } + + private boolean isCallable() { + if (status.equals(TaskStatus.CANCEL)) { + return false; + } + if (null != Env.getCurrentEnv().getJobManager().getJob(jobId)) { + return true; + } + return false; } @Override public void onSuccess() { - Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); - job.onTaskSuccess(getTaskId()); status = TaskStatus.SUCCESS; setFinishTimeMs(System.currentTimeMillis()); + if (!isCallable()) { + return; + } + Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); + if (null == job) { + log.info("job is null, job id is {}", jobId); + return; + } + job.onTaskSuccess(this); } @Override public void cancel() { - Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); - job.onTaskCancel(getTaskId()); status = TaskStatus.CANCEL; } + @Override + public void before() { + status = TaskStatus.RUNNING; + setStartTimeMs(System.currentTimeMillis()); + } + public void runTask() { try { before(); @@ -82,5 +109,8 @@ public void runTask() { } } + public boolean isCancelled() { + return status.equals(TaskStatus.CANCEL); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 29734fe4db2018..2013216742b655 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -121,6 +121,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.ProfileManager; +import org.apache.doris.job.common.JobStatus; import org.apache.doris.load.sync.SyncJobManager; import org.apache.doris.persist.CleanQueryStatsInfo; import org.apache.doris.statistics.StatisticsRepository; @@ -188,11 +189,11 @@ public static void execute(Env env, DdlStmt ddlStmt) throws Exception { StopJobStmt stmt = (StopJobStmt) ddlStmt; env.getJobManager().unregisterJob(stmt.getDbFullName(), stmt.getName()); } else if (ddlStmt instanceof PauseJobStmt) { - //PauseJobStmt stmt = (PauseJobStmt) ddlStmt; - //env.getJobManager().pauseJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL); + PauseJobStmt stmt = (PauseJobStmt) ddlStmt; + env.getJobManager().alterJobStatus(stmt.getDbFullName(), stmt.getName(), JobStatus.PAUSED); } else if (ddlStmt instanceof ResumeJobStmt) { - //ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt; - //env.getJobRegister().resumeJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL); + ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt; + env.getJobManager().alterJobStatus(stmt.getDbFullName(), stmt.getName(), JobStatus.RUNNING); } else if (ddlStmt instanceof CreateUserStmt) { CreateUserStmt stmt = (CreateUserStmt) ddlStmt; env.getAuth().createUser(stmt); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index bedb5174671d68..1ddd91083f1495 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -425,7 +425,7 @@ public ShowResultSet execute() throws AnalysisException { } else if (stmt instanceof ShowAnalyzeTaskStatus) { handleShowAnalyzeTaskStatus(); } else if (stmt instanceof ShowJobStmt) { - //handleShowJob(); + handleShowJob(); } else if (stmt instanceof ShowJobTaskStmt) { //handleShowJobTask(); } else if (stmt instanceof ShowConvertLSCStmt) { @@ -1439,19 +1439,13 @@ private void handleShowLoadWarningsFromURL(ShowLoadWarningsStmt showWarningsStmt resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows); }*/ - /*private void handleShowJob() throws AnalysisException { + private void handleShowJob() throws AnalysisException { ShowJobStmt showJobStmt = (ShowJobStmt) stmt; List> rows = Lists.newArrayList(); // if job exists - List jobList; - PatternMatcher matcher = null; - if (showJobStmt.getPattern() != null) { - matcher = PatternMatcherWrapper.createMysqlPattern(showJobStmt.getPattern(), - CaseSensibility.JOB.getCaseSensibility()); - } - jobList = Env.getCurrentEnv().getJobRegister() - .getJobs(showJobStmt.getDbFullName(), showJobStmt.getName(), showJobStmt.getJobCategory(), - matcher); + List jobList; + jobList = Env.getCurrentEnv().getJobManager() + .queryJobs(showJobStmt.getDbFullName(), showJobStmt.getName()); if (jobList.isEmpty()) { resultSet = new ShowResultSet(showJobStmt.getMetaData(), rows); @@ -1459,18 +1453,12 @@ private void handleShowLoadWarningsFromURL(ShowLoadWarningsStmt showWarningsStmt } // check auth - for (Job job : jobList) { - if (!Env.getCurrentEnv().getAccessManager() - .checkDbPriv(ConnectContext.get(), job.getDbName(), PrivPredicate.SHOW)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, - ConnectContext.get().getQualifiedUser(), job.getDbName()); - } - } - for (Job job : jobList) { - rows.add(job.getShowInfo()); + + for (org.apache.doris.job.base.AbstractJob job : jobList) { + rows.add(job.getCommonShowInfo()); } resultSet = new ShowResultSet(showJobStmt.getMetaData(), rows); - }*/ + } private void handleShowRoutineLoad() throws AnalysisException { ShowRoutineLoadStmt showRoutineLoadStmt = (ShowRoutineLoadStmt) stmt;