Skip to content

Commit

Permalink
[Improve](Job)Refactor JOB
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Nov 12, 2023
1 parent 6637f9c commit 05d28e1
Show file tree
Hide file tree
Showing 56 changed files with 1,820 additions and 2,389 deletions.
10 changes: 5 additions & 5 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -2553,17 +2553,17 @@ resource_desc ::=
create_job_stmt ::=
KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_EVERY INTEGER_LITERAL:time_interval ident:time_unit opt_job_starts:startsTime opt_job_ends:endsTime opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new CreateJobStmt(jobLabel,"RECURRING",null,time_interval,time_unit, startsTime, endsTime,comment,executeSql);
CreateJobStmt stmt = new CreateJobStmt(jobLabel,org.apache.doris.job.base.JobExecuteType.RECURRING,null,time_interval,time_unit, startsTime, endsTime,comment,executeSql);
RESULT = stmt;
:}
| KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_STREAMING KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
/* support in future | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_STREAMING KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new CreateJobStmt(jobLabel,"STREAMING",atTime,null,null,null,null,comment,executeSql);
CreateJobStmt stmt = new CreateJobStmt(jobLabel,org.apache.doris.job.base.JobExecuteType.STREAMING,atTime,null,null,null,null,comment,executeSql);
RESULT = stmt;
:}
:} */
| KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new CreateJobStmt(jobLabel,"ONE_TIME",atTime,null,null,null,null,comment,executeSql);
CreateJobStmt stmt = new CreateJobStmt(jobLabel,org.apache.doris.job.base.JobExecuteType.ONE_TIME,atTime,null,null,null,null,comment,executeSql);
RESULT = stmt;
:}
;
Expand Down
116 changes: 36 additions & 80 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.common.IntervalUnit;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.executor.SqlJobExecutor;
import org.apache.doris.scheduler.job.Job;

import com.google.common.collect.ImmutableSet;
import lombok.Getter;
Expand Down Expand Up @@ -66,7 +66,7 @@ public class CreateJobStmt extends DdlStmt {
private StatementBase stmt;

@Getter
private Job job;
private AbstractJob job;

private final LabelName labelName;

Expand All @@ -88,9 +88,9 @@ public class CreateJobStmt extends DdlStmt {
= new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class)
.add(UpdateStmt.class).build();

private static HashSet<String> supportStmtClassNamesCache = new HashSet<>(16);
private static final HashSet<String> supportStmtClassNamesCache = new HashSet<>(16);

public CreateJobStmt(LabelName labelName, String jobTypeName, String onceJobStartTimestamp,
public CreateJobStmt(LabelName labelName, JobExecuteType executeType, String onceJobStartTimestamp,
Long interval, String intervalTimeUnit,
String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) {
this.labelName = labelName;
Expand All @@ -101,9 +101,28 @@ public CreateJobStmt(LabelName labelName, String jobTypeName, String onceJobStar
this.endsTimeStamp = endsTimeStamp;
this.comment = comment;
this.stmt = doStmt;
this.job = new Job();
JobType jobType = JobType.valueOf(jobTypeName.toUpperCase());
job.setJobType(jobType);
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);

TimerDefinition timerDefinition = new TimerDefinition();

if (null != onceJobStartTimestamp) {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
}
if (null != interval) {
timerDefinition.setInterval(interval);
}
if (null != intervalTimeUnit) {
timerDefinition.setIntervalUnit(IntervalUnit.valueOf(intervalTimeUnit.toUpperCase()));
}
if (null != startsTimeStamp) {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
}
if (null != endsTimeStamp) {
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
}
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
job.setComment(comment);
}

private String parseExecuteSql(String sql) throws AnalysisException {
Expand All @@ -123,23 +142,13 @@ public void analyze(Analyzer analyzer) throws UserException {
labelName.analyze(analyzer);
String dbName = labelName.getDbName();
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
job.setDbName(labelName.getDbName());
job.setCurrentDbName(labelName.getDbName());
job.setJobName(labelName.getLabelName());
if (StringUtils.isNotBlank(onceJobStartTimestamp)) {
analyzerOnceTimeJob();
} else {
analyzerCycleJob();
}
if (ConnectContext.get() != null) {
timezone = ConnectContext.get().getSessionVariable().getTimeZone();
}
timezone = TimeUtils.checkTimeZoneValidAndStandardize(timezone);
job.setTimezone(timezone);
job.checkJobParams();
job.setComment(comment);
//todo support user define
job.setUser(ConnectContext.get().getQualifiedUser());
job.setCurrentUser(ConnectContext.get().getQualifiedUser());
job.setJobStatus(JobStatus.RUNNING);
job.setJobCategory(JobCategory.SQL);
analyzerSqlStmt();
}

Expand All @@ -165,61 +174,8 @@ private void checkStmtSupport() throws AnalysisException {
private void analyzerSqlStmt() throws UserException {
checkStmtSupport();
stmt.analyze(analyzer);
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
SqlJobExecutor sqlJobExecutor = new SqlJobExecutor(executeSql);
job.setExecutor(sqlJobExecutor);
//String originStmt = getOrigStmt().originStmt;
//String executeSql = parseExecuteSql(originStmt);
}


private void analyzerCycleJob() throws UserException {
if (null == interval) {
throw new AnalysisException("interval is null");
}
if (interval <= 0) {
throw new AnalysisException("interval must be greater than 0");
}

if (StringUtils.isBlank(intervalTimeUnit)) {
throw new AnalysisException("intervalTimeUnit is null");
}
try {
IntervalUnit intervalUnit = IntervalUnit.valueOf(intervalTimeUnit.toUpperCase());
job.setIntervalUnit(intervalUnit);
long intervalTimeMs = intervalUnit.getParameterValue(interval);
job.setIntervalMs(intervalTimeMs);
job.setOriginInterval(interval);
} catch (IllegalArgumentException e) {
throw new AnalysisException("interval time unit is not valid, we only support second,minute,hour,day,week");
}
if (StringUtils.isNotBlank(startsTimeStamp)) {
long startsTimeMillis = TimeUtils.timeStringToLong(startsTimeStamp);
if (startsTimeMillis < System.currentTimeMillis()) {
throw new AnalysisException("starts time must be greater than current time");
}
job.setStartTimeMs(startsTimeMillis);
}
if (StringUtils.isNotBlank(endsTimeStamp)) {
long endTimeMillis = TimeUtils.timeStringToLong(endsTimeStamp);
if (endTimeMillis < System.currentTimeMillis()) {
throw new AnalysisException("ends time must be greater than current time");
}
job.setEndTimeMs(endTimeMillis);
}
if (job.getStartTimeMs() > 0 && job.getEndTimeMs() > 0
&& (job.getEndTimeMs() - job.getStartTimeMs() < job.getIntervalMs())) {
throw new AnalysisException("ends time must be greater than start time and interval time");
}
}


private void analyzerOnceTimeJob() throws UserException {
job.setIntervalMs(0L);

long executeAtTimeMillis = TimeUtils.timeStringToLong(onceJobStartTimestamp);
if (executeAtTimeMillis < System.currentTimeMillis()) {
throw new AnalysisException("job time stamp must be greater than current time");
}
job.setStartTimeMs(executeAtTimeMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.scheduler.constants.JobCategory;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;

import java.util.List;

Expand Down Expand Up @@ -65,9 +63,6 @@ public class ShowJobStmt extends ShowStmt {
@Getter
private String dbFullName; // optional

@Getter
private JobCategory jobCategory; // optional

private String jobCategoryName; // optional

@Getter
Expand All @@ -86,11 +81,6 @@ public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
checkAuth();
checkLabelName(analyzer);
if (StringUtils.isBlank(jobCategoryName)) {
this.jobCategory = JobCategory.SQL;
} else {
this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase());
}
}

private void checkAuth() throws AnalysisException {
Expand Down Expand Up @@ -122,9 +112,6 @@ public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();

for (String title : TITLE_NAMES) {
if (this.jobCategory.equals(JobCategory.MTMV) && title.equals(NAME_TITLE)) {
builder.addColumn(new Column(MTMV_NAME_TITLE, ScalarType.createVarchar(30)));
}
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
}
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.scheduler.constants.JobCategory;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;

import java.util.List;

Expand All @@ -57,8 +55,6 @@ public class ShowJobTaskStmt extends ShowStmt {
@Getter
private final LabelName labelName;

@Getter
private JobCategory jobCategory; // optional

@Getter
private String dbFullName; // optional
Expand All @@ -67,12 +63,6 @@ public class ShowJobTaskStmt extends ShowStmt {

public ShowJobTaskStmt(String category, LabelName labelName) {
this.labelName = labelName;
String jobCategoryName = category;
if (StringUtils.isBlank(jobCategoryName)) {
this.jobCategory = JobCategory.SQL;
} else {
this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase());
}
}

@Override
Expand Down Expand Up @@ -114,9 +104,6 @@ public ShowResultSetMetaData getMetaData() {

@Override
public RedirectStatus getRedirectStatus() {
if (jobCategory.isPersistent()) {
return RedirectStatus.FORWARD_NO_SYNC;
}
return RedirectStatus.NO_FORWARD;
}
}
Loading

0 comments on commit 05d28e1

Please sign in to comment.