Skip to content

Commit

Permalink
[Feature](job)support cancel task and fix log invalid
Browse files Browse the repository at this point in the history
Running task can be show and fix cancel fail
  • Loading branch information
CalvinKirs committed Nov 28, 2023
1 parent 4458948 commit 8f3c81b
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -116,7 +115,7 @@ public List<T> queryAllTasks() {
if (CollectionUtils.isEmpty(runningTasks)) {
return queryTasks();
}

List<T> historyTasks = queryTasks();
if (CollectionUtils.isNotEmpty(historyTasks)) {
tasks.addAll(historyTasks);
Expand All @@ -127,7 +126,7 @@ public List<T> queryAllTasks() {
tasks.add(task);
}
});
Comparator<T> taskComparator = Comparator.comparingLong(AbstractTask::getCreateTimeMs);
Comparator<T> taskComparator = Comparator.comparingLong(T::getCreateTimeMs).reversed();
tasks.sort(taskComparator);
return tasks;
}
Expand Down Expand Up @@ -156,7 +155,7 @@ public void checkJobParams() {
checkJobParamsInternal();
}

public void updateJobStatus(JobStatus newJobStatus) {
public void updateJobStatus(JobStatus newJobStatus) throws JobException {
if (null == newJobStatus) {
throw new IllegalArgumentException("jobStatus cannot be null");
}
Expand All @@ -172,6 +171,9 @@ public void updateJobStatus(JobStatus newJobStatus) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), this.jobStatus.name()));
}
if (JobStatus.PAUSED.equals(newJobStatus)) {
cancelAllTasks();
}
jobStatus = newJobStatus;
}

Expand All @@ -186,19 +188,19 @@ public static AbstractJob readFields(DataInput in) throws IOException {
}

@Override
public void onTaskFail(T task) {
public void onTaskFail(T task) throws JobException {
updateJobStatusIfEnd();
runningTasks.remove(task);
}

@Override
public void onTaskSuccess(T task) {
public void onTaskSuccess(T task) throws JobException {
updateJobStatusIfEnd();
runningTasks.remove(task);

}

private void updateJobStatusIfEnd() {
private void updateJobStatusIfEnd() throws JobException {
JobExecuteType executeType = getJobConfig().getExecuteType();
if (executeType.equals(JobExecuteType.MANUAL)) {
return;
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ public interface Job<T extends AbstractTask> {
*
* @param task The failed task.
*/
void onTaskFail(T task);
void onTaskFail(T task) throws JobException;

/**
* Notifies the job when a task execution is successful.
*
* @param task The successful task.
*/
void onTaskSuccess(T task);
void onTaskSuccess(T task) throws JobException;

/**
* get the job's show info, which is used to sql show the job information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.doris.job.executor;

import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
Expand All @@ -28,6 +26,8 @@
import org.apache.doris.job.task.AbstractTask;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;

import java.util.List;
import java.util.Map;
Expand All @@ -50,16 +50,18 @@ public DispatchTaskHandler(Map<JobType, TaskDisruptor<T>> disruptorMap) {
@Override
public void onEvent(TimerJobEvent<T> event) {
try {
log.info("dispatch timer job, job id is {}, job name is {}", event.getJob().getJobId(), event.getJob().getJobName());
log.info("dispatch timer job, job id is {}, job name is {}", event.getJob().getJobId(),
event.getJob().getJobName());
if (null == event.getJob()) {
log.info("job is null,may be job is deleted, ignore");
return;
}
if (event.getJob().isReadyForScheduling() && event.getJob().getJobStatus() == JobStatus.RUNNING) {
List<? extends AbstractTask> tasks = event.getJob().createTasks(TaskType.SCHEDULED);
if(CollectionUtils.isEmpty(tasks)) {
log.warn("job is ready for scheduling, but create task is empty, skip scheduler, job id is {}," +
" job name is {}", event.getJob().getJobId(), event.getJob().getJobName());
if (CollectionUtils.isEmpty(tasks)) {
log.warn("job is ready for scheduling, but create task is empty, skip scheduler,"
+ "job id is {}," + " job name is {}", event.getJob().getJobId(),
event.getJob().getJobName());
return;
}
JobType jobType = event.getJob().getJobType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

package org.apache.doris.job.executor;

import lombok.extern.log4j.Log4j2;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.disruptor.TaskDisruptor;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import jline.internal.Log;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;

@Log4j2
public class TimerJobSchedulerTask<T extends AbstractJob<?>> implements TimerTask {
Expand All @@ -43,7 +41,7 @@ public void run(Timeout timeout) {
try {
dispatchDisruptor.publishEvent(this.job);
} catch (Exception e) {
Log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e);
log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class InsertJob extends AbstractJob<InsertTask> {

@Override
public List<InsertTask> createTasks(TaskType taskType) {
if(CollectionUtils.isNotEmpty(getRunningTasks())){
if (CollectionUtils.isNotEmpty(getRunningTasks())) {
return new ArrayList<>();
}
InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
Expand Down Expand Up @@ -176,7 +176,7 @@ public void onTaskFail(InsertTask task) {
}

@Override
public void onTaskSuccess(InsertTask task) {
public void onTaskSuccess(InsertTask task) throws JobException {
super.onTaskSuccess(task);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public InsertTask(String labelName, String currentDb, String sql, UserIdentity u
@Override
public void run() throws JobException {
try {
if (isCanceled.get()) {
return;
}
command.run(ctx, stmtExecutor);
} catch (Exception e) {
throw new JobException(e);
Expand Down Expand Up @@ -187,11 +190,11 @@ public List<String> getShowInfo() {
jobInfo.add(loadJob.getUserInfo().getQualifiedUser());
return jobInfo;
}

// if task not start, load job is null,return pending task show info
private List<String> getPendingTaskShowInfo(){
private List<String> getPendingTaskShowInfo() {
List<String> datas = new ArrayList<>();

datas.add(String.valueOf(getTaskId()));
datas.add(getJobId() + "_" + getTaskId());
datas.add(getStatus().name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.job.manager;

import lombok.extern.log4j.Log4j2;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
Expand All @@ -30,7 +29,7 @@
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.job.task.AbstractTask;

import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;

import java.io.DataInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import lombok.experimental.UtilityClass;
import lombok.extern.log4j.Log4j2;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.job.scheduler;

import lombok.extern.log4j.Log4j2;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
Expand All @@ -31,7 +30,7 @@
import org.apache.doris.job.task.AbstractTask;

import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;

import java.io.Closeable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.job.task;

import lombok.extern.log4j.Log4j2;
import org.apache.doris.catalog.Env;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.common.TaskStatus;
Expand All @@ -26,7 +25,7 @@

import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;

@Data
@Log4j2
Expand All @@ -50,7 +49,7 @@ public abstract class AbstractTask implements Task {
private TaskType taskType;

@Override
public void onFail(String msg) {
public void onFail(String msg) throws JobException {
status = TaskStatus.FAILD;
if (!isCallable()) {
return;
Expand Down Expand Up @@ -84,6 +83,9 @@ private boolean isCallable() {

@Override
public void onSuccess() throws JobException {
if (TaskStatus.CANCEL.equals(status)) {
return;
}
status = TaskStatus.SUCCESS;
setFinishTimeMs(System.currentTimeMillis());
if (!isCallable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public interface Task {
*
* @param msg The error message associated with the failure.
*/
void onFail(String msg);
void onFail(String msg) throws JobException;

/**
* This method is called when the task executes successfully.
Expand Down

0 comments on commit 8f3c81b

Please sign in to comment.