From 0d05f805d0fb170fea677bcae30fe1933caee613 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 27 Dec 2023 10:47:22 +0800 Subject: [PATCH] [Fix](Job)cancel task is not cleared from running task Insert task is not work --- .../apache/doris/job/base/AbstractJob.java | 19 +++-- .../job/extensions/insert/InsertJob.java | 27 +++++-- .../apache/doris/job/manager/JobManager.java | 1 + .../doris/job/scheduler/JobScheduler.java | 10 ++- .../suites/job_p0/test_base_insert_job.groovy | 72 +++++++++++++------ 5 files changed, 90 insertions(+), 39 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 6e9cb48da1ca44..bc8f921d841dbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -89,7 +89,8 @@ public abstract class AbstractJob implements Job(); } private static final ImmutableList TITLE_NAMES = @@ -163,6 +165,7 @@ public void cancelTaskById(long taskId) throws JobException { } runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst() .orElseThrow(() -> new JobException("no task id: " + taskId)).cancel(); + runningTasks.removeIf(task -> task.getTaskId().equals(taskId)); if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { updateJobStatus(JobStatus.FINISHED); } @@ -364,10 +367,12 @@ public ShowResultSetMetaData getJobMetaData() { } @Override - public void onRegister() throws JobException {} + public void onRegister() throws JobException { + } @Override - public void onUnRegister() throws JobException {} + public void onUnRegister() throws JobException { + } @Override public void onReplayCreate() throws JobException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 9256864efcacc1..c3f22f82163a31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -197,13 +197,13 @@ public InsertJob(String jobName, } public InsertJob(ConnectContext ctx, - StmtExecutor executor, - String labelName, - List plans, - Set sinkTableNames, - Map properties, - String comment, - JobExecutionConfiguration jobConfig) { + StmtExecutor executor, + String labelName, + List plans, + Set sinkTableNames, + Map properties, + String comment, + JobExecutionConfiguration jobConfig) { super(getNextJobId(), labelName, JobStatus.RUNNING, null, comment, ctx.getCurrentUserIdentity(), jobConfig); this.ctx = ctx; @@ -219,6 +219,12 @@ public InsertJob(ConnectContext ctx, @Override public List createTasks(TaskType taskType, Map taskContext) { + if (null == plans) { + plans = new ArrayList<>(); + } + if (null == idToTasks) { + idToTasks = new ConcurrentHashMap<>(); + } if (plans.isEmpty()) { InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser()); idToTasks.put(task.getTaskId(), task); @@ -462,9 +468,16 @@ public long getTimeout() { public static InsertJob readFields(DataInput in) throws IOException { + //fix me : some field is not set and this method is not used String jsonJob = Text.readString(in); InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class); job.setRunningTasks(new ArrayList<>()); + if (null == job.plans) { + job.plans = new ArrayList<>(); + } + if (null == job.idToTasks) { + job.idToTasks = new ConcurrentHashMap<>(); + } return job; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 814d6b773ad1ef..dae75b2b43ec3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -220,6 +220,7 @@ public List queryTasks(Long jobId) throws JobException { } public void triggerJob(long jobId, C context) throws JobException { + log.info("trigger job, job id is {}", jobId); checkJobExist(jobId); jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 08bbbb6dbaba9a..8dc58f07057397 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -154,15 +154,19 @@ private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { public void schedulerInstantJob(T job, TaskType taskType, C context) { List tasks = job.commonCreateTasks(taskType, context); if (CollectionUtils.isEmpty(tasks)) { - log.info("job create task is empty, skip scheduler, job id is {},job name is {}", job.getJobId(), + log.info("job create task is empty, skip scheduler, job id is {}, job name is {}", job.getJobId(), job.getJobName()); if (job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { job.setJobStatus(JobStatus.FINISHED); } return; } - tasks.forEach(task -> taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), - job.getJobConfig())); + tasks.forEach(task -> { + taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), + job.getJobConfig()); + log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(), + job.getJobName(), task.getTaskId()); + }); } diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index b41bf50738b150..3ef2f86b8ebf5f 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -50,6 +50,10 @@ suite("test_base_insert_job") { DROP JOB where jobname = '${jobMixedName}' """ + sql """ + DROP JOB where jobname = '${jobName}' + """ + sql """ CREATE TABLE IF NOT EXISTS `${tableName}` ( @@ -69,7 +73,7 @@ suite("test_base_insert_job") { Thread.sleep(2500) def jobs = sql """select * from ${tableName}""" println jobs - assert 3>=jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 records + assert 3 >= jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 records sql """ CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ @@ -83,7 +87,7 @@ suite("test_base_insert_job") { sql """ DROP JOB where jobname = '${jobMixedName}' """ - + sql """drop table if exists `${tableName}` force """ sql """ CREATE TABLE IF NOT EXISTS `${tableName}` @@ -99,11 +103,11 @@ suite("test_base_insert_job") { ); """ // Enlarge this parameter to avoid other factors that cause time verification to fail when submitting. - def currentMs=System.currentTimeMillis()+20000; - def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault()); + def currentMs = System.currentTimeMillis() + 20000; + def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault()); def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - def startTime= dateTime.format(formatter); + def startTime = dateTime.format(formatter); def dataCount = sql """select count(*) from ${tableName}""" assert dataCount.get(0).get(0) == 0 sql """ @@ -113,8 +117,8 @@ suite("test_base_insert_job") { Thread.sleep(25000) def onceJob = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ assert onceJob.size() == 1 - def onceJobId= onceJob.get(0).get(0); - def onceJobSql= onceJob.get(0).get(1); + def onceJobId = onceJob.get(0).get(0); + def onceJobSql = onceJob.get(0).get(1); println onceJobSql def assertSql = "insert into ${tableName} values (\'2023-07-19\', sleep(10000), 1001);" println assertSql @@ -126,7 +130,7 @@ suite("test_base_insert_job") { assert datas.get(0).get(0) == "RUNNING" def taskId = datas.get(0).get(1) sql """cancel task where jobName='${jobName}' and taskId= ${taskId}""" - def cancelTask = sql """ select status from tasks("type"="insert") where jobid= ${onceJobId}""" + def cancelTask = sql """ select status from tasks("type"="insert") where jobid= ${onceJobId}""" println cancelTask //check task status assert cancelTask.size() == 1 @@ -135,32 +139,56 @@ suite("test_base_insert_job") { def dataCount1 = sql """select count(1) from ${tableName}""" assert dataCount1.get(0).get(0) == 0 // check job status - def oncejob=sql """select status,comment from jobs("type"="insert") where Name='${jobName}' """ + def oncejob = sql """select status,comment from jobs("type"="insert") where Name='${jobName}' """ println oncejob assert oncejob.get(0).get(0) == "FINISHED" //assert comment assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa" - + sql """ + DROP JOB where jobname = '${jobName}' + """ + + sql """ + CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(10000), 1001); + """ + + Thread.sleep(2500) + + sql """ + PAUSE JOB where jobname = '${jobName}' + """ + def job = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' """ + assert job.size() == 1 + def jobId = job.get(0).get(0); + def tasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ + assert tasks.size() == 1 + sql """ + RESUME JOB where jobname = '${jobName}' + """ + Thread.sleep(2500) + def resumeTasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ + println resumeTasks + assert resumeTasks.size() == 2 // assert same job name try { sql """ CREATE JOB ${jobName} ON SCHEDULE EVERY 10 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(10000), 1001); """ - }catch (Exception e) { + } catch (Exception e) { assert e.getMessage().contains("job name exist, jobName:insert_recovery_test_base_insert_job") } - def errorTblName="${tableName}qwertyuioppoiuyte" + def errorTblName = "${tableName}qwertyuioppoiuyte" sql """drop table if exists `${errorTblName}` force""" // assert error table name try { sql """ CREATE JOB ${jobName} ON SCHEDULE EVERY 10 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${errorTblName} values ('2023-07-19', sleep(10000), 1001); """ - }catch (Exception e) { + } catch (Exception e) { assert e.getMessage().contains("Unknown table 't_test_BASE_inSert_jobqwertyuioppoiuyte'") } // assert not support stmt - try{ + try { sql """ CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test' DO update ${tableName} set type=2 where type=1; """ @@ -168,7 +196,7 @@ suite("test_base_insert_job") { assert e.getMessage().contains("Not support UpdateStmt type in job") } // assert start time greater than current time - try{ + try { sql """ CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ @@ -176,14 +204,14 @@ suite("test_base_insert_job") { assert e.getMessage().contains("startTimeMs must be greater than current time") } // assert end time less than start time - try{ + try { sql """ CREATE JOB test_one_time_error_starts ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { assert e.getMessage().contains("startTimeMs must be greater than current time") } - try{ + try { sql """ CREATE JOB inner_test ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ @@ -191,7 +219,7 @@ suite("test_base_insert_job") { assert e.getMessage().contains("job name can not start with inner_") } // assert end time less than start time - try{ + try { sql """ CREATE JOB test_error_starts ON SCHEDULE every 1 second ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ @@ -199,7 +227,7 @@ suite("test_base_insert_job") { assert e.getMessage().contains("end time cannot be less than start time") } // assert interval time unit can not be years - try{ + try { sql """ CREATE JOB test_error_starts ON SCHEDULE every 1 years ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ @@ -228,8 +256,8 @@ suite("test_base_insert_job") { sql """ CREATE JOB ENDS ON SCHEDULE every 20 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ - - def jobCountRsp = sql"""select count(1) from jobs("type"="insert") where name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')""" assert jobCountRsp.get(0).get(0) == 6 - + }