From 409c21de489bb09a7905b86c5d720df81888922e Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 12 Dec 2024 14:20:44 +0800 Subject: [PATCH] restart fe --- .../apache/doris/job/base/AbstractJob.java | 11 ++++ .../apache/doris/job/manager/JobManager.java | 58 +++++++++++-------- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 906b86494fb748..24e6bbc7fe3b55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -461,4 +461,15 @@ public void onReplayCreate() throws JobException { public void onReplayEnd(AbstractJob replayJob) throws JobException { log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay delete scheduler job").build()); } + + private void updateTaskStatusAfterRestart() { + if (CollectionUtils.isEmpty(runningTasks)) { + return; + } + runningTasks.forEach(task -> { + if (task.getStatus().equals(TaskStatus.RUNNING)) { + task.setStatus(TaskStatus.PENDING); + } + }); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 2a957775e113b8..3ce2c4cc1928c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -92,6 +92,14 @@ public void start() { jobScheduler.start(); } + private void clearTaskStatusWhenFeRestart() { + List runningJobs = jobMap.values().stream() + .filter(job -> job.getJobStatus().equals(JobStatus.RUNNING)).collect(Collectors.toList()); + for (T job : runningJobs) { + + } + } + /** * get running job @@ -544,8 +552,8 @@ public void cancelLoadJob(String dbName, String label, String state, } // check state here unfinishedLoadJob = - matchLoadJobs.stream().filter(InsertJob::isRunning) - .collect(Collectors.toList()); + matchLoadJobs.stream().filter(InsertJob::isRunning) + .collect(Collectors.toList()); if (unfinishedLoadJob.isEmpty()) { throw new JobException("There is no uncompleted job"); } @@ -556,7 +564,7 @@ public void cancelLoadJob(String dbName, String label, String state, if (unfinishedLoadJob.size() > 1 || unfinishedLoadJob.get(0).getTableNames().isEmpty()) { if (Env.getCurrentEnv().getAccessManager() .checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName, - PrivPredicate.LOAD)) { + PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), dbName); @@ -565,8 +573,8 @@ public void cancelLoadJob(String dbName, String label, String state, for (String tableName : unfinishedLoadJob.get(0).getTableNames()) { if (Env.getCurrentEnv().getAccessManager() .checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName, - tableName, - PrivPredicate.LOAD)) { + tableName, + PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), dbName + ":" + tableName); @@ -591,26 +599,26 @@ private static void addNeedCancelLoadJob(String label, String state, CaseSensibility.LABEL.getCaseSensibility()); matchLoadJobs.addAll( loadJobs.stream() - .filter(job -> !job.isCancelled()) - .filter(job -> { - if (operator != null) { - // compound - boolean labelFilter = - label.contains("%") ? matcher.match(job.getLabelName()) - : job.getLabelName().equalsIgnoreCase(label); - boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state); - return operator instanceof And ? labelFilter && stateFilter : - labelFilter || stateFilter; - } - if (StringUtils.isNotEmpty(label)) { - return label.contains("%") ? matcher.match(job.getLabelName()) - : job.getLabelName().equalsIgnoreCase(label); - } - if (StringUtils.isNotEmpty(state)) { - return job.getJobStatus().name().equalsIgnoreCase(state); - } - return false; - }).collect(Collectors.toList()) + .filter(job -> !job.isCancelled()) + .filter(job -> { + if (operator != null) { + // compound + boolean labelFilter = + label.contains("%") ? matcher.match(job.getLabelName()) + : job.getLabelName().equalsIgnoreCase(label); + boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state); + return operator instanceof And ? labelFilter && stateFilter : + labelFilter || stateFilter; + } + if (StringUtils.isNotEmpty(label)) { + return label.contains("%") ? matcher.match(job.getLabelName()) + : job.getLabelName().equalsIgnoreCase(label); + } + if (StringUtils.isNotEmpty(state)) { + return job.getJobStatus().name().equalsIgnoreCase(state); + } + return false; + }).collect(Collectors.toList()) ); } }