From fd7fbaecc2b9042e7d82af72fb9c9ea4f92ae675 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 17 Dec 2024 08:56:34 +0800 Subject: [PATCH] =?UTF-8?q?[Fix](Job)Fix=20redundant=20job=20scheduling=20?= =?UTF-8?q?by=20preventing=20same=20state=20transitions=20(e.g.,=20RUNNING?= =?UTF-8?q?=20=E2=86=92=20RUNNING)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In the current job scheduling logic, invalid state transitions (e.g., RUNNING to RUNNING) are not filtered, which causes redundant scheduling during resume operations. This PR adds a check to ensure that jobs cannot transition to the same state, preventing duplicate scheduling triggers and improving state consistency. --- .../apache/doris/job/manager/JobManager.java | 56 ++++++++++--------- .../suites/job_p0/test_base_insert_job.groovy | 5 ++ 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 2a957775e113b8..ae35970fa53913 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -201,9 +201,13 @@ public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobExcept for (T a : jobMap.values()) { if (a.getJobName().equals(jobName)) { try { + if (jobStatus.equals(a.getJobStatus())) { + throw new JobException("Can't change job status to the same status"); + } alterJobStatus(a.getJobId(), jobStatus); } catch (JobException e) { - throw new JobException("unregister job error, jobName:" + jobName); + throw new JobException("Alter job status error, jobName is %s, errorMsg is %s", + jobName, e.getMessage()); } } } @@ -544,8 +548,8 @@ public void cancelLoadJob(String dbName, String label, String state, } // check state here unfinishedLoadJob = - matchLoadJobs.stream().filter(InsertJob::isRunning) -
.collect(Collectors.toList()); + matchLoadJobs.stream().filter(InsertJob::isRunning) + .collect(Collectors.toList()); if (unfinishedLoadJob.isEmpty()) { throw new JobException("There is no uncompleted job"); } @@ -556,7 +560,7 @@ public void cancelLoadJob(String dbName, String label, String state, if (unfinishedLoadJob.size() > 1 || unfinishedLoadJob.get(0).getTableNames().isEmpty()) { if (Env.getCurrentEnv().getAccessManager() .checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName, - PrivPredicate.LOAD)) { + PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), dbName); @@ -565,8 +569,8 @@ public void cancelLoadJob(String dbName, String label, String state, for (String tableName : unfinishedLoadJob.get(0).getTableNames()) { if (Env.getCurrentEnv().getAccessManager() .checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName, - tableName, - PrivPredicate.LOAD)) { + tableName, + PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), dbName + ":" + tableName); @@ -591,26 +595,26 @@ private static void addNeedCancelLoadJob(String label, String state, CaseSensibility.LABEL.getCaseSensibility()); matchLoadJobs.addAll( loadJobs.stream() - .filter(job -> !job.isCancelled()) - .filter(job -> { - if (operator != null) { - // compound - boolean labelFilter = - label.contains("%") ? matcher.match(job.getLabelName()) - : job.getLabelName().equalsIgnoreCase(label); - boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state); - return operator instanceof And ? labelFilter && stateFilter : - labelFilter || stateFilter; - } - if (StringUtils.isNotEmpty(label)) { - return label.contains("%") ? 
matcher.match(job.getLabelName()) - : job.getLabelName().equalsIgnoreCase(label); - } - if (StringUtils.isNotEmpty(state)) { - return job.getJobStatus().name().equalsIgnoreCase(state); - } - return false; - }).collect(Collectors.toList()) + .filter(job -> !job.isCancelled()) + .filter(job -> { + if (operator != null) { + // compound + boolean labelFilter = + label.contains("%") ? matcher.match(job.getLabelName()) + : job.getLabelName().equalsIgnoreCase(label); + boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state); + return operator instanceof And ? labelFilter && stateFilter : + labelFilter || stateFilter; + } + if (StringUtils.isNotEmpty(label)) { + return label.contains("%") ? matcher.match(job.getLabelName()) + : job.getLabelName().equalsIgnoreCase(label); + } + if (StringUtils.isNotEmpty(state)) { + return job.getJobStatus().name().equalsIgnoreCase(state); + } + return false; + }).collect(Collectors.toList()) ); } } diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 33ae28443b290a..e245800aae2b24 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -190,6 +190,11 @@ suite("test_base_insert_job") { // check job status and succeed task count is 1 pressJob.size() == 1 && '1' == onceJob.get(0).get(0) }) + assertThrows(Exception) { + sql """ + RESUME JOB where jobName='press' + """ + } sql """ DROP JOB IF EXISTS where jobname = 'past_start_time'