[HWORKS-892] Use a single YarnClientWrapper to monitor all jobs (#1447)
ErmiasG authored Jan 8, 2024
1 parent d1a537a commit f173cc2
Showing 8 changed files with 429 additions and 484 deletions.
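In broad strokes, the diff replaces the per-application YarnMonitor objects (each holding its own YarnClientWrapper) with a single stateless YarnMonitor EJB whose methods take a YarnClient and an ApplicationId, so callers can share one client per operation or timer tick. The sketch below shows the shape of that stateless API as it can be inferred from the call sites in the diff; the class name and method bodies are illustrative, not the repository's actual implementation, and use only the standard YarnClient/ApplicationReport API.

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;

import java.io.IOException;

// Illustrative sketch only: a stateless monitor that receives the shared client
// on every call instead of owning one client per monitored application.
public class YarnMonitorSketch {

  public YarnApplicationState getApplicationState(YarnClient yarnClient, ApplicationId appId)
      throws IOException, YarnException {
    ApplicationReport report = yarnClient.getApplicationReport(appId);
    return report.getYarnApplicationState();
  }

  public FinalApplicationStatus getFinalApplicationStatus(YarnClient yarnClient, ApplicationId appId)
      throws IOException, YarnException {
    return yarnClient.getApplicationReport(appId).getFinalApplicationStatus();
  }

  public float getProgress(YarnClient yarnClient, ApplicationId appId)
      throws IOException, YarnException {
    return yarnClient.getApplicationReport(appId).getProgress();
  }

  public void cancelJob(YarnClient yarnClient, ApplicationId appId)
      throws IOException, YarnException {
    yarnClient.killApplication(appId);
  }
}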
@@ -46,6 +46,7 @@
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.jobs.execution.HopsJob;
import io.hops.hopsworks.common.jobs.yarn.YarnExecutionFinalizer;
import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil;
import io.hops.hopsworks.common.security.BaseHadoopClientsService;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.util.Settings;
@@ -86,6 +87,8 @@ public class AsynchronousJobExecutor {
private CertificateMaterializer certificateMaterializer;
@EJB
private BaseHadoopClientsService baseHadoopClientsService;
@EJB
private YarnLogUtil yarnLogUtil;

@Asynchronous
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
@@ -139,4 +142,7 @@ public BaseHadoopClientsService getBaseHadoopClientsService() {
return baseHadoopClientsService;
}

public YarnLogUtil getYarnLogUtil() {
return yarnLogUtil;
}
}
@@ -18,7 +18,6 @@

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade;
import io.hops.hopsworks.common.dao.jobhistory.YarnApplicationAttemptStateFacade;
import io.hops.hopsworks.common.dao.jobhistory.YarnApplicationstateFacade;
import io.hops.hopsworks.common.dao.jobs.description.YarnAppUrlsDTO;
import io.hops.hopsworks.common.dao.jobs.quota.YarnProjectsQuotaFacade;
@@ -33,7 +32,6 @@
import io.hops.hopsworks.common.jobs.flink.FlinkController;
import io.hops.hopsworks.common.jobs.spark.SparkController;
import io.hops.hopsworks.common.jobs.yarn.YarnExecutionFinalizer;
import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil;
import io.hops.hopsworks.common.jobs.yarn.YarnMonitor;
import io.hops.hopsworks.common.security.QuotaEnforcementException;
import io.hops.hopsworks.common.security.QuotasEnforcement;
@@ -64,7 +62,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;

import javax.ejb.EJB;
import javax.ejb.TransactionAttribute;
@@ -103,9 +100,9 @@ public abstract class AbstractExecutionController implements ExecutionController
@EJB
private ExecutionFacade executionFacade;
@EJB
private YarnClientService ycs;
private YarnMonitor yarnMonitor;
@EJB
private YarnApplicationAttemptStateFacade appAttemptStateFacade;
private YarnClientService yarnClientService;
@EJB
private YarnApplicationstateFacade yarnApplicationstateFacade;
@EJB
@@ -209,10 +206,9 @@ public Execution stopExecution(Integer id) throws JobException {
public Execution stopExecution(Execution execution) throws JobException {
//An execution when it's initializing might not have an appId in hopsworks
if(execution.getAppId() != null && JobState.getRunningStates().contains(execution.getState())) {
YarnClientWrapper yarnClientWrapper = null;
YarnClientWrapper yarnClientWrapper = yarnClientService.getYarnClientSuper();
try {
yarnClientWrapper = ycs.getYarnClientSuper(settings.getConfiguration());
yarnClientWrapper.getYarnClient().killApplication(ApplicationId.fromString(execution.getAppId()));
yarnMonitor.cancelJob(yarnClientWrapper.getYarnClient(), ApplicationId.fromString(execution.getAppId()));
yarnExecutionFinalizer.removeAllNecessary(execution);
return executionFacade.findById(execution.getId())
.orElseThrow(() -> new JobException(RESTCodes.JobErrorCode.JOB_EXECUTION_NOT_FOUND,
@@ -222,7 +218,7 @@ public Execution stopExecution(Execution execution) throws JobException {
"Could not kill job for job:" + execution.getJob().getName() + "with appId:" + execution.getAppId(), ex);
throw new JobException(RESTCodes.JobErrorCode.JOB_STOP_FAILED, Level.WARNING, ex.getMessage(), null, ex);
} finally {
ycs.closeYarnClient(yarnClientWrapper);
yarnClientService.closeYarnClient(yarnClientWrapper);
}
}
return execution;
@@ -336,6 +332,7 @@ public JobLogDTO retryLogAggregation(Execution execution, JobLogDTO.LogType type

DistributedFileSystemOps dfso = null;
DistributedFileSystemOps udfso = null;
YarnClientWrapper yarnClientWrapper = null;
Users user = execution.getUser();
String hdfsUser = hdfsUsersController.getHdfsUserName(execution.getJob().getProject(), user);
String aggregatedLogPath = settings.getAggregatedLogPath(hdfsUser, execution.getAppId());
@@ -349,6 +346,7 @@ public JobLogDTO retryLogAggregation(Execution execution, JobLogDTO.LogType type
throw new JobException(RESTCodes.JobErrorCode.JOB_LOG, Level.WARNING,
"Logs not available. This could be caused by the retention policy.");
}
yarnClientWrapper = yarnClientService.getYarnClientSuper();
String hdfsLogPath = null;
String[] desiredLogTypes = null;
switch (type){
@@ -365,16 +363,13 @@ public JobLogDTO retryLogAggregation(Execution execution, JobLogDTO.LogType type
}

if (!Strings.isNullOrEmpty(hdfsLogPath)) {
YarnClientWrapper yarnClientWrapper = ycs.getYarnClientSuper(settings.getConfiguration());
ApplicationId applicationId = ConverterUtils.toApplicationId(execution.getAppId());
YarnMonitor monitor = new YarnMonitor(applicationId, yarnClientWrapper, ycs);
ApplicationId applicationId = ApplicationId.fromString(execution.getAppId());
try {
YarnLogUtil.copyAggregatedYarnLogs(udfso, aggregatedLogPath, hdfsLogPath, desiredLogTypes, monitor);
yarnMonitor.copyAggregatedYarnLogs(applicationId, udfso, yarnClientWrapper.getYarnClient(),
aggregatedLogPath, hdfsLogPath, desiredLogTypes);
} catch (IOException | InterruptedException | YarnException ex) {
LOGGER.log(Level.SEVERE, null, ex);
throw new JobException(RESTCodes.JobErrorCode.JOB_LOG, null, ex.getMessage());
} finally {
monitor.close();
}
}
} catch (IOException ex) {
Expand All @@ -386,6 +381,7 @@ public JobLogDTO retryLogAggregation(Execution execution, JobLogDTO.LogType type
if (udfso != null) {
dfs.closeDfsClient(udfso);
}
yarnClientService.closeYarnClient(yarnClientWrapper);
}

return getLog(execution, type);
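Throughout this file the new pattern is the same: the caller obtains a YarnClientWrapper from YarnClientService, hands yarnClientWrapper.getYarnClient() to the injected yarnMonitor, and releases the wrapper in a finally block. A minimal sketch of that acquire/use/release pattern, using the names that appear above, could look like the fragment below; yarnClientService and yarnMonitor are assumed to be injected @EJB fields as in the diff, and exception handling is trimmed.

// Fragment of a hypothetical caller inside a Hopsworks bean; yarnClientService
// and yarnMonitor are assumed @EJB-injected, as in the diff above.
private void killApplication(Execution execution) throws IOException, YarnException {
  YarnClientWrapper yarnClientWrapper = yarnClientService.getYarnClientSuper();
  try {
    ApplicationId appId = ApplicationId.fromString(execution.getAppId());
    // The shared client is borrowed only for the duration of this call.
    yarnMonitor.cancelJob(yarnClientWrapper.getYarnClient(), appId);
  } finally {
    // Always return the wrapper, even if the kill fails.
    yarnClientService.closeYarnClient(yarnClientWrapper);
  }
}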
@@ -39,19 +39,19 @@

package io.hops.hopsworks.common.jobs.yarn;

import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade;
import io.hops.hopsworks.persistence.entity.project.service.ProjectServiceEnum;
import io.hops.hopsworks.persistence.entity.project.service.ProjectServices;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.yarn.YarnClientService;
import io.hops.hopsworks.common.yarn.YarnClientWrapper;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.project.service.ProjectServiceEnum;
import io.hops.hopsworks.persistence.entity.project.service.ProjectServices;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -86,14 +86,15 @@ public class YarnExecutionFinalizer {
@EJB
private DistributedFsService dfs;
@EJB
private YarnClientService ycs;
private YarnClientService yarnClientService;
@EJB
private YarnMonitor yarnMonitor;

@Asynchronous
public Future<Execution> copyLogs(Execution exec) {
DistributedFileSystemOps udfso = dfs.getDfsOps(exec.getHdfsUser());
YarnClientWrapper yarnClientWrapper = yarnClientService.getYarnClientSuper();
ApplicationId applicationId = ApplicationId.fromString(exec.getAppId());
YarnClientWrapper yarnClientWrapper = ycs.getYarnClientSuper(settings.getConfiguration());
YarnMonitor monitor = new YarnMonitor(applicationId, yarnClientWrapper, ycs);

try {
String stdOutPath = settings.getAggregatedLogPath(exec.getHdfsUser(), exec.getAppId());
@@ -104,20 +105,20 @@ public Future<Execution> copyLogs(Execution exec) {

try {
String[] desiredOutLogTypes = {"out"};
YarnLogUtil.copyAggregatedYarnLogs(udfso, stdOutPath, stdOutFinalDestination,
desiredOutLogTypes, monitor);
yarnMonitor.copyAggregatedYarnLogs(applicationId, udfso, yarnClientWrapper.getYarnClient(),
stdOutPath, stdOutFinalDestination, desiredOutLogTypes);
String[] desiredErrLogTypes = {"err", ".log"};
YarnLogUtil.copyAggregatedYarnLogs(udfso, stdOutPath, stdErrFinalDestination,
desiredErrLogTypes, monitor);
yarnMonitor.copyAggregatedYarnLogs(applicationId, udfso, yarnClientWrapper.getYarnClient(),
stdOutPath, stdErrFinalDestination, desiredErrLogTypes);
} catch (IOException | InterruptedException | YarnException ex) {
LOGGER.log(Level.SEVERE,"error while aggregation logs" + ex.toString());
LOGGER.log(Level.SEVERE,"error while aggregation logs" + ex);
}
Execution execution = updateExecutionSTDPaths(stdOutFinalDestination, stdErrFinalDestination, exec);
finalizeExecution(exec, exec.getState());
return new AsyncResult<>(execution);
} finally {
dfs.closeDfsClient(udfso);
monitor.close();
yarnClientService.closeYarnClient(yarnClientWrapper);
}
}

@@ -185,12 +185,11 @@ private boolean startApplicationMaster(DistributedFileSystemOps udfso, Distribut
YarnApplicationState.KILLED);

private void writeLog(String message, Exception exception, DistributedFileSystemOps udfso) {

Date date = new Date();
String dateString = date.toString();
dateString = dateString.replace(" ", "_").replace(":", "-");
stdErrFinalDestination = stdErrFinalDestination + jobs.getName() + dateString + "/stderr.log";
YarnLogUtil.writeLog(udfso, stdErrFinalDestination, message, exception);
services.getYarnLogUtil().writeLog(udfso, stdErrFinalDestination, message, exception);
services.getExecutionFacade().updateStdErrPath(execution, stdErrFinalDestination);
}

@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;

import javax.annotation.PostConstruct;
@@ -70,7 +71,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@@ -94,9 +94,11 @@ public class YarnJobsMonitor implements JobsMonitor {
@EJB
private YarnExecutionFinalizer execFinalizer;
@EJB
private YarnClientService ycs;
private YarnMonitor yarnMonitor;
@EJB
private PayaraClusterManager payaraClusterManager;
@EJB
private YarnClientService yarnClientService;
@Resource
private TimerService timerService;
private Timer timer;
@@ -118,16 +120,17 @@ public void destroy() {

private int maxStatusPollRetry;

Map<String, YarnMonitor> monitors = new HashMap<>();
Map<String, Integer> failures = new HashMap<>();
private final Map<ApplicationId, Future<Execution>> copyLogsFutures = new HashMap<>();

@Timeout
public synchronized void yarnJobMonitor(Timer timer) {
if (!payaraClusterManager.amIThePrimary()) {
return;
}
YarnClientWrapper yarnClientWrapper = null;
try {
yarnClientWrapper = yarnClientService.getYarnClientSuper();
Map<String, Execution> executions = new HashMap<>();
List<Execution> execs = executionFacade.findNotFinished();
if (execs != null && !execs.isEmpty()) {
@@ -136,64 +139,46 @@ public synchronized void yarnJobMonitor(Timer timer) {
executions.put(exec.getAppId(), exec);
}
}
//Remove (Close) all monitors of deleted jobs
Iterator<Map.Entry<String, YarnMonitor>> monitorsIter = monitors.entrySet().iterator();
while (monitorsIter.hasNext()) {
Map.Entry<String, YarnMonitor> entry = monitorsIter.next();
// Check if Value associated with Key is 10
if (!executions.containsKey(entry.getKey())) {
// Remove the element
entry.getValue().close();
monitorsIter.remove();
}
}
maxStatusPollRetry = settings.getMaxStatusPollRetry();
List<String> toRemove = new ArrayList<>();
for (Map.Entry<String, Execution> entry : executions.entrySet()) {
YarnMonitor monitor = monitors.get(entry.getKey());
if (monitor == null) {
ApplicationId appId = ApplicationId.fromString(entry.getKey());
YarnClientWrapper newYarnclientWrapper = ycs.getYarnClientSuper(settings
.getConfiguration());
monitor = new YarnMonitor(appId, newYarnclientWrapper, ycs);
monitors.put(entry.getKey(), monitor);
}
Execution exec = internalMonitor(executions.get(entry.getKey()), monitor);
ApplicationId appId = ApplicationId.fromString(entry.getKey());
Execution exec = internalMonitor(yarnClientWrapper.getYarnClient(), appId, executions.get(entry.getKey()));
if (exec == null) {
toRemove.add(entry.getKey());
monitor.close();
}
}
for (String appID : toRemove) {
failures.remove(appID);
monitors.remove(appID);
}
// This is here to do bookkeeping. Remove from the map all the executions which have finished copying the logs
copyLogsFutures.entrySet().removeIf(futureResult -> futureResult.getValue().isDone());
}
} catch (Exception ex) {
LOGGER.log(Level.SEVERE, "Error while monitoring jobs", ex);
} finally {
yarnClientService.closeYarnClient(yarnClientWrapper);
}
}

private Execution internalMonitor(Execution exec, YarnMonitor monitor) {
private Execution internalMonitor(YarnClient yarnClient, ApplicationId appId, Execution exec) {
try {
YarnApplicationState appState = monitor.getApplicationState();
FinalApplicationStatus finalAppStatus = monitor.getFinalApplicationStatus();
float progress = monitor.getProgress();
YarnApplicationState appState = yarnMonitor.getApplicationState(yarnClient, appId);
FinalApplicationStatus finalAppStatus = yarnMonitor.getFinalApplicationStatus(yarnClient, appId);
float progress = yarnMonitor.getProgress(yarnClient, appId);
exec = updateProgress(progress, exec);
exec = updateState(JobState.getJobState(appState), exec);
exec = updateFinalStatus(JobFinalStatus.getJobFinalStatus(finalAppStatus), exec);

if ((appState == YarnApplicationState.FAILED
|| appState == YarnApplicationState.FINISHED
|| appState == YarnApplicationState.KILLED)
&& !copyLogsFutures.containsKey(monitor.getApplicationId())) {
&& !copyLogsFutures.containsKey(appId)) {

exec = executionFacade.updateState(exec, JobState.AGGREGATING_LOGS);
// Async call
Future<Execution> futureResult = execFinalizer.copyLogs(exec);
copyLogsFutures.put(monitor.getApplicationId(), futureResult);
copyLogsFutures.put(appId, futureResult);
return null;
}
} catch (IOException | YarnException ex) {
@@ -210,7 +195,7 @@ private Execution internalMonitor(Execution exec, YarnMonitor monitor) {
if (failures.get(exec.getAppId()) != null && failures.get(exec.getAppId()) > maxStatusPollRetry) {
try {
LOGGER.log(Level.SEVERE, "Killing application, {0}, because unable to poll for status.", exec);
monitor.cancelJob(monitor.getApplicationId().toString());
yarnMonitor.cancelJob(yarnClient, appId);
exec = updateFinalStatus(JobFinalStatus.KILLED, exec);
exec = updateProgress(0, exec);
execFinalizer.finalizeExecution(exec, JobState.KILLED);
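Taken together, the YarnJobsMonitor changes mean each timer tick now opens one YarnClientWrapper, reuses its YarnClient for every unfinished execution, and closes it in the finally block, instead of keeping a long-lived monitor (and client) per application in the removed monitors map. The following is a condensed, illustrative view of that per-tick loop, assuming the same injected beans and class-level LOGGER as the diff above; it is not the exact code of the commit.

// Condensed, illustrative per-tick loop; executionFacade, yarnClientService and
// yarnMonitor are assumed @EJB-injected, LOGGER is the class's logger.
@Timeout
public synchronized void yarnJobMonitor(Timer timer) {
  YarnClientWrapper yarnClientWrapper = null;
  try {
    yarnClientWrapper = yarnClientService.getYarnClientSuper();
    List<Execution> execs = executionFacade.findNotFinished();
    if (execs != null) {
      for (Execution exec : execs) {
        if (!Strings.isNullOrEmpty(exec.getAppId())) {
          ApplicationId appId = ApplicationId.fromString(exec.getAppId());
          // One shared client serves every application polled in this tick.
          YarnApplicationState state =
              yarnMonitor.getApplicationState(yarnClientWrapper.getYarnClient(), appId);
          // ... update progress/state, hand finished apps to YarnExecutionFinalizer ...
        }
      }
    }
  } catch (Exception ex) {
    LOGGER.log(Level.SEVERE, "Error while monitoring jobs", ex);
  } finally {
    yarnClientService.closeYarnClient(yarnClientWrapper);
  }
}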