From 7c7ab82a146bfac8ece2e1479cb7b4daffa79dc0 Mon Sep 17 00:00:00 2001 From: ErmiasG Date: Mon, 8 Jan 2024 13:19:51 +0100 Subject: [PATCH] [HWORKS-892] Use a single YarnClientWrapper to monitor all jobs (#1448) --- .../common/jobs/AsynchronousJobExecutor.java | 6 + .../AbstractExecutionController.java | 26 +- .../jobs/yarn/YarnExecutionFinalizer.java | 31 +- .../hopsworks/common/jobs/yarn/YarnJob.java | 3 +- .../common/jobs/yarn/YarnJobsMonitor.java | 51 +-- .../common/jobs/yarn/YarnLogUtil.java | 359 +-------------- .../common/jobs/yarn/YarnMonitor.java | 415 +++++++++++++++--- .../common/project/ProjectController.java | 22 +- 8 files changed, 429 insertions(+), 484 deletions(-) diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/AsynchronousJobExecutor.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/AsynchronousJobExecutor.java index 6ee36034e1..3559d7ee28 100755 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/AsynchronousJobExecutor.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/AsynchronousJobExecutor.java @@ -46,6 +46,7 @@ import io.hops.hopsworks.common.hdfs.DistributedFsService; import io.hops.hopsworks.common.jobs.execution.HopsJob; import io.hops.hopsworks.common.jobs.yarn.YarnExecutionFinalizer; +import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil; import io.hops.hopsworks.common.security.BaseHadoopClientsService; import io.hops.hopsworks.common.security.CertificateMaterializer; import io.hops.hopsworks.common.util.Settings; @@ -86,6 +87,8 @@ public class AsynchronousJobExecutor { private CertificateMaterializer certificateMaterializer; @EJB private BaseHadoopClientsService baseHadoopClientsService; + @EJB + private YarnLogUtil yarnLogUtil; @Asynchronous @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) @@ -139,4 +142,7 @@ public BaseHadoopClientsService getBaseHadoopClientsService() { return baseHadoopClientsService; } + public YarnLogUtil getYarnLogUtil() { + return yarnLogUtil; + } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/execution/AbstractExecutionController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/execution/AbstractExecutionController.java index 5a86d51ae5..df71f607f8 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/execution/AbstractExecutionController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/execution/AbstractExecutionController.java @@ -18,7 +18,6 @@ import com.google.common.base.Strings; import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade; -import io.hops.hopsworks.common.dao.jobhistory.YarnApplicationAttemptStateFacade; import io.hops.hopsworks.common.dao.jobhistory.YarnApplicationstateFacade; import io.hops.hopsworks.common.dao.jobs.description.YarnAppUrlsDTO; import io.hops.hopsworks.common.dao.jobs.quota.YarnProjectsQuotaFacade; @@ -33,7 +32,6 @@ import io.hops.hopsworks.common.jobs.flink.FlinkController; import io.hops.hopsworks.common.jobs.spark.SparkController; import io.hops.hopsworks.common.jobs.yarn.YarnExecutionFinalizer; -import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil; import io.hops.hopsworks.common.jobs.yarn.YarnMonitor; import io.hops.hopsworks.common.security.QuotaEnforcementException; import io.hops.hopsworks.common.security.QuotasEnforcement; @@ -64,7 +62,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; -import 
org.apache.hadoop.yarn.util.ConverterUtils; import javax.ejb.EJB; import javax.ejb.TransactionAttribute; import @@ -103,9 +100,9 @@ public abstract class AbstractExecutionController implements ExecutionController @EJB private ExecutionFacade executionFacade; @EJB - private YarnClientService ycs; + private YarnMonitor yarnMonitor; @EJB - private YarnApplicationAttemptStateFacade appAttemptStateFacade; + private YarnClientService yarnClientService; @EJB private YarnApplicationstateFacade yarnApplicationstateFacade; @EJB @@ -209,10 +206,9 @@ public Execution stopExecution(Integer id) throws JobException { public Execution stopExecution(Execution execution) throws JobException { //An execution when it's initializing might not have an appId in hopsworks if(execution.getAppId() != null && JobState.getRunningStates().contains(execution.getState())) { - YarnClientWrapper yarnClientWrapper = null; + YarnClientWrapper yarnClientWrapper = yarnClientService.getYarnClientSuper(); try { - yarnClientWrapper = ycs.getYarnClientSuper(settings.getConfiguration()); - yarnClientWrapper.getYarnClient().killApplication(ApplicationId.fromString(execution.getAppId())); + yarnMonitor.cancelJob(yarnClientWrapper.getYarnClient(), ApplicationId.fromString(execution.getAppId())); yarnExecutionFinalizer.removeAllNecessary(execution); return executionFacade.findById(execution.getId()) .orElseThrow(() -> new JobException(RESTCodes.JobErrorCode.JOB_EXECUTION_NOT_FOUND, @@ -222,7 +218,7 @@ public Execution stopExecution(Execution execution) throws JobException { "Could not kill job for job:" + execution.getJob().getName() + " with appId:" + execution.getAppId(), ex); throw new JobException(RESTCodes.JobErrorCode.JOB_STOP_FAILED, Level.WARNING, ex.getMessage(), null, ex); } finally { - ycs.closeYarnClient(yarnClientWrapper); + yarnClientService.closeYarnClient(yarnClientWrapper); } } return execution; @@ -336,6 +332,7 @@ public JobLogDTO retryLogAggregation(Execution execution, JobLogDTO.LogType type DistributedFileSystemOps dfso = null; DistributedFileSystemOps udfso = null; + YarnClientWrapper yarnClientWrapper = null; Users user = execution.getUser(); String hdfsUser = hdfsUsersController.getHdfsUserName(execution.getJob().getProject(), user); String aggregatedLogPath = settings.getAggregatedLogPath(hdfsUser, execution.getAppId()); @@ -349,6 +346,7 @@ public JobLogDTO retryLogAggregation(Execution execution, JobLogDTO.LogType type throw new JobException(RESTCodes.JobErrorCode.JOB_LOG, Level.WARNING, "Logs not available. 
This could be caused by the retention policy."); } + yarnClientWrapper = yarnClientService.getYarnClientSuper(); String hdfsLogPath = null; String[] desiredLogTypes = null; switch (type){ @@ -365,16 +363,13 @@ public JobLogDTO retryLogAggregation(Execution execution, JobLogDTO.LogType type } if (!Strings.isNullOrEmpty(hdfsLogPath)) { - YarnClientWrapper yarnClientWrapper = ycs.getYarnClientSuper(settings.getConfiguration()); - ApplicationId applicationId = ConverterUtils.toApplicationId(execution.getAppId()); - YarnMonitor monitor = new YarnMonitor(applicationId, yarnClientWrapper, ycs); + ApplicationId applicationId = ApplicationId.fromString(execution.getAppId()); try { - YarnLogUtil.copyAggregatedYarnLogs(udfso, aggregatedLogPath, hdfsLogPath, desiredLogTypes, monitor); + yarnMonitor.copyAggregatedYarnLogs(applicationId, udfso, yarnClientWrapper.getYarnClient(), + aggregatedLogPath, hdfsLogPath, desiredLogTypes); } catch (IOException | InterruptedException | YarnException ex) { LOGGER.log(Level.SEVERE, null, ex); throw new JobException(RESTCodes.JobErrorCode.JOB_LOG, null, ex.getMessage()); - } finally { - monitor.close(); } } } catch (IOException ex) { @@ -386,6 +381,7 @@ public JobLogDTO retryLogAggregation(Execution execution, JobLogDTO.LogType type if (udfso != null) { dfs.closeDfsClient(udfso); } + yarnClientService.closeYarnClient(yarnClientWrapper); } return getLog(execution, type); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnExecutionFinalizer.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnExecutionFinalizer.java index 29207e2097..5beba89070 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnExecutionFinalizer.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnExecutionFinalizer.java @@ -39,19 +39,19 @@ package io.hops.hopsworks.common.jobs.yarn; -import io.hops.hopsworks.persistence.entity.jobs.description.Jobs; -import io.hops.hopsworks.persistence.entity.jobs.history.Execution; import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade; -import io.hops.hopsworks.persistence.entity.project.service.ProjectServiceEnum; -import io.hops.hopsworks.persistence.entity.project.service.ProjectServices; import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps; import io.hops.hopsworks.common.hdfs.DistributedFsService; import io.hops.hopsworks.common.hdfs.Utils; -import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType; -import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState; import io.hops.hopsworks.common.util.Settings; import io.hops.hopsworks.common.yarn.YarnClientService; import io.hops.hopsworks.common.yarn.YarnClientWrapper; +import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType; +import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState; +import io.hops.hopsworks.persistence.entity.jobs.description.Jobs; +import io.hops.hopsworks.persistence.entity.jobs.history.Execution; +import io.hops.hopsworks.persistence.entity.project.service.ProjectServiceEnum; +import io.hops.hopsworks.persistence.entity.project.service.ProjectServices; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -86,14 +86,15 @@ public class YarnExecutionFinalizer { @EJB private DistributedFsService dfs; @EJB - private YarnClientService ycs; + private YarnClientService yarnClientService; + @EJB + 
private YarnMonitor yarnMonitor; @Asynchronous public Future<Execution> copyLogs(Execution exec) { DistributedFileSystemOps udfso = dfs.getDfsOps(exec.getHdfsUser()); + YarnClientWrapper yarnClientWrapper = yarnClientService.getYarnClientSuper(); ApplicationId applicationId = ApplicationId.fromString(exec.getAppId()); - YarnClientWrapper yarnClientWrapper = ycs.getYarnClientSuper(settings.getConfiguration()); - YarnMonitor monitor = new YarnMonitor(applicationId, yarnClientWrapper, ycs); try { String stdOutPath = settings.getAggregatedLogPath(exec.getHdfsUser(), exec.getAppId()); @@ -104,20 +105,20 @@ public Future<Execution> copyLogs(Execution exec) { try { String[] desiredOutLogTypes = {"out"}; - YarnLogUtil.copyAggregatedYarnLogs(udfso, stdOutPath, stdOutFinalDestination, - desiredOutLogTypes, monitor); + yarnMonitor.copyAggregatedYarnLogs(applicationId, udfso, yarnClientWrapper.getYarnClient(), + stdOutPath, stdOutFinalDestination, desiredOutLogTypes); String[] desiredErrLogTypes = {"err", ".log"}; - YarnLogUtil.copyAggregatedYarnLogs(udfso, stdOutPath, stdErrFinalDestination, - desiredErrLogTypes, monitor); + yarnMonitor.copyAggregatedYarnLogs(applicationId, udfso, yarnClientWrapper.getYarnClient(), + stdOutPath, stdErrFinalDestination, desiredErrLogTypes); } catch (IOException | InterruptedException | YarnException ex) { - LOGGER.log(Level.SEVERE,"error while aggregation logs" + ex.toString()); + LOGGER.log(Level.SEVERE, "error while aggregating logs: " + ex); } Execution execution = updateExecutionSTDPaths(stdOutFinalDestination, stdErrFinalDestination, exec); finalizeExecution(exec, exec.getState()); return new AsyncResult<>(execution); } finally { dfs.closeDfsClient(udfso); - monitor.close(); + yarnClientService.closeYarnClient(yarnClientWrapper); } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java index 3d47673fa8..fc22005916 100755 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java @@ -185,12 +185,11 @@ private boolean startApplicationMaster(DistributedFileSystemOps udfso, Distribut YarnApplicationState.KILLED); private void writeLog(String message, Exception exception, DistributedFileSystemOps udfso) { - Date date = new Date(); String dateString = date.toString(); dateString = dateString.replace(" ", "_").replace(":", "-"); stdErrFinalDestination = stdErrFinalDestination + jobs.getName() + dateString + "/stderr.log"; - YarnLogUtil.writeLog(udfso, stdErrFinalDestination, message, exception); + services.getYarnLogUtil().writeLog(udfso, stdErrFinalDestination, message, exception); services.getExecutionFacade().updateStdErrPath(execution, stdErrFinalDestination); } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJobsMonitor.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJobsMonitor.java index 3ae2420620..ae8574119e 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJobsMonitor.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJobsMonitor.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; import 
org.apache.hadoop.yarn.exceptions.YarnException; import javax.annotation.PostConstruct; @@ -70,7 +71,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Future; @@ -94,9 +94,11 @@ public class YarnJobsMonitor implements JobsMonitor { @EJB private YarnExecutionFinalizer execFinalizer; @EJB - private YarnClientService ycs; + private YarnMonitor yarnMonitor; @EJB private PayaraClusterManager payaraClusterManager; + @EJB + private YarnClientService yarnClientService; @Resource private TimerService timerService; private Timer timer; @@ -118,16 +120,17 @@ public void destroy() { private int maxStatusPollRetry; - Map<String, YarnMonitor> monitors = new HashMap<>(); Map<String, Integer> failures = new HashMap<>(); private final Map<ApplicationId, Future<Execution>> copyLogsFutures = new HashMap<>(); - + @Timeout public synchronized void yarnJobMonitor(Timer timer) { if (!payaraClusterManager.amIThePrimary()) { return; } + YarnClientWrapper yarnClientWrapper = null; try { + yarnClientWrapper = yarnClientService.getYarnClientSuper(); Map<String, Execution> executions = new HashMap<>(); List<Execution> execs = executionFacade.findNotFinished(); if (execs != null && !execs.isEmpty()) { @@ -136,51 +139,33 @@ public synchronized void yarnJobMonitor(Timer timer) { executions.put(exec.getAppId(), exec); } } - //Remove (Close) all monitors of deleted jobs - Iterator<Map.Entry<String, YarnMonitor>> monitorsIter = monitors.entrySet().iterator(); - while (monitorsIter.hasNext()) { - Map.Entry<String, YarnMonitor> entry = monitorsIter.next(); - // Check if Value associated with Key is 10 - if (!executions.containsKey(entry.getKey())) { - // Remove the element - entry.getValue().close(); - monitorsIter.remove(); - } - } maxStatusPollRetry = settings.getMaxStatusPollRetry(); List<String> toRemove = new ArrayList<>(); for (Map.Entry<String, Execution> entry : executions.entrySet()) { - YarnMonitor monitor = monitors.get(entry.getKey()); - if (monitor == null) { - ApplicationId appId = ApplicationId.fromString(entry.getKey()); - YarnClientWrapper newYarnclientWrapper = ycs.getYarnClientSuper(settings - .getConfiguration()); - monitor = new YarnMonitor(appId, newYarnclientWrapper, ycs); - monitors.put(entry.getKey(), monitor); - } - Execution exec = internalMonitor(executions.get(entry.getKey()), monitor); + ApplicationId appId = ApplicationId.fromString(entry.getKey()); + Execution exec = internalMonitor(yarnClientWrapper.getYarnClient(), appId, executions.get(entry.getKey())); if (exec == null) { toRemove.add(entry.getKey()); - monitor.close(); } } for (String appID : toRemove) { failures.remove(appID); - monitors.remove(appID); } // This is here to do bookkeeping. 
Remove from the map all the executions which have finished copying the logs copyLogsFutures.entrySet().removeIf(futureResult -> futureResult.getValue().isDone()); } } catch (Exception ex) { LOGGER.log(Level.SEVERE, "Error while monitoring jobs", ex); + } finally { + yarnClientService.closeYarnClient(yarnClientWrapper); } } - private Execution internalMonitor(Execution exec, YarnMonitor monitor) { + private Execution internalMonitor(YarnClient yarnClient, ApplicationId appId, Execution exec) { try { - YarnApplicationState appState = monitor.getApplicationState(); - FinalApplicationStatus finalAppStatus = monitor.getFinalApplicationStatus(); - float progress = monitor.getProgress(); + YarnApplicationState appState = yarnMonitor.getApplicationState(yarnClient, appId); + FinalApplicationStatus finalAppStatus = yarnMonitor.getFinalApplicationStatus(yarnClient, appId); + float progress = yarnMonitor.getProgress(yarnClient, appId); exec = updateProgress(progress, exec); exec = updateState(JobState.getJobState(appState), exec); exec = updateFinalStatus(JobFinalStatus.getJobFinalStatus(finalAppStatus), exec); @@ -188,12 +173,12 @@ private Execution internalMonitor(Execution exec, YarnMonitor monitor) { if ((appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED || appState == YarnApplicationState.KILLED) - && !copyLogsFutures.containsKey(monitor.getApplicationId())) { + && !copyLogsFutures.containsKey(appId)) { exec = executionFacade.updateState(exec, JobState.AGGREGATING_LOGS); // Async call Future futureResult = execFinalizer.copyLogs(exec); - copyLogsFutures.put(monitor.getApplicationId(), futureResult); + copyLogsFutures.put(appId, futureResult); return null; } } catch (IOException | YarnException ex) { @@ -210,7 +195,7 @@ private Execution internalMonitor(Execution exec, YarnMonitor monitor) { if (failures.get(exec.getAppId()) != null && failures.get(exec.getAppId()) > maxStatusPollRetry) { try { LOGGER.log(Level.SEVERE, "Killing application, {0}, because unable to poll for status.", exec); - monitor.cancelJob(monitor.getApplicationId().toString()); + yarnMonitor.cancelJob(yarnClient, appId); exec = updateFinalStatus(JobFinalStatus.KILLED, exec); exec = updateProgress(0, exec); execFinalizer.finalizeExecution(exec, JobState.KILLED); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnLogUtil.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnLogUtil.java index 7ed03413e2..460272f1c5 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnLogUtil.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnLogUtil.java @@ -36,40 +36,27 @@ * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
*/ - package io.hops.hopsworks.common.jobs.yarn; import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileNotFoundException; + +import javax.ejb.Stateless; +import javax.ejb.TransactionAttribute; +import javax.ejb.TransactionAttributeType; import java.io.IOException; import java.io.PrintStream; -import java.util.ArrayList; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.LogAggregationStatus; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.ContainerLogsReader; - +@Stateless +@TransactionAttribute(TransactionAttributeType.NEVER) public class YarnLogUtil { - private static final Logger LOGGER = Logger.getLogger(YarnLogUtil.class.getName()); - - public static void writeLog(DistributedFileSystemOps dfs, String dst, - String message) { + public void writeLog(DistributedFileSystemOps dfs, String dst, String message) { writeLog(dfs, dst, message, null); } - public static void writeLog(DistributedFileSystemOps dfs, String dst, - String message, Exception exception) { + public void writeLog(DistributedFileSystemOps dfs, String dst, String message, Exception exception) { PrintStream writer = null; try { writer = new PrintStream(dfs.create(dst)); @@ -80,8 +67,7 @@ public static void writeLog(DistributedFileSystemOps dfs, String dst, } } catch (IOException ex) { if (writer != null) { - writer.print(YarnLogUtil.class.getName() - + ": Failed to write logs.\n" + ex.getMessage()); + writer.print(YarnMonitor.class.getName() + ": Failed to write logs.\n" + ex.getMessage()); } LOGGER.log(Level.SEVERE, null, ex); } finally { @@ -91,331 +77,4 @@ public static void writeLog(DistributedFileSystemOps dfs, String dst, } } } - - /** - * Given aggregated yarn log path and destination path copies the desired log - * type (stdout/stderr) - * - * @param dfs - * @param src aggregated yarn log path - * @param dst destination path to copy to - * @param desiredLogTypes stderr or stdout or stdlog - * @param monitor the monitor to check the log aggregation status - */ - public static void copyAggregatedYarnLogs(DistributedFileSystemOps dfs, String src, String dst, - String[] desiredLogTypes, YarnMonitor monitor) throws YarnException, IOException, InterruptedException { - - LogAggregationStatus logAggregationStatus = waitForLogAggregation(monitor.getYarnClient(), - monitor.getApplicationId()); - if (logAggregationStatus == null) { - // ServiceStatus might be null if there were issues starting the application - // most likely on the yarn side. 
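
// The aggregated-log logic being removed here moves, nearly verbatim, into YarnMonitor
// further down; YarnLogUtil keeps only writeLog() and becomes an injectable @Stateless bean
// (see AsynchronousJobExecutor above). A minimal usage sketch of the slimmed-down utility,
// with a hypothetical caller: the field name and message text are illustrative, while the
// writeLog(dfs, dst, message, exception) signature is the one in this diff.
@EJB
private YarnLogUtil yarnLogUtil;

private void reportFailure(DistributedFileSystemOps udfso, String dst, Exception ex) {
  // writeLog prints the message to dst on HDFS and appends the stack trace when ex is non-null
  yarnLogUtil.writeLog(udfso, dst, "Application failed to start", ex);
}
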
- return; - } - - PrintStream writer = null; - String[] srcs; - try { - srcs = getAggregatedLogFilePaths(src, dfs); - if (!logFilesReady(srcs, dfs)) { - LOGGER.log(Level.SEVERE, "Error getting logs"); - } - writer = new PrintStream(dfs.create(dst)); - switch (logAggregationStatus) { - case FAILED: - writer.print("The log aggregation failed"); - break; - case TIME_OUT: - writer.print("*** WARNING: Log aggregation has timed-out for some of the containers\n\n\n"); - for (String desiredLogType : desiredLogTypes) { - writeLogs(dfs, srcs, writer, desiredLogType); - } - break; - case SUCCEEDED: - for (String desiredLogType : desiredLogTypes) { - writeLogs(dfs, srcs, writer, desiredLogType); - } - break; - default : - writer.print("Something went wrong during log aggregation phase! Log aggregation status is: " - + logAggregationStatus.name()); - } - } catch (Exception ex) { - if (writer != null) { - writer.print(YarnLogUtil.class.getName() + ": Failed to get aggregated logs.\n" + ex.getMessage()); - } - LOGGER.log(Level.SEVERE, null, ex); - } finally { - if (writer != null) { - writer.flush(); - writer.close(); - } - } - } - - public static LogAggregationStatus waitForLogAggregation(YarnClient yarnClient, ApplicationId appId) - throws InterruptedException, YarnException, IOException { - LogAggregationStatus logAggregationStatus = yarnClient.getApplicationReport(appId) - .getLogAggregationStatus(); - - int not_startRetries = 0; - while (!isFinal(logAggregationStatus)) { - TimeUnit.SECONDS.sleep(2); - logAggregationStatus = yarnClient.getApplicationReport(appId).getLogAggregationStatus(); - // NOT_START LogAggregation status might happen in two cases: - // (a) Application has failed very early and status didn't change to FAILED - // (b) Application has succeeded but the moment we probe for status, - // log aggregation hasn't started yet. - if (logAggregationStatus.equals(LogAggregationStatus.NOT_START)) { - if (++not_startRetries > 30) { - break; - } - } - } - return logAggregationStatus; - } - - private static boolean isFinal(LogAggregationStatus status){ - if (status == null) { - // ServiceStatus might be null if there were issues starting the application - // most likely on the yarn side. - return true; - } - - switch(status) { - case RUNNING: - case RUNNING_WITH_FAILURE: - case NOT_START: - return false; - default : - return true; - } - } - - private static void writeLogs(DistributedFileSystemOps dfs, String[] srcs, - PrintStream writer, String desiredLogType) { - ArrayList containerNames = new ArrayList<>(); - LogReader reader = null; - DataInputStream valueStream; - AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(); - AggregatedLogFormat.ContainerLogsReader logReader = null; - Path location; - try { - for (String src : srcs) { - location = new Path(src); - LOGGER.log(Level.FINE, "Copying log from {0}", src); - try { - reader = new LogReader(dfs.getConf(), dfs, location); - valueStream = reader.next(key); - while (valueStream != null) { - containerNames.add(key); - valueStream = reader.next(key); - } - reader.close(); - reader = new LogReader(dfs.getConf(), dfs, location); - } catch (FileNotFoundException e) { - LOGGER.log(Level.FINE, "Logs not available. 
Aggregation may have failed."); - return; - } catch (IOException e) { - LOGGER.log(Level.SEVERE, "Error getting logs"); - return; - } - - try { - for (AggregatedLogFormat.LogKey containerKey : containerNames) { - valueStream = reader.next(key); - while (valueStream != null && !key.equals(containerKey)) { - valueStream = reader.next(key); - } - if (valueStream != null) { - logReader = new ContainerLogsReader(valueStream); - } - if (logReader != null) { - readContainerLogs(logReader, writer, desiredLogType, containerKey, - location.getName()); - } - } - - } catch (IOException e) { - LOGGER.log(Level.SEVERE, "Error getting logs"); - } - containerNames.clear(); - key = new AggregatedLogFormat.LogKey(); - logReader = null; - } - } finally { - if (reader != null) { - reader.close(); - } - } - } - - private static boolean logsReady(DistributedFileSystemOps dfs, String src) { - ArrayList containerNames = new ArrayList<>(); - LogReader reader = null; - DataInputStream valueStream; - AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(); - AggregatedLogFormat.ContainerLogsReader logReader = null; - try { - try { - reader = new LogReader(dfs.getConf(), dfs, - new Path(src)); - valueStream = reader.next(key); - while (valueStream != null) { - containerNames.add(key); - valueStream = reader.next(key); - } - reader.close(); - reader = new LogReader(dfs.getConf(), dfs, - new Path(src)); - } catch (IOException e) { - return false; - } - - try { - for (AggregatedLogFormat.LogKey containerKey : containerNames) { - valueStream = reader.next(key); - while (valueStream != null && !key.equals(containerKey)) { - valueStream = reader.next(key); - } - if (valueStream != null) { - logReader = new ContainerLogsReader(valueStream); - } - if (logReader != null) { - if (!testLogs(logReader, "out")) { - return false; - } - } - } - } catch (IOException e) { - LOGGER.log(Level.SEVERE, "Error testing logs"); - } - } finally { - if (reader != null) { - reader.close(); - } - } - return true; - } - - private static boolean testLogs( - AggregatedLogFormat.ContainerLogsReader logReader, - String desiredLogType) throws IOException { - boolean foundLog = true; - String logType = logReader.nextLog(); - while (logType != null) { - foundLog = true; - if (!logType.contains(desiredLogType)) { - foundLog = false; - } - logType = logReader.nextLog(); - } - return foundLog; - } - - //Mostly taken from org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlock - private static boolean readContainerLogs( - AggregatedLogFormat.ContainerLogsReader logReader, PrintStream writer, - String desiredLogType, AggregatedLogFormat.LogKey containerKey, - String nodename) throws - IOException { - int bufferSize = 65536; - char[] cbuf = new char[bufferSize]; - boolean foundLog = false; - String logType = logReader.nextLog(); - while (logType != null) { - if (desiredLogType == null || desiredLogType.isEmpty() - || logType.contains(desiredLogType)) { - long logLength = logReader.getCurrentLogLength(); - if (!foundLog) { - writer.append("Container: " + containerKey.toString() + " on " - + nodename + "\n" - + "===============================================" - + "=============================================== \n"); - } - if (logLength == 0) { - writer.append("Log Type: " + logType + "\n"); - writer.append("Log Length: " + 0 + "\n"); - logType = logReader.nextLog(); - continue; - } - writer.append("Log Type: " + logType + "\n"); - writer.append("Log Length: " + Long.toString(logLength) + "\n"); - writer.append("Log Contents: \n"); - int len = 0; - 
int currentToRead = logLength > bufferSize ? bufferSize - : (int) logLength; - while (logLength > 0 && (len = logReader.read(cbuf, 0, currentToRead)) - > 0) { - writer.append(new String(cbuf, 0, len)); - logLength = logLength - len; - currentToRead = logLength > bufferSize ? bufferSize : (int) logLength; - } - writer.append("\n"); - foundLog = true; - } - logType = logReader.nextLog(); - } - return foundLog; - } - - /** - * Given a path to an aggregated log returns the full path to the log file. - */ - private static String[] getAggregatedLogFilePaths(String path, - DistributedFileSystemOps dfs) throws IOException { - Path location = new Path(path); - String[] paths; - FileStatus[] fileStatus; - if (!dfs.exists(path)) { - paths = new String[1]; - paths[0] = path; - return paths; - } - if (!dfs.isDir(path)) { - paths = new String[1]; - paths[0] = path; - return paths; - } - fileStatus = dfs.listStatus(location); - if (fileStatus == null || fileStatus.length == 0) { - paths = new String[1]; - paths[0] = path; - return paths; - } - paths = new String[fileStatus.length]; - for (int i = 0; i < fileStatus.length; i++) { - paths[i] = path + File.separator + fileStatus[i].getPath().getName(); - } - return paths; - } - - private static boolean logFilesReady(String[] paths, - DistributedFileSystemOps dfs) throws - IOException { - boolean ready = false; - for (String path : paths) { - Path location = new Path(path); - FileStatus fileStatus; - if (!dfs.exists(path)) { - return false; - } - if (dfs.isDir(path)) { - return false; - } - fileStatus = dfs.getFileStatus(location); - if (fileStatus == null) { - return false; - } - if (fileStatus.getLen() == 0l) { - return false; - } - if (!logsReady(dfs, path)) { - return false; - } - ready = true; - } - return ready; - } - } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnMonitor.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnMonitor.java index f543734b7d..131de9bc67 100755 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnMonitor.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnMonitor.java @@ -39,94 +39,391 @@ package io.hops.hopsworks.common.jobs.yarn; -import java.io.Closeable; -import java.io.IOException; - -import io.hops.hopsworks.common.yarn.YarnClientService; -import io.hops.hopsworks.common.yarn.YarnClientWrapper; -import org.apache.hadoop.service.Service; +import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; -public final class YarnMonitor implements Closeable { +import javax.ejb.Stateless; +import javax.ejb.TransactionAttribute; +import javax.ejb.TransactionAttributeType; +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; - private final YarnClientWrapper yarnClientWrapper; - 
private final ApplicationId appId; - private final YarnClientService ycs; +@Stateless +@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) +public class YarnMonitor { + private static final Logger LOGGER = Logger.getLogger(YarnMonitor.class.getName()); - public YarnMonitor(ApplicationId id, YarnClientWrapper yarnClientWrapper, - YarnClientService ycs) { - if (id == null) { - throw new IllegalArgumentException( - "ApplicationId cannot be null for Yarn monitor!"); - } - this.appId = id; - this.yarnClientWrapper = yarnClientWrapper; - this.ycs = ycs; - } - public void stop() { - if (null != yarnClientWrapper) { - ycs.closeYarnClient(yarnClientWrapper); - } + //--------------------------------------------------------------------------- + //--------------------------- STATUS QUERIES -------------------------------- + //--------------------------------------------------------------------------- + public YarnApplicationState getApplicationState(YarnClient yarnClient, ApplicationId appId) + throws YarnException, IOException { + return yarnClient.getApplicationReport(appId).getYarnApplicationState(); + } + + public LogAggregationStatus getLogAggregationStatus(YarnClient yarnClient, ApplicationId appId) + throws YarnException, IOException { + return yarnClient.getApplicationReport(appId).getLogAggregationStatus(); } - public boolean isStarted() { - return yarnClientWrapper.getYarnClient().isInState(Service.STATE.STARTED); + public FinalApplicationStatus getFinalApplicationStatus(YarnClient yarnClient, ApplicationId appId) + throws YarnException, IOException { + return yarnClient.getApplicationReport(appId).getFinalApplicationStatus(); } - public boolean isStopped() { - return yarnClientWrapper.getYarnClient().isInState(Service.STATE.STOPPED); + public float getProgress(YarnClient yarnClient, ApplicationId appId) throws YarnException, IOException { + return yarnClient.getApplicationReport(appId).getProgress(); } //--------------------------------------------------------------------------- - //--------------------------- STATUS QUERIES -------------------------------- + //------------------------- YARNCLIENT UTILS -------------------------------- //--------------------------------------------------------------------------- - public YarnApplicationState getApplicationState() throws YarnException, - IOException { - return yarnClientWrapper.getYarnClient().getApplicationReport(appId) - .getYarnApplicationState(); - } - public LogAggregationStatus getLogAggregationStatus() throws YarnException, IOException { - return yarnClientWrapper.getYarnClient().getApplicationReport(appId) - .getLogAggregationStatus(); + public void cancelJob(YarnClient yarnClient, ApplicationId appid) throws YarnException, IOException { + yarnClient.killApplication(appid); } - - public FinalApplicationStatus getFinalApplicationStatus() throws YarnException, - IOException { - return yarnClientWrapper.getYarnClient().getApplicationReport(appId) - .getFinalApplicationStatus(); + + //--------------------------------------------------------------------------- + //------------------------- Yarn log util -------------------------------- + //--------------------------------------------------------------------------- + + /** + * Given aggregated yarn log path and destination path copies the desired log + * type (stdout/stderr) + * + * @param dfs + * @param src + * aggregated yarn log path + * @param dst + * destination path to copy to + * @param desiredLogTypes + * stderr or stdout or stdlog + */ + public void 
copyAggregatedYarnLogs(ApplicationId applicationId, DistributedFileSystemOps dfs, + YarnClient yarnClient, String src, String dst, String[] desiredLogTypes) + throws YarnException, IOException, InterruptedException { + + LogAggregationStatus logAggregationStatus = waitForLogAggregation(yarnClient, applicationId); + if (logAggregationStatus == null) { + // ServiceStatus might be null if there were issues starting the application + // most likely on the yarn side. + return; + } + + PrintStream writer = null; + String[] srcs; + try { + srcs = getAggregatedLogFilePaths(src, dfs); + if (!logFilesReady(srcs, dfs)) { + LOGGER.log(Level.INFO, "Log is not ready for AppId: {0}. Will retry. ", applicationId); + } + writer = new PrintStream(dfs.create(dst)); + switch (logAggregationStatus) { + case FAILED: + writer.print("The log aggregation failed"); + break; + case TIME_OUT: + writer.print("*** WARNING: Log aggregation has timed-out for some of the containers\n\n\n"); + for (String desiredLogType : desiredLogTypes) { + writeLogs(applicationId, dfs, srcs, writer, desiredLogType); + } + break; + case SUCCEEDED: + for (String desiredLogType : desiredLogTypes) { + writeLogs(applicationId, dfs, srcs, writer, desiredLogType); + } + break; + default: + writer.print("Something went wrong during log aggregation phase! Log aggregation status is: " + + logAggregationStatus.name()); + } + } catch (Exception ex) { + if (writer != null) { + writer.print(YarnMonitor.class.getName() + ": Failed to get aggregated logs.\n" + ex.getMessage()); + } + LOGGER.log(Level.SEVERE, null, ex); + } finally { + if (writer != null) { + writer.flush(); + writer.close(); + } + } } - public float getProgress() throws YarnException, - IOException { - return yarnClientWrapper.getYarnClient().getApplicationReport(appId).getProgress(); + public LogAggregationStatus waitForLogAggregation(YarnClient yarnClient, ApplicationId appId) + throws InterruptedException, YarnException, IOException { + LogAggregationStatus logAggregationStatus = getLogAggregationStatus(yarnClient, appId); + + int not_startRetries = 0; + while (!isFinal(logAggregationStatus)) { + TimeUnit.SECONDS.sleep(2); + logAggregationStatus = getLogAggregationStatus(yarnClient, appId); + // NOT_START LogAggregation status might happen in two cases: + // (a) Application has failed very early and status didn't change to FAILED + // (b) Application has succeeded but the moment we probe for status, + // log aggregation hasn't started yet. + if (logAggregationStatus.equals(LogAggregationStatus.NOT_START)) { + if (++not_startRetries > 30) { + break; + } + } + } + return logAggregationStatus; } - public ApplicationId getApplicationId() { - return appId; + private boolean isFinal(LogAggregationStatus status) { + if (status == null) { + // ServiceStatus might be null if there were issues starting the application + // most likely on the yarn side. 
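
// waitForLogAggregation() above polls the application report every 2 seconds until the
// status is final per isFinal(), giving up on NOT_START after 30 retries, so it should only
// run from asynchronous or timer-driven beans, as it does in this patch. A sketch of the
// calling pattern (hypothetical caller; appId is assumed in scope, the other names are
// taken from this diff):
YarnClientWrapper wrapper = yarnClientService.getYarnClientSuper();
try {
  LogAggregationStatus status = yarnMonitor.waitForLogAggregation(wrapper.getYarnClient(), appId);
  if (status == LogAggregationStatus.SUCCEEDED) {
    // aggregation finished cleanly; copyAggregatedYarnLogs() can now read the logs from HDFS
  }
} finally {
  // always hand the shared client back to YarnClientService
  yarnClientService.closeYarnClient(wrapper);
}
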
+ return true; + } + + switch (status) { + case RUNNING: + case RUNNING_WITH_FAILURE: + case NOT_START: + return false; + default: + return true; + } } - //--------------------------------------------------------------------------- - //------------------------- YARNCLIENT UTILS -------------------------------- - //--------------------------------------------------------------------------- - @Override - public void close() { - stop(); + private void writeLogs(ApplicationId appId, DistributedFileSystemOps dfs, String[] srcs, PrintStream writer, + String desiredLogType) { + ArrayList containerNames = new ArrayList<>(); + LogReader reader = null; + DataInputStream valueStream; + AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(); + AggregatedLogFormat.ContainerLogsReader logReader = null; + Path location; + try { + for (String src : srcs) { + location = new Path(src); + LOGGER.log(Level.FINE, "Copying log from {0}", src); + try { + reader = new LogReader(dfs.getConf(), dfs, location); + valueStream = reader.next(key); + while (valueStream != null) { + containerNames.add(key); + valueStream = reader.next(key); + } + reader.close(); + reader = new LogReader(dfs.getConf(), dfs, location); + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Logs are not available. Aggregation might not be done. AppId: {0}", appId); + return; + } + + try { + for (AggregatedLogFormat.LogKey containerKey : containerNames) { + valueStream = reader.next(key); + while (valueStream != null && !key.equals(containerKey)) { + valueStream = reader.next(key); + } + if (valueStream != null) { + logReader = new AggregatedLogFormat.ContainerLogsReader(valueStream); + } + if (logReader != null) { + readContainerLogs(logReader, writer, desiredLogType, containerKey, + location.getName()); + } + } + + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Failed to get log. Aggregation might not be done. 
AppId: {0}", appId); + } + containerNames.clear(); + key = new AggregatedLogFormat.LogKey(); + logReader = null; + } + } finally { + if (reader != null) { + reader.close(); + } + } } - public void cancelJob(String appid) throws YarnException, IOException { - ApplicationId applicationId = ConverterUtils.toApplicationId(appid); - yarnClientWrapper.getYarnClient().killApplication(applicationId); + private boolean logsReady(DistributedFileSystemOps dfs, String src) { + ArrayList containerNames = new ArrayList<>(); + LogReader reader = null; + DataInputStream valueStream; + AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(); + AggregatedLogFormat.ContainerLogsReader logReader = null; + try { + try { + reader = new LogReader(dfs.getConf(), dfs, + new Path(src)); + valueStream = reader.next(key); + while (valueStream != null) { + containerNames.add(key); + valueStream = reader.next(key); + } + reader.close(); + reader = new LogReader(dfs.getConf(), dfs, + new Path(src)); + } catch (IOException e) { + return false; + } + + try { + for (AggregatedLogFormat.LogKey containerKey : containerNames) { + valueStream = reader.next(key); + while (valueStream != null && !key.equals(containerKey)) { + valueStream = reader.next(key); + } + if (valueStream != null) { + logReader = new AggregatedLogFormat.ContainerLogsReader(valueStream); + } + if (logReader != null) { + if (!testLogs(logReader, "out")) { + return false; + } + } + } + } catch (IOException e) { + LOGGER.log(Level.SEVERE, "Error testing logs"); + } + } finally { + if (reader != null) { + reader.close(); + } + } + return true; } - public YarnClient getYarnClient() { - return yarnClientWrapper.getYarnClient(); + private boolean testLogs(AggregatedLogFormat.ContainerLogsReader logReader, String desiredLogType) + throws IOException { + boolean foundLog = true; + String logType = logReader.nextLog(); + while (logType != null) { + foundLog = true; + if (!logType.contains(desiredLogType)) { + foundLog = false; + } + logType = logReader.nextLog(); + } + return foundLog; + } + + //Mostly taken from org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlock + private boolean readContainerLogs(AggregatedLogFormat.ContainerLogsReader logReader, PrintStream writer, + String desiredLogType, AggregatedLogFormat.LogKey containerKey, String nodename) throws IOException { + int bufferSize = 65536; + char[] cbuf = new char[bufferSize]; + boolean foundLog = false; + String logType = logReader.nextLog(); + while (logType != null) { + if (desiredLogType == null || desiredLogType.isEmpty() + || logType.contains(desiredLogType)) { + long logLength = logReader.getCurrentLogLength(); + if (!foundLog) { + writer.append("Container: ") + .append(containerKey.toString()) + .append(" on ") + .append(nodename) + .append("\n") + .append("===============================================") + .append("=============================================== \n"); + } + if (logLength == 0) { + writer.append("Log Type: ") + .append(logType) + .append("\n") + .append("Log Length: " + 0 + "\n"); + logType = logReader.nextLog(); + continue; + } + writer.append("Log Type: ") + .append(logType).append("\n") + .append("Log Length: ") + .append(String.valueOf(logLength)) + .append("\n") + .append("Log Contents: \n"); + int len = 0; + int currentToRead = logLength > bufferSize ? 
bufferSize + : (int) logLength; + while (logLength > 0 && (len = logReader.read(cbuf, 0, currentToRead)) + > 0) { + writer.append(new String(cbuf, 0, len)); + logLength = logLength - len; + currentToRead = logLength > bufferSize ? bufferSize : (int) logLength; + } + writer.append("\n"); + foundLog = true; + } + logType = logReader.nextLog(); + } + return foundLog; + } + + /** + * Given a path to an aggregated log returns the full path to the log file. + */ + private String[] getAggregatedLogFilePaths(String path, DistributedFileSystemOps dfs) throws IOException { + Path location = new Path(path); + String[] paths; + FileStatus[] fileStatus; + if (!dfs.exists(path)) { + paths = new String[1]; + paths[0] = path; + return paths; + } + if (!dfs.isDir(path)) { + paths = new String[1]; + paths[0] = path; + return paths; + } + fileStatus = dfs.listStatus(location); + if (fileStatus == null || fileStatus.length == 0) { + paths = new String[1]; + paths[0] = path; + return paths; + } + paths = new String[fileStatus.length]; + for (int i = 0; i < fileStatus.length; i++) { + paths[i] = path + File.separator + fileStatus[i].getPath().getName(); + } + return paths; + } + + private boolean logFilesReady(String[] paths, DistributedFileSystemOps dfs) throws IOException { + boolean ready = false; + for (String path : paths) { + Path location = new Path(path); + FileStatus fileStatus; + if (!dfs.exists(path)) { + return false; + } + if (dfs.isDir(path)) { + return false; + } + fileStatus = dfs.getFileStatus(location); + if (fileStatus == null) { + return false; + } + if (fileStatus.getLen() == 0l) { + return false; + } + if (!logsReady(dfs, path)) { + return false; + } + ready = true; + } + return ready; } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java index 0e60b55e7f..1d5cf826c8 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java @@ -70,6 +70,7 @@ import io.hops.hopsworks.common.dao.user.activity.ActivityFacade; import io.hops.hopsworks.common.dataset.DatasetController; import io.hops.hopsworks.common.dataset.FolderNameValidator; +import io.hops.hopsworks.common.jobs.yarn.YarnMonitor; import io.hops.hopsworks.common.kafka.KafkaController; import io.hops.hopsworks.common.opensearch.OpenSearchController; import io.hops.hopsworks.common.experiments.tensorboard.TensorBoardController; @@ -84,7 +85,6 @@ import io.hops.hopsworks.common.hive.HiveController; import io.hops.hopsworks.common.jobs.JobController; import io.hops.hopsworks.common.jobs.execution.ExecutionController; -import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil; import io.hops.hopsworks.common.jupyter.JupyterController; import io.hops.hopsworks.common.kafka.SubjectsCompatibilityController; import io.hops.hopsworks.common.kafka.SubjectsController; @@ -302,6 +302,8 @@ public class ProjectController { private AlertController alertController; @EJB private AMClient alertManager; + @EJB + private YarnMonitor yarnMonitor; @Inject @Any private Instance projectTeamRoleHandlers; @@ -1491,16 +1493,16 @@ private void killYarnJobs(Project project) throws JobException { } } - private void waitForJobLogs(List projectsApps, YarnClient client) + private void waitForJobLogs(List projectsApps, YarnClient yarnClient) throws YarnException, IOException, InterruptedException { for 
(ApplicationReport appReport : projectsApps) { FinalApplicationStatus finalState = appReport.getFinalApplicationStatus(); while (finalState.equals(FinalApplicationStatus.UNDEFINED)) { - client.killApplication(appReport.getApplicationId()); - appReport = client.getApplicationReport(appReport.getApplicationId()); + yarnClient.killApplication(appReport.getApplicationId()); + appReport = yarnClient.getApplicationReport(appReport.getApplicationId()); finalState = appReport.getFinalApplicationStatus(); } - YarnLogUtil.waitForLogAggregation(client, appReport.getApplicationId()); + yarnMonitor.waitForLogAggregation(yarnClient, appReport.getApplicationId()); } } @@ -2067,11 +2069,11 @@ public void removeMemberFromTeam(Project project, Users userToBeRemoved) throws String hdfsUser = hdfsUsersController.getHdfsUserName(project, userToBeRemoved); YarnClientWrapper yarnClientWrapper = ycs.getYarnClientSuper(settings.getConfiguration()); - YarnClient client = yarnClientWrapper.getYarnClient(); + YarnClient yarnClient = yarnClientWrapper.getYarnClient(); try { Set hdfsUsers = new HashSet<>(); hdfsUsers.add(hdfsUser); - List projectsApps = client.getApplications(null, hdfsUsers, null, EnumSet.of( + List projectsApps = yarnClient.getApplications(null, hdfsUsers, null, EnumSet.of( YarnApplicationState.ACCEPTED, YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.RUNNING, YarnApplicationState.SUBMITTED)); //kill jupyter for this user @@ -2099,11 +2101,11 @@ public void removeMemberFromTeam(Project project, Users userToBeRemoved) throws for (ApplicationReport appReport : projectsApps) { FinalApplicationStatus finalState = appReport.getFinalApplicationStatus(); while (finalState.equals(FinalApplicationStatus.UNDEFINED)) { - client.killApplication(appReport.getApplicationId()); - appReport = client.getApplicationReport(appReport.getApplicationId()); + yarnClient.killApplication(appReport.getApplicationId()); + appReport = yarnClient.getApplicationReport(appReport.getApplicationId()); finalState = appReport.getFinalApplicationStatus(); } - YarnLogUtil.waitForLogAggregation(client, appReport.getApplicationId()); + yarnMonitor.waitForLogAggregation(yarnClient, appReport.getApplicationId()); } } catch (YarnException | IOException | InterruptedException e) { throw new ProjectException(RESTCodes.ProjectErrorCode.KILL_MEMBER_JOBS, Level.SEVERE,
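
Taken together, the patch replaces the per-application YarnMonitor objects, each holding its own YarnClient, with a single YarnClientWrapper per monitoring pass that is handed to the now-stateless YarnMonitor along with an explicit ApplicationId. A condensed sketch of the resulting pattern in YarnJobsMonitor.yarnJobMonitor(), using only names that appear in this diff (the per-execution bookkeeping is abbreviated):

YarnClientWrapper yarnClientWrapper = null;
try {
  // one client for the whole pass, instead of one cached client per application
  yarnClientWrapper = yarnClientService.getYarnClientSuper();
  List<Execution> execs = executionFacade.findNotFinished();
  if (execs != null) {
    for (Execution exec : execs) {
      ApplicationId appId = ApplicationId.fromString(exec.getAppId());
      YarnApplicationState appState =
          yarnMonitor.getApplicationState(yarnClientWrapper.getYarnClient(), appId);
      // ...update progress/state; on FINISHED, FAILED or KILLED hand off to YarnExecutionFinalizer...
    }
  }
} catch (Exception ex) {
  LOGGER.log(Level.SEVERE, "Error while monitoring jobs", ex);
} finally {
  // mirrors the timer method above, which passes a possibly-null wrapper here
  yarnClientService.closeYarnClient(yarnClientWrapper);
}
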