Skip to content

Commit

Permalink
TEZ-4561: Improve reported exception when DAGAppMaster is shutting do…
Browse files Browse the repository at this point in the history
…wn. (#365). (Ayush Saxena, reviewed by Laszlo Bodor)
  • Loading branch information
ayushtkn authored Sep 3, 2024
1 parent 174d4e3 commit 76490d4
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
24 changes: 22 additions & 2 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -936,6 +937,15 @@ public void handle(DAGAppMasterEvent event) {
protected class DAGAppMasterShutdownHandler {
private AtomicBoolean shutdownHandled = new AtomicBoolean(false);
private long sleepTimeBeforeExit = TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT;
private long shutdownTime;

public Date getShutdownTime() {
return new Date(shutdownTime);
}

public void setShutdownTime(long shutdownTime) {
this.shutdownTime = shutdownTime;
}

void setSleepTimeBeforeExit(long sleepTimeBeforeExit) {
this.sleepTimeBeforeExit = sleepTimeBeforeExit;
Expand All @@ -954,6 +964,7 @@ public void shutdown(boolean now) {

synchronized (shutdownHandlerRunning) {
shutdownHandlerRunning.set(true);
setShutdownTime(System.currentTimeMillis());
}
LOG.info("Handling DAGAppMaster shutdown");

Expand Down Expand Up @@ -1680,9 +1691,11 @@ public HadoopShim getHadoopShim() {

@Override
public Map<ApplicationAccessType, String> getApplicationACLs() {
if (getServiceState() != STATE.STARTED) {
STATE serviceState = getServiceState();
if (serviceState != STATE.STARTED) {
throw new TezUncheckedException(
"Cannot get ApplicationACLs before all services have started");
"Cannot get ApplicationACLs before all services have started, The current service state is " + serviceState
+ "." + getShutdownTimeString());
}
return taskSchedulerManager.getApplicationAcls();
}
Expand Down Expand Up @@ -1743,6 +1756,13 @@ public void setQueueName(String queueName) {
}
}

private String getShutdownTimeString() {
if (shutdownHandler != null && shutdownHandler.getShutdownTime() != null) {
return " The shutdown hook started at " + shutdownHandler.getShutdownTime();
}
return "";
}

private static class ServiceWithDependency implements ServiceStateChangeListener {
ServiceWithDependency(Service service) {
this.service = service;
Expand Down
29 changes: 26 additions & 3 deletions tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
Expand All @@ -49,6 +50,7 @@
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
Expand All @@ -67,18 +69,19 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -493,6 +496,26 @@ public void testDagCredentialsWithMerge() throws Exception {
testDagCredentials(true);
}

@Test
public void testGetACLFailure() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);
TezConfiguration conf = new TezConfiguration(false);
conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
dam.init(conf);
LambdaTestUtils.intercept(TezUncheckedException.class,
"Cannot get ApplicationACLs before all services have started, The current service state is INITED",
() -> dam.getContext().getApplicationACLs());
dam.start();
dam.stop();
Mockito.when(dam.mockShutdown.getShutdownTime()).thenReturn(Date.from(Instant.ofEpochMilli(Time.now())));
LambdaTestUtils.intercept(TezUncheckedException.class,
" Cannot get ApplicationACLs before all services have started, "
+ "The current service state is STOPPED. The shutdown hook started at "
+ dam.mockShutdown.getShutdownTime(), () -> dam.getContext().getApplicationACLs());
}

@Test
public void testBadProgress() throws Exception {
TezConfiguration conf = new TezConfiguration();
Expand Down

0 comments on commit 76490d4

Please sign in to comment.