Skip to content

Commit

Permalink
[Fix](Job)Incorrect task query result of insert type
Browse files Browse the repository at this point in the history
IdToTask is not persisted, so queried tasks will be lost once the process is restarted.

Canceling a task does not update the metadata after the task is removed from the running-task list.

The TVF displays an error when some fields in the queried task result are empty.
  • Loading branch information
CalvinKirs committed Jan 16, 2024
1 parent 6d8567c commit cc7cbc0
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void cancelTaskById(long taskId) throws JobException {
throw new JobException("no running task");
}
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("no task id: " + taskId)).cancel();
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel();
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
Expand Down Expand Up @@ -289,27 +289,32 @@ public void logUpdateOperation() {

@Override
public void onTaskFail(T task) throws JobException {
updateJobStatusIfEnd();
updateJobStatusIfEnd(false);
runningTasks.remove(task);
}

@Override
public void onTaskSuccess(T task) throws JobException {
updateJobStatusIfEnd();
updateJobStatusIfEnd(true);
runningTasks.remove(task);

}


private void updateJobStatusIfEnd() throws JobException {
private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException {
JobExecuteType executeType = getJobConfig().getExecuteType();
if (executeType.equals(JobExecuteType.MANUAL)) {
return;
}
switch (executeType) {
case ONE_TIME:
case INSTANT:
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
this.finishTimeMs = System.currentTimeMillis();
if (taskSuccess) {
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
} else {
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.STOPPED);
}
break;
case RECURRING:
TimerDefinition timerDefinition = getJobConfig().getTimerDefinition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
Expand All @@ -42,6 +43,7 @@
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
Expand Down Expand Up @@ -244,9 +246,11 @@ public InsertJob(ConnectContext ctx,

@Override
public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
List<InsertTask> newTasks = new ArrayList<>();
if (plans.isEmpty()) {
InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
idToTasks.put(task.getTaskId(), task);
newTasks.add(task);
recordTask(task.getTaskId());
} else {
// use for load stmt
Expand All @@ -256,11 +260,12 @@ public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskC
}
InsertTask task = new InsertTask(logicalPlan, ctx, stmtExecutor, loadStatistic);
idToTasks.put(task.getTaskId(), task);
newTasks.add(task);
recordTask(task.getTaskId());
}
}
initTasks(idToTasks.values(), taskType);
return new ArrayList<>(idToTasks.values());
initTasks(newTasks, taskType);
return new ArrayList<>(newTasks);
}

public void recordTask(long id) {
Expand All @@ -269,14 +274,15 @@ public void recordTask(long id) {
}
if (CollectionUtils.isEmpty(historyTaskIdList)) {
historyTaskIdList = new ConcurrentLinkedQueue<>();
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
historyTaskIdList.add(id);
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
return;
}
historyTaskIdList.add(id);
if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
historyTaskIdList.poll();
}
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}

@Override
Expand Down Expand Up @@ -317,22 +323,44 @@ public List<InsertTask> queryTasks() {
}
//TODO it's will be refactor, we will storage task info in job inner and query from it
List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
Collections.reverse(taskIdList);
return queryLoadTasksByTaskIds(taskIdList);
}
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
if (CollectionUtils.isEmpty(loadJobs)) {
return new ArrayList<>();
}
List<InsertTask> tasks = new ArrayList<>();
loadJobs.forEach(loadJob -> {
InsertTask task;
try {
task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser());
task.setCreateTimeMs(loadJob.getCreateTimestamp());
} catch (MetaNotFoundException e) {
log.warn("load job not found, job id is {}", loadJob.getId());
return;
}
task.setJobId(getJobId());
task.setTaskId(loadJob.getId());
task.setJobInfo(loadJob);
tasks.add(task);
});
return tasks;

Collections.reverse(taskIdList);
return queryLoadTasksByTaskIds(taskIdList);
}

public List<InsertTask> queryLoadTasksByTaskIds(List<Long> taskIdList) {
if (taskIdList.isEmpty()) {
return new ArrayList<>();
}
List<InsertTask> jobs = new ArrayList<>();
List<InsertTask> tasks = new ArrayList<>();
taskIdList.forEach(id -> {
if (null != idToTasks.get(id)) {
jobs.add(idToTasks.get(id));
tasks.add(idToTasks.get(id));
}
});
return jobs;
return tasks;
}

@Override
Expand All @@ -351,14 +379,11 @@ public ShowResultSetMetaData getTaskMetaData() {
}

@Override
public void onTaskFail(InsertTask task) {
try {
updateJobStatus(JobStatus.STOPPED);
public void onTaskFail(InsertTask task) throws JobException {
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg());
} catch (JobException e) {
throw new RuntimeException(e);
}
getRunningTasks().remove(task);
super.onTaskFail(task);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
Expand Down Expand Up @@ -89,7 +89,7 @@ public class InsertTask extends AbstractTask {

@Getter
@Setter
private InsertJob jobInfo;
private LoadJob jobInfo;
private TaskType taskType = TaskType.PENDING;
private MergeType mergeType = MergeType.APPEND;

Expand Down Expand Up @@ -127,7 +127,7 @@ public InsertTask(String labelName, String currentDb, String sql, UserIdentity u
}

public InsertTask(String labelName, InsertIntoTableCommand insertInto,
ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
this.labelName = labelName;
this.command = insertInto;
this.userIdentity = ctx.getCurrentUserIdentity();
Expand Down Expand Up @@ -216,23 +216,27 @@ public TRow getTvfInfo() {
// if task not start, load job is null,return pending task show info
return getPendingTaskTVFInfo();
}
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
trow.addToColumnValue(new TCell().setStringVal(labelName));
trow.addToColumnValue(new TCell().setStringVal(jobInfo.getJobStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name()));
// err msg
String errMsg = FeConstants.null_string;
String errMsg = "";
if (failMsg != null) {
errMsg = "type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg();
}
trow.addToColumnValue(new TCell().setStringVal(errMsg));
// create time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp())));
// load end time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp())));
// tracking url
trow.addToColumnValue(new TCell().setStringVal(trackingUrl));
trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson()));
if (null != loadStatistic) {
trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson()));
} else {
trow.addToColumnValue(new TCell().setStringVal(""));
}
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
return trow;
}
Expand All @@ -244,11 +248,11 @@ private TRow getPendingTaskTVFInfo() {
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
return trow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public void cancelTaskById(String jobName, Long taskId) throws JobException {
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
job.cancelTaskById(taskId);
job.logUpdateOperation();
return;
}
}
Expand Down Expand Up @@ -378,6 +379,7 @@ private static boolean validState(JobState jobState, InsertJob loadJob) {
}
}

//todo it's not belong to JobManager
public void cancelLoadJob(CancelLoadStmt cs)
throws JobException, AnalysisException, DdlException {
String dbName = cs.getDbName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() {
}
for (Map.Entry<Long, T> entry : jobMap.entrySet()) {
T job = entry.getValue();
if (job.getJobStatus().equals(JobStatus.FINISHED)) {
clearFinishedJob(job);
if (job.getJobStatus().equals(JobStatus.FINISHED) || job.getJobStatus().equals(JobStatus.STOPPED)) {
clearEndJob(job);
continue;
}
if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) {
Expand All @@ -195,7 +195,7 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() {
}
}

private void clearFinishedJob(T job) {
private void clearEndJob(T job) {
if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < System.currentTimeMillis()) {
return;
}
Expand Down
24 changes: 16 additions & 8 deletions regression-test/suites/job_p0/test_base_insert_job.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,18 @@ suite("test_base_insert_job") {
CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
Thread.sleep(2500)
def jobs = sql """select * from ${tableName}"""
println jobs
assert 3 >= jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 records
sql """
PAUSE JOB where jobname = '${jobName}'
"""
def tblDatas = sql """select * from ${tableName}"""
println tblDatas
assert 3 >= tblDatas.size() >= (2 as Boolean) //at least 2 records, some times 3 records
def pauseJobId = sql """select id from jobs("type"="insert") where Name='${jobName}'"""
def taskStatus = sql """select status from tasks("type"="insert") where jobid= '${pauseJobId.get(0).get(0)}'"""
println taskStatus
for (int i = 0; i < taskStatus.size(); i++) {
assert taskStatus.get(i).get(0) != "FAILED"||taskStatus.get(i).get(0) != "STOPPED"||taskStatus.get(i).get(0) != "STOPPED"
}
sql """
CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
Expand Down Expand Up @@ -132,9 +141,8 @@ suite("test_base_insert_job") {
sql """cancel task where jobName='${jobName}' and taskId= ${taskId}"""
def cancelTask = sql """ select status from tasks("type"="insert") where jobid= ${onceJobId}"""
println cancelTask
//check task status
assert cancelTask.size() == 1
assert cancelTask.get(0).get(0) == "CANCELED"
//check task size is 0, cancel task where be deleted
assert cancelTask.size() == 0
// check table data
def dataCount1 = sql """select count(1) from ${tableName}"""
assert dataCount1.get(0).get(0) == 0
Expand All @@ -161,14 +169,14 @@ suite("test_base_insert_job") {
assert job.size() == 1
def jobId = job.get(0).get(0);
def tasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """
assert tasks.size() == 1
assert tasks.size() == 0
sql """
RESUME JOB where jobname = '${jobName}'
"""
Thread.sleep(2500)
def resumeTasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """
println resumeTasks
assert resumeTasks.size() == 2
assert resumeTasks.size() == 1
// assert same job name
try {
sql """
Expand Down

0 comments on commit cc7cbc0

Please sign in to comment.