Skip to content

Commit

Permalink
resolve some review's comment
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Nov 14, 2023
1 parent 40cd59a commit 7b16240
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 100 deletions.
38 changes: 37 additions & 1 deletion fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1570,10 +1570,46 @@ public class Config extends ConfigBase {

@ConfField
public static boolean enable_pipeline_load = false;
/*---------------------- JOB CONFIG START------------------------*/
/**
* The number of threads used to dispatch timer jobs.
* If we have a lot of timer jobs, we need more threads to dispatch them.
* All timer jobs are dispatched to a thread pool, and from there to the thread queue of the
* corresponding job type.
* The value should be greater than 0; if it is <= 0, it is set to 5.
*/
@ConfField
public static int job_dispatch_timer_job_thread_num = 5;

/**
* The number of timer jobs that can be queued.
* If the consumer is slow, the queue will fill up and the producer will be blocked.
* If you have a lot of timer jobs, you need to increase this value or increase
* {@code job_dispatch_timer_job_thread_num}.
* The value should be greater than 0; if it is <= 0, it is set to 1024.
*/
@ConfField
public static int job_dispatch_timer_job_queue_size = 1024;

/**
* The number of threads used to consume insert tasks.
* If you have a lot of insert jobs and their average execution frequency is relatively high, you need to
* increase this value or increase {@code job_insert_task_queue_size}.
* The value should be greater than 0; if it is <= 0, it is set to 5.
*/
@ConfField
public static int scheduler_job_task_max_saved_count = 20;
public static int job_insert_task_consumer_thread_num = 5;

/**
* The number of insert tasks that can be queued.
* If the consumer is slow, the queue will fill up and the producer will be blocked.
* If you have a lot of insert tasks, you need to increase this value or increase
* {@code job_insert_task_consumer_thread_num}.
* The value should be greater than 0; if it is <= 0, it is set to 1024.
*/
@ConfField
public static int job_insert_task_queue_size = 1024;
/*---------------------- JOB CONFIG END------------------------*/
/**
* The number of async tasks that can be queued. @See TaskDisruptor
* if consumer is slow, the queue will be full, and the producer will be blocked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void analyze(Analyzer analyzer) throws UserException {
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
analyzerSqlStmt();
// check that it is an insert stmt; currently only insert stmt is supported
// todo: use InsertIntoCommand if job is InsertJob
InsertJob job = new InsertJob();
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);
Expand Down Expand Up @@ -141,8 +142,7 @@ public void analyze(Analyzer analyzer) throws UserException {
job.setComment(comment);
job.setCurrentDbName(labelName.getDbName());
job.setJobName(labelName.getLabelName());
job.setComment(comment);
job.setCreateUser(ConnectContext.get().getQualifiedUser());
job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
job.setJobStatus(JobStatus.RUNNING);
job.checkJobParams();
String originStmt = getOrigStmt().originStmt;
Expand Down
27 changes: 16 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.job.base;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
Expand All @@ -41,31 +42,31 @@
@Data
public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Writable {

@SerializedName(value = "jobId")
@SerializedName(value = "jid")
private Long jobId;

@SerializedName(value = "jobName")
@SerializedName(value = "jn")
private String jobName;

@SerializedName(value = "jobStatus")
@SerializedName(value = "js")
private JobStatus jobStatus;

@SerializedName(value = "currentDbName")
@SerializedName(value = "cdb")
private String currentDbName;

@SerializedName(value = "comment")
@SerializedName(value = "c")
private String comment;

@SerializedName(value = "jobType")
private String createUser;
@SerializedName(value = "cu")
private UserIdentity createUser;

@SerializedName(value = "jobConfig")
@SerializedName(value = "jc")
private JobExecutionConfiguration jobConfig;

@SerializedName(value = "createTimeMs")
@SerializedName(value = "ctms")
private Long createTimeMs;

@SerializedName(value = "executeSql")
@SerializedName(value = "sql")
String executeSql;

private List<T> runningTasks = new ArrayList<>();
Expand Down Expand Up @@ -98,6 +99,9 @@ public void initTasks(List<T> tasks) {
}

public void checkJobParams() {
if (null == jobId) {
throw new IllegalArgumentException("jobId cannot be null");
}
if (null == jobConfig) {
throw new IllegalArgumentException("jobConfig cannot be null");
}
Expand Down Expand Up @@ -128,6 +132,7 @@ public void updateJobStatus(JobStatus newJobStatus) {
protected abstract void checkJobParamsInternal();

public static AbstractJob readFields(DataInput in) throws IOException {
// todo use RuntimeTypeAdapterFactory of Gson to do the serde
JobType jobType = JobType.valueOf(Text.readString(in));
switch (jobType) {
case INSERT:
Expand Down Expand Up @@ -188,7 +193,7 @@ public List<String> getCommonShowInfo() {
List<String> commonShowInfo = new ArrayList<>();
commonShowInfo.add(String.valueOf(jobId));
commonShowInfo.add(jobName);
commonShowInfo.add(createUser);
commonShowInfo.add(createUser.getQualifiedUser());
commonShowInfo.add(jobConfig.getExecuteType().name());
commonShowInfo.add(jobConfig.convertRecurringStrategyToString());
commonShowInfo.add(jobStatus.name());
Expand Down
75 changes: 46 additions & 29 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,78 +26,95 @@
import java.util.List;

/**
* Job is the core of the scheduler module, which is used to store the Job information of the job module.
* We can use the job to uniquely identify a Job.
* The jobName is used to identify the job, which is not unique.
* The jobStatus is used to identify the status of the Job, which is used to control the execution of the
* job.
* The Job interface represents a job in the scheduler module, which stores the information of a job.
* A job can be uniquely identified using the job identifier.
* The job name is used for identification purposes and is not necessarily unique.
* The job status is used to control the execution of the job.
*
* @param <T> The type of task associated with the job, extending AbstractTask.
*/
public interface Job<T extends AbstractTask> {

/**
* Creates a list of tasks of the specified type for this job.
*
* @param taskType The type of tasks to create.
* @return A list of tasks.
*/
List<T> createTasks(TaskType taskType);

/**
* cancel the task by taskId
* Cancels the task with the specified taskId.
*
* @param taskId taskId
* @throws JobException if the task is not in the running state,it's maybe finish,
* it cannot be cancelled,and throw JobException
* @param taskId The ID of the task to cancel.
* @throws JobException If the task is not in the running state, it may have already
* finished and cannot be cancelled.
*/
void cancel(long taskId) throws JobException;

/**
* when start the schedule job, we will call this method
* if the job is not ready for scheduling, we will cancel this one scheduler
* Checks if the job is ready for scheduling.
* This method is called when starting the scheduled job,
* and if the job is not ready for scheduling, the scheduler will cancel it.
*
* @return True if the job is ready for scheduling, false otherwise.
*/
boolean isReadyForScheduling();


/**
* get the job's metadata title, which is used to show the job information
* @return ShowResultSetMetaData job metadata
* Retrieves the metadata for the job, which is used to display job information.
*
* @return The metadata for the job.
*/
ShowResultSetMetaData getJobMetaData();

/**
* get the task metadata title, which is used to show the task information
* eg: taskId, taskStatus, taskType, taskStartTime, taskEndTime, taskProgress
* @return ShowResultSetMetaData task metadata
* Retrieves the metadata for the tasks, which is used to display task information.
* The metadata includes fields such as taskId, taskStatus, taskType, taskStartTime, taskEndTime, and taskProgress.
*
* @return The metadata for the tasks.
*/
ShowResultSetMetaData getTaskMetaData();

/**
* JobType is used to identify the type of the job, which is used to distinguish the different types of jobs.
* @return JobType
* Retrieves the type of the job, which is used to identify different types of jobs.
*
* @return The type of the job.
*/
JobType getJobType();

/**
* Query the task list of this job
* Queries the list of tasks associated with this job.
*
* @return The list of tasks.
*/
List<T> queryTasks();

/**
* cancel this job's all running task
* @throws JobException if running task cancel failed, throw JobException
* Cancels all running tasks of this job.
*
* @throws JobException If cancelling a running task fails.
*/
void cancel() throws JobException;

/**
* When the task executed result is failed, the task will call this method to notify the job
* @param task task
* Notifies the job when a task execution fails.
*
* @param task The failed task.
*/
void onTaskFail(T task);

/**
* When the task executed is success, the task will call this method to notify the job
* @param task task
* Notifies the job when a task execution is successful.
*
* @param task The successful task.
*/
void onTaskSuccess(T task);

/**
* When the task executed is cancel, the task will call this method to notify the job
* @param task task
* Notifies the job when a task execution is cancelled.
*
* @param task The cancelled task.
*/
void onTaskCancel(T task);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,28 @@
import org.apache.doris.common.util.TimeUtils;

import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.List;


@Data
public class JobExecutionConfiguration {

@SerializedName(value = "timerDefinition")
@Getter
@Setter
@SerializedName(value = "td")
private TimerDefinition timerDefinition;
@SerializedName(value = "executeType")
@Getter
@Setter
@SerializedName(value = "ec")
private JobExecuteType executeType;

/**
* Maximum number of concurrent tasks; a value <= 0 means no limit.
* If the number of tasks exceeds the limit, execution of the task will be delayed.
* todo: implement this later, we need to consider concurrency strategies
*/
@SerializedName(value = "maxConcurrentTaskNum")
private Integer maxConcurrentTaskNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
@Data
public class TimerDefinition {

@SerializedName(value = "interval")
@SerializedName(value = "il")
private Long interval;

@SerializedName(value = "intervalUnit")
@SerializedName(value = "iu")
private IntervalUnit intervalUnit;
@SerializedName(value = "startTimeMs")
@SerializedName(value = "stm")
private Long startTimeMs;
@SerializedName(value = "endTimeMs")
@SerializedName(value = "etm")
private Long endTimeMs;

private Long latestSchedulerTimeMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@

public enum TaskType {

SCHEDULER,
SCHEDULED,
MANUAL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
package org.apache.doris.job.executor;

import org.apache.doris.job.disruptor.ExecuteTaskEvent;
import org.apache.doris.job.manager.TaskTokenManager;
import org.apache.doris.job.task.AbstractTask;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Semaphore;

/**
* DefaultTaskExecutor is an implementation of the TaskExecutor interface.
* if you need to implement your own TaskExecutor, you could refer to this class. and need to register
Expand All @@ -48,16 +45,14 @@ public void onEvent(ExecuteTaskEvent<T> executeTaskEvent) {
log.info("task is canceled, ignore");
return;
}
if (null == executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum()
|| executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum() <= 0) {
try {
task.runTask();
return;
} catch (Exception e) {
log.warn("execute task error, task id is {}", task.getTaskId(), e);
}
try {
task.runTask();
} catch (Exception e) {
// if task.onFail() throws an exception, we catch it here
log.warn("task before error, task id is {}", task.getTaskId(), e);
}
Semaphore semaphore = null;
// todo: we need to discuss whether to use a semaphore to control the number of concurrent tasks
/* Semaphore semaphore = null;
// get token
try {
int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum();
Expand All @@ -69,7 +64,6 @@ public void onEvent(ExecuteTaskEvent<T> executeTaskEvent) {
} finally {
if (null != semaphore) {
semaphore.release();
}
}
}*/
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void onEvent(TimerJobEvent<T> event) {
return;
}
if (event.getJob().isReadyForScheduling() && event.getJob().getJobStatus() == JobStatus.RUNNING) {
List<? extends AbstractTask> tasks = event.getJob().createTasks(TaskType.SCHEDULER);
List<? extends AbstractTask> tasks = event.getJob().createTasks(TaskType.SCHEDULED);
JobType jobType = event.getJob().getJobType();
for (AbstractTask task : tasks) {
disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

import lombok.extern.slf4j.Slf4j;


/**
* todo implement this later
*/
@Slf4j
public class InsertTask extends AbstractTask {

Expand Down
Loading

0 comments on commit 7b16240

Please sign in to comment.