From e7e53b7b23a8a79277b8df11613a73351accee55 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 25 Jan 2024 08:56:26 +0530 Subject: [PATCH] TEZ-4451: ThreadLevel IO Stats Support for TEZ. --- .../org/apache/tez/runtime/task/TaskRunner2Callable.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java index 810a806228..7d9df70193 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -18,6 +18,8 @@ import java.security.PrivilegedExceptionAction; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.TezCommonUtils; @@ -75,6 +77,7 @@ public TaskRunner2CallableResult run() throws Exception { LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID()); TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), task.getTaskAttemptID()); TezCommonUtils.logCredentials(LOG, ugi.getCredentials(), "taskInit"); + IOStatisticsContext.getCurrentIOStatisticsContext().reset(); task.initialize(); if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { @@ -116,6 +119,10 @@ public TaskRunner2CallableResult run() throws Exception { // For a successful task, however, this should be almost no delay since close has already happened. maybeFixInterruptStatus(); LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get()); + String ioStats = IOStatisticsContext.getCurrentIOStatisticsContext().snapshot().toString(); + if (StringUtils.isNotEmpty(ioStats)) { + LOG.info("TaskAttemptId={}, IOStatistics={}", task.getTaskAttemptID(), ioStats); + } task.getOutputContexts().forEach(outputContext -> outputContext.trapEvents(new TezTrapEventHandler(outputContext, this.tezUmbilical)));