From 47d0d58f23c011de76675f468e9db9216daa14d2 Mon Sep 17 00:00:00 2001 From: ErmiasG Date: Fri, 1 Dec 2023 13:31:14 +0100 Subject: [PATCH] [HWORKS-553] [HWORKS-865] fix np in yarn monitor (#1428) --- .../common/alert/AlertController.java | 35 ++++++++++++------- .../execution/ExecutionUpdateController.java | 34 +++++++++++++----- .../jobs/yarn/YarnExecutionFinalizer.java | 24 ++++++++----- .../common/jobs/yarn/YarnJobsMonitor.java | 6 ++-- 4 files changed, 66 insertions(+), 33 deletions(-) diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/alert/AlertController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/alert/AlertController.java index 215b57e352..1d05e6ed7d 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/alert/AlertController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/alert/AlertController.java @@ -43,6 +43,7 @@ import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState; import io.hops.hopsworks.persistence.entity.jobs.description.JobAlert; import io.hops.hopsworks.persistence.entity.jobs.description.JobAlertStatus; +import io.hops.hopsworks.persistence.entity.jobs.description.Jobs; import io.hops.hopsworks.persistence.entity.jobs.history.Execution; import io.hops.hopsworks.persistence.entity.project.Project; import io.hops.hopsworks.persistence.entity.project.alert.ProjectServiceAlert; @@ -79,8 +80,11 @@ public class AlertController { * @param execution */ public void sendAlert(JobState newState, Execution execution) { - List postableAlerts = getAlerts(newState, execution); - sendJobAlert(postableAlerts, execution.getJob().getProject(), execution.getJob().getName(), execution.getId()); + Jobs job = execution != null ? execution.getJob() : null; + if (job != null) { + List postableAlerts = getAlerts(newState, execution); + sendJobAlert(postableAlerts, job.getProject(), job.getName(), execution.getId()); + } } /** @@ -89,8 +93,11 @@ public void sendAlert(JobState newState, Execution execution) { * @param execution */ public void sendAlert(JobFinalStatus newState, Execution execution) { - List postableAlerts = getAlerts(newState, execution); - sendJobAlert(postableAlerts, execution.getJob().getProject(), execution.getJob().getName(), execution.getId()); + Jobs job = execution != null ? execution.getJob() : null; + if (job != null) { + List postableAlerts = getAlerts(newState, execution); + sendJobAlert(postableAlerts, job.getProject(), job.getName(), execution.getId()); + } } /** @@ -247,23 +254,25 @@ private List getAlerts(JobFinalStatus jobState, Execution executi return postableAlerts; } + //method expects execution not null. private List getAlerts(JobAlertStatus jobAlertStatus, ProjectServiceAlertStatus projectServiceAlertStatus, Execution execution) { List postableAlerts = new ArrayList<>(); - if (execution.getJob().getJobAlertCollection() != null && !execution.getJob().getJobAlertCollection().isEmpty()) { - for (JobAlert alert : execution.getJob().getJobAlertCollection()) { + Jobs job = execution.getJob(); + if (job != null && job.getJobAlertCollection() != null && !job.getJobAlertCollection().isEmpty()) { + for (JobAlert alert : job.getJobAlertCollection()) { if (alert.getStatus().equals(jobAlertStatus)) { - PostableAlert postableAlert = getPostableAlert(execution.getJob().getProject(), alert.getAlertType(), - alert.getSeverity(), alert.getStatus().getName(), execution.getJob().getName(), execution.getId()); + PostableAlert postableAlert = getPostableAlert(job.getProject(), alert.getAlertType(), + alert.getSeverity(), alert.getStatus().getName(), job.getName(), execution.getId()); postableAlerts.add(postableAlert); } } - } else if (execution.getJob().getProject().getProjectServiceAlerts() != null && - !execution.getJob().getProject().getProjectServiceAlerts().isEmpty()) { - for (ProjectServiceAlert alert : execution.getJob().getProject().getProjectServiceAlerts()) { + } else if (job != null && job.getProject().getProjectServiceAlerts() != null && + !job.getProject().getProjectServiceAlerts().isEmpty()) { + for (ProjectServiceAlert alert : job.getProject().getProjectServiceAlerts()) { if (ProjectServiceEnum.JOBS.equals(alert.getService()) && alert.getStatus().equals(projectServiceAlertStatus)) { - PostableAlert postableAlert = getPostableAlert(execution.getJob().getProject(), alert.getAlertType(), - alert.getSeverity(), alert.getStatus().getName(), execution.getJob().getName(), execution.getId()); + PostableAlert postableAlert = getPostableAlert(job.getProject(), alert.getAlertType(), + alert.getSeverity(), alert.getStatus().getName(), job.getName(), execution.getId()); postableAlerts.add(postableAlert); } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/execution/ExecutionUpdateController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/execution/ExecutionUpdateController.java index 21d4d7e6e1..0bb1d22cda 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/execution/ExecutionUpdateController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/execution/ExecutionUpdateController.java @@ -38,26 +38,44 @@ public class ExecutionUpdateController { private AlertController alertController; public Execution updateProgress(float progress, Execution execution) { - return executionFacade.updateProgress(execution, progress); + //The execution won't exist in the database, if the job has been deleted. + if (executionFacade.findById(execution.getId()).isPresent()) { + execution = executionFacade.updateProgress(execution, progress); + } + return execution; } - + public Execution updateExecutionStop(long executionStop, Execution execution) { - return executionFacade.updateExecutionStop(execution, executionStop); + //The execution won't exist in the database, if the job has been deleted. + if (executionFacade.findById(execution.getId()).isPresent()) { + execution = executionFacade.updateExecutionStop(execution, executionStop); + } + return execution; } public Execution updateState(JobState newState, Execution execution) { - return executionFacade.updateState(execution, newState); + //The execution won't exist in the database, if the job has been deleted. + if (executionFacade.findById(execution.getId()).isPresent()) { + execution = executionFacade.updateState(execution, newState); + } + return execution; } public Execution updateStateAndSendAlert(Execution execution) { - execution = executionFacade.update(execution); - alertController.sendAlert(execution.getState(), execution); + //The execution won't exist in the database, if the job has been deleted. + if (executionFacade.findById(execution.getId()).isPresent()) { + execution = executionFacade.update(execution); + alertController.sendAlert(execution.getState(), execution); + } return execution; } public Execution updateFinalStatusAndSendAlert(JobFinalStatus finalStatus, Execution execution) { - execution = executionFacade.updateFinalStatus(execution, finalStatus); - alertController.sendAlert(finalStatus, execution); + //The execution won't exist in the database, if the job has been deleted. + if (executionFacade.findById(execution.getId()).isPresent()) { + execution = executionFacade.updateFinalStatus(execution, finalStatus); + alertController.sendAlert(finalStatus, execution); + } return execution; } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnExecutionFinalizer.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnExecutionFinalizer.java index 82c611ad6e..29207e2097 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnExecutionFinalizer.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnExecutionFinalizer.java @@ -39,6 +39,7 @@ package io.hops.hopsworks.common.jobs.yarn; +import io.hops.hopsworks.persistence.entity.jobs.description.Jobs; import io.hops.hopsworks.persistence.entity.jobs.history.Execution; import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade; import io.hops.hopsworks.persistence.entity.project.service.ProjectServiceEnum; @@ -112,7 +113,7 @@ public Future copyLogs(Execution exec) { LOGGER.log(Level.SEVERE,"error while aggregation logs" + ex.toString()); } Execution execution = updateExecutionSTDPaths(stdOutFinalDestination, stdErrFinalDestination, exec); - finalize(exec, exec.getState()); + finalizeExecution(exec, exec.getState()); return new AsyncResult<>(execution); } finally { dfs.closeDfsClient(udfso); @@ -121,9 +122,9 @@ public Future copyLogs(Execution exec) { } @Asynchronous - public void finalize(Execution exec, JobState jobState) { + public void finalizeExecution(Execution exec, JobState jobState) { //The execution won't exist in the database, if the job has been deleted. - if (executionFacade.findById(exec.getId()) != null) { + if (executionFacade.findById(exec.getId()).isPresent()) { long executionStop = System.currentTimeMillis(); exec = executionFacade.updateExecutionStop(exec, executionStop); executionFacade.updateState(exec, jobState); @@ -136,8 +137,8 @@ public void finalize(Execution exec, JobState jobState) { "Exception while cleaning after job:{0}, with appId:{1}, some cleaning is probably needed {2}", new Object[]{exec.getJob().getName(), exec.getAppId(), ex.getMessage()}); } - - if (exec.getJob().getJobType().equals(JobType.FLINK)) { + Jobs jobs = exec.getJob(); + if (jobs != null && jobs.getJobType().equals(JobType.FLINK)) { cleanCerts(exec); } } @@ -172,18 +173,23 @@ public void removeAllNecessary(Execution exec) throws IOException { private void cleanCerts(Execution exec) { //Remove local files required for the job (Kafka certs etc.) //Search for other jobs using Kafka in the same project. If any active - //ones are found + //ones are found. - Collection projectServices = exec.getJob().getProject().getProjectServicesCollection(); + //if job is deleted we can not do a proper cleanup with this exec + Jobs jobs = exec.getJob(); + if (jobs == null) { + return; + } + Collection projectServices = jobs.getProject().getProjectServicesCollection(); Iterator iter = projectServices.iterator(); boolean removeKafkaCerts = true; while (iter.hasNext()) { ProjectServices projectService = iter.next(); //If the project is of type KAFKA if (projectService.getProjectServicesPK().getService() == ProjectServiceEnum.KAFKA) { - List execs = executionFacade.findByProjectAndType(exec.getJob().getProject(), JobType.FLINK); + List execs = executionFacade.findByProjectAndType(jobs.getProject(), JobType.FLINK); if (execs != null) { - execs.addAll(executionFacade.findByProjectAndType(exec.getJob().getProject(), JobType.SPARK)); + execs.addAll(executionFacade.findByProjectAndType(jobs.getProject(), JobType.SPARK)); } //Find if this project has running jobs if (execs != null && !execs.isEmpty()) { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJobsMonitor.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJobsMonitor.java index 81c40e9bce..3ae2420620 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJobsMonitor.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJobsMonitor.java @@ -141,7 +141,7 @@ public synchronized void yarnJobMonitor(Timer timer) { while (monitorsIter.hasNext()) { Map.Entry entry = monitorsIter.next(); // Check if Value associated with Key is 10 - if (!executions.keySet().contains(entry.getKey())) { + if (!executions.containsKey(entry.getKey())) { // Remove the element entry.getValue().close(); monitorsIter.remove(); @@ -213,10 +213,10 @@ private Execution internalMonitor(Execution exec, YarnMonitor monitor) { monitor.cancelJob(monitor.getApplicationId().toString()); exec = updateFinalStatus(JobFinalStatus.KILLED, exec); exec = updateProgress(0, exec); - execFinalizer.finalize(exec, JobState.KILLED); + execFinalizer.finalizeExecution(exec, JobState.KILLED); } catch (YarnException | IOException ex) { LOGGER.log(Level.SEVERE, "Failed to cancel execution, " + exec + " after failing to poll for status.", ex); - execFinalizer.finalize(exec, JobState.FRAMEWORK_FAILURE); + execFinalizer.finalizeExecution(exec, JobState.FRAMEWORK_FAILURE); } return null; }