Skip to content

Commit

Permalink
[Fix](Job)cancel task is not cleared from running task
Browse files Browse the repository at this point in the history
Insert task is not work
  • Loading branch information
CalvinKirs committed Dec 27, 2023
1 parent d672e32 commit 0d05f80
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
@SerializedName(value = "sql")
String executeSql;

public AbstractJob() {}
public AbstractJob() {
}

public AbstractJob(Long id) {
jobId = id;
Expand All @@ -99,10 +100,10 @@ public AbstractJob(Long id) {
* executeSql and runningTasks is not required for load.
*/
public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, System.currentTimeMillis(), null, null);
}
Expand Down Expand Up @@ -137,6 +138,7 @@ public void cancelAllTasks() throws JobException {
for (T task : runningTasks) {
task.cancel();
}
runningTasks = new ArrayList<>();
}

private static final ImmutableList<String> TITLE_NAMES =
Expand All @@ -163,6 +165,7 @@ public void cancelTaskById(long taskId) throws JobException {
}
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("no task id: " + taskId)).cancel();
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
}
Expand Down Expand Up @@ -364,10 +367,12 @@ public ShowResultSetMetaData getJobMetaData() {
}

@Override
public void onRegister() throws JobException {}
public void onRegister() throws JobException {
}

@Override
public void onUnRegister() throws JobException {}
public void onUnRegister() throws JobException {
}

@Override
public void onReplayCreate() throws JobException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,13 @@ public InsertJob(String jobName,
}

public InsertJob(ConnectContext ctx,
StmtExecutor executor,
String labelName,
List<InsertIntoTableCommand> plans,
Set<String> sinkTableNames,
Map<String, String> properties,
String comment,
JobExecutionConfiguration jobConfig) {
StmtExecutor executor,
String labelName,
List<InsertIntoTableCommand> plans,
Set<String> sinkTableNames,
Map<String, String> properties,
String comment,
JobExecutionConfiguration jobConfig) {
super(getNextJobId(), labelName, JobStatus.RUNNING, null,
comment, ctx.getCurrentUserIdentity(), jobConfig);
this.ctx = ctx;
Expand All @@ -219,6 +219,12 @@ public InsertJob(ConnectContext ctx,

@Override
public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
if (null == plans) {
plans = new ArrayList<>();
}
if (null == idToTasks) {
idToTasks = new ConcurrentHashMap<>();
}
if (plans.isEmpty()) {
InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
idToTasks.put(task.getTaskId(), task);
Expand Down Expand Up @@ -462,9 +468,16 @@ public long getTimeout() {


public static InsertJob readFields(DataInput in) throws IOException {
//fix me : some field is not set and this method is not used
String jsonJob = Text.readString(in);
InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class);
job.setRunningTasks(new ArrayList<>());
if (null == job.plans) {
job.plans = new ArrayList<>();
}
if (null == job.idToTasks) {
job.idToTasks = new ConcurrentHashMap<>();
}
return job;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public List<? extends AbstractTask> queryTasks(Long jobId) throws JobException {
}

public void triggerJob(long jobId, C context) throws JobException {
log.info("trigger job, job id is {}", jobId);
checkJobExist(jobId);
jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,19 @@ private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
public void schedulerInstantJob(T job, TaskType taskType, C context) {
List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType, context);
if (CollectionUtils.isEmpty(tasks)) {
log.info("job create task is empty, skip scheduler, job id is {},job name is {}", job.getJobId(),
log.info("job create task is empty, skip scheduler, job id is {}, job name is {}", job.getJobId(),
job.getJobName());
if (job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
job.setJobStatus(JobStatus.FINISHED);
}
return;
}
tasks.forEach(task -> taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig()));
tasks.forEach(task -> {
taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig());
log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),
job.getJobName(), task.getTaskId());
});

}

Expand Down
72 changes: 50 additions & 22 deletions regression-test/suites/job_p0/test_base_insert_job.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ suite("test_base_insert_job") {
DROP JOB where jobname = '${jobMixedName}'
"""

sql """
DROP JOB where jobname = '${jobName}'
"""

sql """
CREATE TABLE IF NOT EXISTS `${tableName}`
(
Expand All @@ -69,7 +73,7 @@ suite("test_base_insert_job") {
Thread.sleep(2500)
def jobs = sql """select * from ${tableName}"""
println jobs
assert 3>=jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 records
assert 3 >= jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 records
sql """
CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
Expand All @@ -83,7 +87,7 @@ suite("test_base_insert_job") {
sql """
DROP JOB where jobname = '${jobMixedName}'
"""

sql """drop table if exists `${tableName}` force """
sql """
CREATE TABLE IF NOT EXISTS `${tableName}`
Expand All @@ -99,11 +103,11 @@ suite("test_base_insert_job") {
);
"""
// Enlarge this parameter to avoid other factors that cause time verification to fail when submitting.
def currentMs=System.currentTimeMillis()+20000;
def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault());
def currentMs = System.currentTimeMillis() + 20000;
def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault());

def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
def startTime= dateTime.format(formatter);
def startTime = dateTime.format(formatter);
def dataCount = sql """select count(*) from ${tableName}"""
assert dataCount.get(0).get(0) == 0
sql """
Expand All @@ -113,8 +117,8 @@ suite("test_base_insert_job") {
Thread.sleep(25000)
def onceJob = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
assert onceJob.size() == 1
def onceJobId= onceJob.get(0).get(0);
def onceJobSql= onceJob.get(0).get(1);
def onceJobId = onceJob.get(0).get(0);
def onceJobSql = onceJob.get(0).get(1);
println onceJobSql
def assertSql = "insert into ${tableName} values (\'2023-07-19\', sleep(10000), 1001);"
println assertSql
Expand All @@ -126,7 +130,7 @@ suite("test_base_insert_job") {
assert datas.get(0).get(0) == "RUNNING"
def taskId = datas.get(0).get(1)
sql """cancel task where jobName='${jobName}' and taskId= ${taskId}"""
def cancelTask = sql """ select status from tasks("type"="insert") where jobid= ${onceJobId}"""
def cancelTask = sql """ select status from tasks("type"="insert") where jobid= ${onceJobId}"""
println cancelTask
//check task status
assert cancelTask.size() == 1
Expand All @@ -135,71 +139,95 @@ suite("test_base_insert_job") {
def dataCount1 = sql """select count(1) from ${tableName}"""
assert dataCount1.get(0).get(0) == 0
// check job status
def oncejob=sql """select status,comment from jobs("type"="insert") where Name='${jobName}' """
def oncejob = sql """select status,comment from jobs("type"="insert") where Name='${jobName}' """
println oncejob
assert oncejob.get(0).get(0) == "FINISHED"
//assert comment
assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa"

sql """
DROP JOB where jobname = '${jobName}'
"""

sql """
CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(10000), 1001);
"""

Thread.sleep(2500)

sql """
PAUSE JOB where jobname = '${jobName}'
"""
def job = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' """
assert job.size() == 1
def jobId = job.get(0).get(0);
def tasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """
assert tasks.size() == 1
sql """
RESUME JOB where jobname = '${jobName}'
"""
Thread.sleep(2500)
def resumeTasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """
println resumeTasks
assert resumeTasks.size() == 2
// assert same job name
try {
sql """
CREATE JOB ${jobName} ON SCHEDULE EVERY 10 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(10000), 1001);
"""
}catch (Exception e) {
} catch (Exception e) {
assert e.getMessage().contains("job name exist, jobName:insert_recovery_test_base_insert_job")
}
def errorTblName="${tableName}qwertyuioppoiuyte"
def errorTblName = "${tableName}qwertyuioppoiuyte"
sql """drop table if exists `${errorTblName}` force"""
// assert error table name
try {
sql """
CREATE JOB ${jobName} ON SCHEDULE EVERY 10 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${errorTblName} values ('2023-07-19', sleep(10000), 1001);
"""
}catch (Exception e) {
} catch (Exception e) {
assert e.getMessage().contains("Unknown table 't_test_BASE_inSert_jobqwertyuioppoiuyte'")
}
// assert not support stmt
try{
try {
sql """
CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test' DO update ${tableName} set type=2 where type=1;
"""
} catch (Exception e) {
assert e.getMessage().contains("Not support UpdateStmt type in job")
}
// assert start time greater than current time
try{
try {
sql """
CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("startTimeMs must be greater than current time")
}
// assert end time less than start time
try{
try {
sql """
CREATE JOB test_one_time_error_starts ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("startTimeMs must be greater than current time")
}
try{
try {
sql """
CREATE JOB inner_test ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("job name can not start with inner_")
}
// assert end time less than start time
try{
try {
sql """
CREATE JOB test_error_starts ON SCHEDULE every 1 second ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("end time cannot be less than start time")
}
// assert interval time unit can not be years
try{
try {
sql """
CREATE JOB test_error_starts ON SCHEDULE every 1 years ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
Expand Down Expand Up @@ -228,8 +256,8 @@ suite("test_base_insert_job") {
sql """
CREATE JOB ENDS ON SCHEDULE every 20 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
def jobCountRsp = sql"""select count(1) from jobs("type"="insert") where name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')"""

def jobCountRsp = sql """select count(1) from jobs("type"="insert") where name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')"""
assert jobCountRsp.get(0).get(0) == 6

}

0 comments on commit 0d05f80

Please sign in to comment.