Skip to content

Commit

Permalink
[HWORKS-553] [HWORKS-865] fix np in yarn monitor (#1428)
Browse files Browse the repository at this point in the history
  • Loading branch information
ErmiasG authored Dec 1, 2023
1 parent 96f7695 commit 47d0d58
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.persistence.entity.jobs.description.JobAlert;
import io.hops.hopsworks.persistence.entity.jobs.description.JobAlertStatus;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.project.alert.ProjectServiceAlert;
Expand Down Expand Up @@ -79,8 +80,11 @@ public class AlertController {
* @param execution
*/
public void sendAlert(JobState newState, Execution execution) {
List<PostableAlert> postableAlerts = getAlerts(newState, execution);
sendJobAlert(postableAlerts, execution.getJob().getProject(), execution.getJob().getName(), execution.getId());
Jobs job = execution != null ? execution.getJob() : null;
if (job != null) {
List<PostableAlert> postableAlerts = getAlerts(newState, execution);
sendJobAlert(postableAlerts, job.getProject(), job.getName(), execution.getId());
}
}

/**
Expand All @@ -89,8 +93,11 @@ public void sendAlert(JobState newState, Execution execution) {
* @param execution
*/
public void sendAlert(JobFinalStatus newState, Execution execution) {
List<PostableAlert> postableAlerts = getAlerts(newState, execution);
sendJobAlert(postableAlerts, execution.getJob().getProject(), execution.getJob().getName(), execution.getId());
Jobs job = execution != null ? execution.getJob() : null;
if (job != null) {
List<PostableAlert> postableAlerts = getAlerts(newState, execution);
sendJobAlert(postableAlerts, job.getProject(), job.getName(), execution.getId());
}
}

/**
Expand Down Expand Up @@ -247,23 +254,25 @@ private List<PostableAlert> getAlerts(JobFinalStatus jobState, Execution executi
return postableAlerts;
}

//method expects execution not null.
private List<PostableAlert> getAlerts(JobAlertStatus jobAlertStatus,
ProjectServiceAlertStatus projectServiceAlertStatus, Execution execution) {
List<PostableAlert> postableAlerts = new ArrayList<>();
if (execution.getJob().getJobAlertCollection() != null && !execution.getJob().getJobAlertCollection().isEmpty()) {
for (JobAlert alert : execution.getJob().getJobAlertCollection()) {
Jobs job = execution.getJob();
if (job != null && job.getJobAlertCollection() != null && !job.getJobAlertCollection().isEmpty()) {
for (JobAlert alert : job.getJobAlertCollection()) {
if (alert.getStatus().equals(jobAlertStatus)) {
PostableAlert postableAlert = getPostableAlert(execution.getJob().getProject(), alert.getAlertType(),
alert.getSeverity(), alert.getStatus().getName(), execution.getJob().getName(), execution.getId());
PostableAlert postableAlert = getPostableAlert(job.getProject(), alert.getAlertType(),
alert.getSeverity(), alert.getStatus().getName(), job.getName(), execution.getId());
postableAlerts.add(postableAlert);
}
}
} else if (execution.getJob().getProject().getProjectServiceAlerts() != null &&
!execution.getJob().getProject().getProjectServiceAlerts().isEmpty()) {
for (ProjectServiceAlert alert : execution.getJob().getProject().getProjectServiceAlerts()) {
} else if (job != null && job.getProject().getProjectServiceAlerts() != null &&
!job.getProject().getProjectServiceAlerts().isEmpty()) {
for (ProjectServiceAlert alert : job.getProject().getProjectServiceAlerts()) {
if (ProjectServiceEnum.JOBS.equals(alert.getService()) && alert.getStatus().equals(projectServiceAlertStatus)) {
PostableAlert postableAlert = getPostableAlert(execution.getJob().getProject(), alert.getAlertType(),
alert.getSeverity(), alert.getStatus().getName(), execution.getJob().getName(), execution.getId());
PostableAlert postableAlert = getPostableAlert(job.getProject(), alert.getAlertType(),
alert.getSeverity(), alert.getStatus().getName(), job.getName(), execution.getId());
postableAlerts.add(postableAlert);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,44 @@ public class ExecutionUpdateController {
private AlertController alertController;

public Execution updateProgress(float progress, Execution execution) {
return executionFacade.updateProgress(execution, progress);
//The execution won't exist in the database, if the job has been deleted.
if (executionFacade.findById(execution.getId()).isPresent()) {
execution = executionFacade.updateProgress(execution, progress);
}
return execution;
}

public Execution updateExecutionStop(long executionStop, Execution execution) {
return executionFacade.updateExecutionStop(execution, executionStop);
//The execution won't exist in the database, if the job has been deleted.
if (executionFacade.findById(execution.getId()).isPresent()) {
execution = executionFacade.updateExecutionStop(execution, executionStop);
}
return execution;
}

public Execution updateState(JobState newState, Execution execution) {
return executionFacade.updateState(execution, newState);
//The execution won't exist in the database, if the job has been deleted.
if (executionFacade.findById(execution.getId()).isPresent()) {
execution = executionFacade.updateState(execution, newState);
}
return execution;
}

public Execution updateStateAndSendAlert(Execution execution) {
execution = executionFacade.update(execution);
alertController.sendAlert(execution.getState(), execution);
//The execution won't exist in the database, if the job has been deleted.
if (executionFacade.findById(execution.getId()).isPresent()) {
execution = executionFacade.update(execution);
alertController.sendAlert(execution.getState(), execution);
}
return execution;
}

public Execution updateFinalStatusAndSendAlert(JobFinalStatus finalStatus, Execution execution) {
execution = executionFacade.updateFinalStatus(execution, finalStatus);
alertController.sendAlert(finalStatus, execution);
//The execution won't exist in the database, if the job has been deleted.
if (executionFacade.findById(execution.getId()).isPresent()) {
execution = executionFacade.updateFinalStatus(execution, finalStatus);
alertController.sendAlert(finalStatus, execution);
}
return execution;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

package io.hops.hopsworks.common.jobs.yarn;

import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade;
import io.hops.hopsworks.persistence.entity.project.service.ProjectServiceEnum;
Expand Down Expand Up @@ -112,7 +113,7 @@ public Future<Execution> copyLogs(Execution exec) {
LOGGER.log(Level.SEVERE,"error while aggregation logs" + ex.toString());
}
Execution execution = updateExecutionSTDPaths(stdOutFinalDestination, stdErrFinalDestination, exec);
finalize(exec, exec.getState());
finalizeExecution(exec, exec.getState());
return new AsyncResult<>(execution);
} finally {
dfs.closeDfsClient(udfso);
Expand All @@ -121,9 +122,9 @@ public Future<Execution> copyLogs(Execution exec) {
}

@Asynchronous
public void finalize(Execution exec, JobState jobState) {
public void finalizeExecution(Execution exec, JobState jobState) {
//The execution won't exist in the database, if the job has been deleted.
if (executionFacade.findById(exec.getId()) != null) {
if (executionFacade.findById(exec.getId()).isPresent()) {
long executionStop = System.currentTimeMillis();
exec = executionFacade.updateExecutionStop(exec, executionStop);
executionFacade.updateState(exec, jobState);
Expand All @@ -136,8 +137,8 @@ public void finalize(Execution exec, JobState jobState) {
"Exception while cleaning after job:{0}, with appId:{1}, some cleaning is probably needed {2}",
new Object[]{exec.getJob().getName(), exec.getAppId(), ex.getMessage()});
}

if (exec.getJob().getJobType().equals(JobType.FLINK)) {
Jobs jobs = exec.getJob();
if (jobs != null && jobs.getJobType().equals(JobType.FLINK)) {
cleanCerts(exec);
}
}
Expand Down Expand Up @@ -172,18 +173,23 @@ public void removeAllNecessary(Execution exec) throws IOException {
private void cleanCerts(Execution exec) {
//Remove local files required for the job (Kafka certs etc.)
//Search for other jobs using Kafka in the same project. If any active
//ones are found
//ones are found.

Collection<ProjectServices> projectServices = exec.getJob().getProject().getProjectServicesCollection();
//if job is deleted we can not do a proper cleanup with this exec
Jobs jobs = exec.getJob();
if (jobs == null) {
return;
}
Collection<ProjectServices> projectServices = jobs.getProject().getProjectServicesCollection();
Iterator<ProjectServices> iter = projectServices.iterator();
boolean removeKafkaCerts = true;
while (iter.hasNext()) {
ProjectServices projectService = iter.next();
//If the project is of type KAFKA
if (projectService.getProjectServicesPK().getService() == ProjectServiceEnum.KAFKA) {
List<Execution> execs = executionFacade.findByProjectAndType(exec.getJob().getProject(), JobType.FLINK);
List<Execution> execs = executionFacade.findByProjectAndType(jobs.getProject(), JobType.FLINK);
if (execs != null) {
execs.addAll(executionFacade.findByProjectAndType(exec.getJob().getProject(), JobType.SPARK));
execs.addAll(executionFacade.findByProjectAndType(jobs.getProject(), JobType.SPARK));
}
//Find if this project has running jobs
if (execs != null && !execs.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public synchronized void yarnJobMonitor(Timer timer) {
while (monitorsIter.hasNext()) {
Map.Entry<String, YarnMonitor> entry = monitorsIter.next();
// Check if Value associated with Key is 10
if (!executions.keySet().contains(entry.getKey())) {
if (!executions.containsKey(entry.getKey())) {
// Remove the element
entry.getValue().close();
monitorsIter.remove();
Expand Down Expand Up @@ -213,10 +213,10 @@ private Execution internalMonitor(Execution exec, YarnMonitor monitor) {
monitor.cancelJob(monitor.getApplicationId().toString());
exec = updateFinalStatus(JobFinalStatus.KILLED, exec);
exec = updateProgress(0, exec);
execFinalizer.finalize(exec, JobState.KILLED);
execFinalizer.finalizeExecution(exec, JobState.KILLED);
} catch (YarnException | IOException ex) {
LOGGER.log(Level.SEVERE, "Failed to cancel execution, " + exec + " after failing to poll for status.", ex);
execFinalizer.finalize(exec, JobState.FRAMEWORK_FAILURE);
execFinalizer.finalizeExecution(exec, JobState.FRAMEWORK_FAILURE);
}
return null;
}
Expand Down

0 comments on commit 47d0d58

Please sign in to comment.