Skip to content

Commit

Permalink
TEZ-4451: ThreadLevel IO Stats Support for TEZ.
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushtkn committed Jan 25, 2024
1 parent cafa4b3 commit e7e53b7
Showing 1 changed file with 7 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezCommonUtils;
Expand Down Expand Up @@ -75,6 +77,7 @@ public TaskRunner2CallableResult run() throws Exception {
LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID());
TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), task.getTaskAttemptID());
TezCommonUtils.logCredentials(LOG, ugi.getCredentials(), "taskInit");
IOStatisticsContext.getCurrentIOStatisticsContext().reset();
task.initialize();

if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
Expand Down Expand Up @@ -116,6 +119,10 @@ public TaskRunner2CallableResult run() throws Exception {
// For a successful task, however, this should be almost no delay since close has already happened.
maybeFixInterruptStatus();
LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get());
String ioStats = IOStatisticsContext.getCurrentIOStatisticsContext().snapshot().toString();
if (StringUtils.isNotEmpty(ioStats)) {
LOG.info("TaskAttemptId={}, IOStatistics={}", task.getTaskAttemptID(), ioStats);
}
task.getOutputContexts().forEach(outputContext
-> outputContext.trapEvents(new TezTrapEventHandler(outputContext,
this.tezUmbilical)));
Expand Down

0 comments on commit e7e53b7

Please sign in to comment.