Skip to content

Commit

Permalink
[Improve](Job)Refactor JOB
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Nov 13, 2023
1 parent 47f98c4 commit 4fb7457
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;

Expand Down Expand Up @@ -61,12 +62,11 @@
@Slf4j
public class CreateJobStmt extends DdlStmt {


@Getter
private StatementBase stmt;
private StatementBase doStmt;

@Getter
private AbstractJob job;
private AbstractJob<?> jobInstance;

private final LabelName labelName;

Expand All @@ -81,6 +81,7 @@ public class CreateJobStmt extends DdlStmt {
private final String endsTimeStamp;

private final String comment;
private JobExecuteType executeType;

private String timezone = TimeUtils.DEFAULT_TIME_ZONE;

Expand All @@ -100,10 +101,23 @@ public CreateJobStmt(LabelName labelName, JobExecuteType executeType, String onc
this.startsTimeStamp = startsTimeStamp;
this.endsTimeStamp = endsTimeStamp;
this.comment = comment;
this.stmt = doStmt;
this.doStmt = doStmt;
this.executeType = executeType;
}

@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
checkAuth();
labelName.analyze(analyzer);
String dbName = labelName.getDbName();
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
analyzerSqlStmt();
// check its insert stmt,currently only support insert stmt
InsertJob job = new InsertJob();
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);

job.setCreateTimeMs(System.currentTimeMillis());
TimerDefinition timerDefinition = new TimerDefinition();

if (null != onceJobStartTimestamp) {
Expand All @@ -122,34 +136,19 @@ public CreateJobStmt(LabelName labelName, JobExecuteType executeType, String onc
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
}
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
job.setComment(comment);
}

private String parseExecuteSql(String sql) throws AnalysisException {
sql = sql.toLowerCase();
int executeSqlIndex = sql.indexOf(" do ");
String executeSql = sql.substring(executeSqlIndex + 4).trim();
if (StringUtils.isBlank(executeSql)) {
throw new AnalysisException("execute sql has invalid format");
}
return executeSql;
}
job.setJobConfig(jobExecutionConfiguration);

@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
checkAuth();
labelName.analyze(analyzer);
String dbName = labelName.getDbName();
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
job.setComment(comment);
job.setCurrentDbName(labelName.getDbName());
job.setJobName(labelName.getLabelName());
job.checkJobParams();
job.setComment(comment);
//todo support user define
job.setCurrentUser(ConnectContext.get().getQualifiedUser());
job.setCreateUser(ConnectContext.get().getQualifiedUser());
job.setJobStatus(JobStatus.RUNNING);
analyzerSqlStmt();
job.checkJobParams();
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
job.setExecuteSql(executeSql);
jobInstance = job;
}

protected static void checkAuth() throws AnalysisException {
Expand All @@ -159,12 +158,12 @@ protected static void checkAuth() throws AnalysisException {
}

private void checkStmtSupport() throws AnalysisException {
if (supportStmtClassNamesCache.contains(stmt.getClass().getSimpleName())) {
if (supportStmtClassNamesCache.contains(doStmt.getClass().getSimpleName())) {
return;
}
for (Class<? extends DdlStmt> clazz : supportStmtSuperClass) {
if (clazz.isAssignableFrom(stmt.getClass())) {
supportStmtClassNamesCache.add(stmt.getClass().getSimpleName());
if (clazz.isAssignableFrom(doStmt.getClass())) {
supportStmtClassNamesCache.add(doStmt.getClass().getSimpleName());
return;
}
}
Expand All @@ -173,9 +172,16 @@ private void checkStmtSupport() throws AnalysisException {

private void analyzerSqlStmt() throws UserException {
checkStmtSupport();
stmt.analyze(analyzer);
//String originStmt = getOrigStmt().originStmt;
//String executeSql = parseExecuteSql(originStmt);
doStmt.analyze(analyzer);
}

private String parseExecuteSql(String sql) throws AnalysisException {
sql = sql.toLowerCase();
int executeSqlIndex = sql.indexOf(" do ");
String executeSql = sql.substring(executeSqlIndex + 4).trim();
if (StringUtils.isBlank(executeSql)) {
throw new AnalysisException("execute sql has invalid format");
}
return executeSql;
}
}
66 changes: 52 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,52 @@

package org.apache.doris.job.base;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.job.task.Task;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.Getter;
import org.apache.commons.collections.CollectionUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@Data
public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Writable {

@Getter
@SerializedName(value = "jobId")
private Long jobId;

@SerializedName(value = "jobName")
private String jobName;

@SerializedName(value = "jobStatus")
private JobStatus jobStatus;

@SerializedName(value = "currentDbName")
private String currentDbName;

@SerializedName(value = "comment")
private String comment;

private String currentUser;
@SerializedName(value = "jobType")
private String createUser;

@SerializedName(value = "jobConfig")
private JobExecutionConfiguration jobConfig;

@SerializedName(value = "createTimeMs")
private Long createTimeMs;

private List<? extends AbstractTask> runningTasks = new ArrayList<>();

@Override
Expand All @@ -67,7 +77,7 @@ public void checkJobParams() {
if (null == jobConfig) {
throw new IllegalArgumentException("jobConfig cannot be null");
}
jobConfig.checkParams();
jobConfig.checkParams(createTimeMs);
checkJobParamsInternal();
}

Expand All @@ -92,23 +102,51 @@ public void resumeJob() {

protected abstract void checkJobParamsInternal();

@Override
public void write(DataOutput out) throws IOException {
String jobData = GsonUtils.GSON.toJson(this);
Text.writeString(out, jobData);
}

public static AbstractJob readFields(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), AbstractJob.class);
JobType jobType = JobType.valueOf(Text.readString(in));
switch (jobType) {
case INSERT:
return InsertJob.readFields(in);
case MTMV:
// return MTMVJob.readFields(in);
break;
default:
throw new IllegalArgumentException("unknown job type");
}
throw new IllegalArgumentException("unknown job type");
}

@Override
public void onTaskFail(long taskId) {
// AbstractTask task=runningTasks.stream().findFirst();
updateJobStatusIfEnd();
}

@Override
public void onTaskSuccess(long taskId) {
updateJobStatusIfEnd();

}

private void updateJobStatusIfEnd() {
JobExecuteType executeType = getJobConfig().getExecuteType();
if (executeType.equals(JobExecuteType.MANUAL)) {
return;
}
switch (executeType) {
case ONE_TIME:
case INSTANT:
jobStatus = JobStatus.FINISHED;
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(jobStatus);
break;
case RECURRING:
if (null != getJobConfig().getTimerDefinition().getEndTimeMs()
&& getJobConfig().getTimerDefinition().getEndTimeMs() < System.currentTimeMillis()) {
jobStatus = JobStatus.FINISHED;
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(jobStatus);
}
break;
default:
break;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.common.util.TimeUtils;

import com.google.gson.annotations.SerializedName;
import lombok.Data;

import java.util.ArrayList;
Expand All @@ -28,16 +29,19 @@
@Data
public class JobExecutionConfiguration {

@SerializedName(value = "timerDefinition")
private TimerDefinition timerDefinition;
@SerializedName(value = "executeType")
private JobExecuteType executeType;

/**
* Maximum number of concurrent tasks, <= 0 means no limit
* if the number of tasks exceeds the limit, the task will be delayed execution
*/
private Integer maxConcurrentTaskNum = -1;
@SerializedName(value = "maxConcurrentTaskNum")
private Integer maxConcurrentTaskNum;

public void checkParams() {
public void checkParams(Long createTimeMs) {
if (executeType == null) {
throw new IllegalArgumentException("executeType cannot be null");
}
Expand All @@ -46,7 +50,7 @@ public void checkParams() {
return;
}

checkTimerDefinition();
checkTimerDefinition(createTimeMs);

if (executeType == JobExecuteType.ONE_TIME) {
validateStartTimeMs();
Expand All @@ -65,22 +69,15 @@ public void checkParams() {
if (timerDefinition.getIntervalUnit() == null) {
throw new IllegalArgumentException("intervalUnit cannot be null when executeType is RECURRING");
}
validateStartTimeMs();
if (timerDefinition.getEndTimeMs() != null && timerDefinition.getEndTimeMs() < System.currentTimeMillis()) {
throw new IllegalArgumentException("endTimeMs cannot be less than current time");
}
if (timerDefinition.getStartTimeMs() != null && timerDefinition.getEndTimeMs() != null
&& timerDefinition.getStartTimeMs() > timerDefinition.getEndTimeMs()) {
throw new IllegalArgumentException("startTimeMs cannot be greater than endTimeMs");
}
}
}

private void checkTimerDefinition() {
private void checkTimerDefinition(long createTimeMs) {
if (timerDefinition == null) {
throw new IllegalArgumentException(
"timerDefinition cannot be null when executeType is not instant or manual");
}
timerDefinition.checkParams(createTimeMs);
}

private void validateStartTimeMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,32 @@

import org.apache.doris.job.common.IntervalUnit;

import com.google.gson.annotations.SerializedName;
import lombok.Data;

@Data
public class TimerDefinition {

@SerializedName(value = "interval")
private Long interval;

@SerializedName(value = "intervalUnit")
private IntervalUnit intervalUnit;

@SerializedName(value = "startTimeMs")
private Long startTimeMs;

@SerializedName(value = "endTimeMs")
private Long endTimeMs;

private Long createTimeMs;

private Long latestSchedulerTimeMs;


public void checkParams() {
public void checkParams(Long createTimeMs) {
if (null != startTimeMs && startTimeMs < System.currentTimeMillis()) {
throw new IllegalArgumentException("startTimeMs must be greater than current time");
}
if (null == startTimeMs) {
startTimeMs = createTimeMs + intervalUnit.getIntervalMs(interval);
}
if (startTimeMs < System.currentTimeMillis()) {
throw new IllegalArgumentException("startTimeMs must be greater than current time");
}
if (null != endTimeMs && endTimeMs < startTimeMs) {
throw new IllegalArgumentException("end time cannot be less than start time");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,20 @@ public class DefaultTaskExecutorHandler<T extends AbstractTask> implements WorkH
public void onEvent(ExecuteTaskEvent<T> executeTaskEvent) {
T task = executeTaskEvent.getTask();
if (null == task) {
log.info("task is null, ignore");
return;
}
int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum();
if (maxConcurrentTaskNum <= 0) {
task.runTask();
log.warn("task is null, ignore,maybe task has been canceled");
return;
}
if (null == executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum()
|| executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum() <= 0) {
try {
task.runTask();
return;
} catch (Exception e) {
log.warn("execute task error, task id is {}", task.getTaskId(), e);

}
}
int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum();
Semaphore semaphore = null;
// get token
try {
Expand Down
Loading

0 comments on commit 4fb7457

Please sign in to comment.