From 301604f73ca5188b9b626e483158dbc26413473e Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 5 Dec 2023 12:25:07 +0800 Subject: [PATCH] add cancel task --- .../apache/doris/analysis/CreateJobStmt.java | 4 +- .../apache/doris/job/base/AbstractJob.java | 3 ++ .../suites/job_p0/test_base_insert_job.groovy | 43 +++++++++++++++---- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 23cab5b7b55a7f..1a44ee4f90cf96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -137,11 +137,11 @@ public void analyze(Analyzer analyzer) throws UserException { if (null != intervalTimeUnit) { IntervalUnit intervalUnit = IntervalUnit.fromString(intervalTimeUnit.toUpperCase()); if (null == intervalUnit) { - throw new AnalysisException("invalid interval time unit " + intervalTimeUnit); + throw new AnalysisException("interval time unit can not be " + intervalTimeUnit); } if (intervalUnit.equals(IntervalUnit.SECOND) && !Config.enable_job_schedule_second_for_test) { - throw new AnalysisException("interval time unit can not be week"); + throw new AnalysisException("interval time unit can not be second"); } timerDefinition.setIntervalUnit(intervalUnit); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 7b82f00b5bfea4..d1f016a307ebde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -114,6 +114,9 @@ public void cancelTaskById(long taskId) throws JobException { } runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst() .orElseThrow(() -> new JobException("no task id: " + taskId)).cancel(); + if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { + updateJobStatus(JobStatus.FINISHED); + } } public List queryAllTasks() { diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 71dc2f616303c9..b090ba623fd061 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -65,26 +65,43 @@ suite("test_base_insert_job") { "replication_allocation" = "tag.location.default: 1" ); """ - def currentMs=System.currentTimeMillis()+1000; + def currentMs=System.currentTimeMillis()+2000; def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault()); def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); def startTime= dateTime.format(formatter); + def dataCount = sql """select count(*) from ${tableName}""" + assert dataCount.get(0).get(0) == 0 sql """ - CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment 'test' DO insert into ${tableName} values ('2023-07-19', sleep(1000), 1001); """ - Thread.sleep(3000*60) - + Thread.sleep(1000*60*2) + + // test cancel task def datas = sql """show job tasks for ${jobName}""" println datas assert datas.size() == 1 + println datas.get(0).get(2) + assert datas.get(0).get(2) == "RUNNING" + def taskId = datas.get(0).get(0) + sql """cancel task where jobName='${jobName}' and taskId= ${taskId}""" + def cancelTask = sql """ show job tasks for ${jobName}""" + println cancelTask + assert cancelTask.size() == 1 + assert cancelTask.get(0).get(2) == "CANCELED" + def dataCount1 = sql """select count(*) from ${tableName}""" + assert dataCount1.get(0).get(0) == 0 + def oncejob=sql """show job for ${jobName} """ + println oncejob + assert oncejob.get(0).get(5) == "FINISHED" + try{ sql """ CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - assert true + assert e.getMessage().contains("startTimeMs must be greater than current time") } sql """ DROP JOB where jobname = 'test_one_time_error_starts' @@ -94,7 +111,7 @@ suite("test_base_insert_job") { CREATE JOB test_one_time_error_starts ON SCHEDULER at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - assert true + assert e.getMessage().contains("startTimeMs must be greater than current time") } sql """ DROP JOB where jobname = 'test_error_starts' @@ -104,8 +121,18 @@ suite("test_base_insert_job") { CREATE JOB test_error_starts ON SCHEDULER every 1 second ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - assert true + assert e.getMessage().contains("interval time unit can not be second") } - + sql """ + DROP JOB where jobname = 'test_error_starts' + """ + try{ + sql """ + CREATE JOB test_error_starts ON SCHEDULER every 1 years ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + """ + } catch (Exception e) { + assert e.getMessage().contains("interval time unit can not be years") + } + }