Skip to content

Commit

Permalink
TEZ-4554: Counter for used nodes within a DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Jun 21, 2024
1 parent 0ac505b commit 547c859
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,24 @@ public enum DAGCounter {
* Number of container reuses during a DAG. This is incremented every time
* the containerReused callback is called in the TaskSchedulerContext.
*/
TOTAL_CONTAINER_REUSE_COUNT
TOTAL_CONTAINER_REUSE_COUNT,

/*
* Number of nodes to which task attempts were assigned in this DAG.
* Nodes are distinguished by the Yarn NodeId.
*/
NODE_USED_COUNT,

/*
* Number of node hosts to which task attempts were assigned in this DAG.
* Nodes are distinguished by Yarn NodeId.getHost()
*/
NODE_HOSTS_USED_COUNT,

/*
* Total number of nodes visible to the task scheduler (regardless of
* task assignments). This is typically exposed by a resource manager
* client.
*/
NODE_TOTAL_COUNT
}
4 changes: 2 additions & 2 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Set;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.counters.DAGCounter;
Expand Down Expand Up @@ -106,7 +106,7 @@ VertexStatusBuilder getVertexStatus(String vertexName,

void incrementDagCounter(DAGCounter counter, int incrValue);
void setDagCounter(DAGCounter counter, int setValue);
void addUsedContainer(ContainerId containerId);
void addUsedContainer(Container container);

/**
* Called by the DAGAppMaster when the DAG is started normally or in the event of recovery.
Expand Down
20 changes: 16 additions & 4 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
Expand Down Expand Up @@ -250,6 +252,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,

private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private final Set<ContainerId> containersUsedByCurrentDAG = new HashSet<>();
@VisibleForTesting
final Set<NodeId> nodesUsedByCurrentDAG = new HashSet<>();
@VisibleForTesting
final Set<String> nodeHostsUsedByCurrentDAG = new HashSet<>();


protected static final
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
Expand Down Expand Up @@ -2563,7 +2570,7 @@ public void onStart() {
@Override
public void onFinish() {
stopVertexServices();
handleUsedContainersOnDagFinish();
updateCounters();
}

private void startVertexServices() {
Expand All @@ -2579,11 +2586,16 @@ void stopVertexServices() {
}

@Override
public void addUsedContainer(ContainerId containerId) {
containersUsedByCurrentDAG.add(containerId);
public void addUsedContainer(Container container) {
containersUsedByCurrentDAG.add(container.getId());
nodesUsedByCurrentDAG.add(container.getNodeId());
nodeHostsUsedByCurrentDAG.add(container.getNodeId().getHost());
}

private void handleUsedContainersOnDagFinish() {
private void updateCounters() {
setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_HOSTS_USED_COUNT, nodeHostsUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ public synchronized void taskAllocated(int schedulerId, Object task,
sendEvent(new AMNodeEventContainerAllocated(container
.getNodeId(), schedulerId, container.getId()));
}
appContext.getCurrentDAG().addUsedContainer(containerId);
appContext.getCurrentDAG().addUsedContainer(container);

TaskAttempt taskAttempt = event.getTaskAttempt();
// TODO - perhaps check if the task still needs this container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.junit.Rule;
Expand All @@ -60,6 +61,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
Expand Down Expand Up @@ -181,6 +184,7 @@ public class TestDAGImpl {
private ACLManager aclManager;
private ApplicationAttemptId appAttemptId;
private DAGImpl dag;
private TaskSchedulerManager taskSchedulerManager;
private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
Expand Down Expand Up @@ -861,11 +865,12 @@ public void setup() {
dispatcher = new DrainDispatcher();
fsTokens = new Credentials();
appContext = mock(AppContext.class);
taskSchedulerManager = mock(TaskSchedulerManager.class);
execService = mock(ListeningExecutorService.class);
final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
when(appContext.getHadoopShim()).thenReturn(defaultShim);
when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());

doAnswer(new Answer() {
public ListenableFuture<Void> answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
Expand Down Expand Up @@ -2358,22 +2363,82 @@ public void testCounterLimits() {

}

@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testTotalContainersUsedCounter() {
DAGImpl spy = getDagSpy();

spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000005"),
mock(NodeId.class), null, null, null, null));
spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000006"),
mock(NodeId.class), null, null, null, null));

spy.onFinish();
// 2 calls to addUsedContainer
verify(spy, times(2)).addUsedContainer(any(Container.class));
// 2 containers were used
Assert.assertEquals(2,
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.TOTAL_CONTAINERS_USED.name())
.getValue());
}

@Test(timeout = 5000)
public void testNodesUsedCounter() {
DAGImpl spy = getDagSpy();

Container containerOnHost = mock(Container.class);
when(containerOnHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
Container containerOnSameHost = mock(Container.class);
when(containerOnSameHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
Container containerOnDifferentHost = mock(Container.class);
when(containerOnDifferentHost.getNodeId()).thenReturn(NodeId.fromString("otherhost:0"));
Container containerOnSameHostWithDifferentPort = mock(Container.class);
when(containerOnSameHostWithDifferentPort.getNodeId()).thenReturn(NodeId.fromString("localhost:1"));

spy.addUsedContainer(containerOnHost);
spy.addUsedContainer(containerOnSameHost);
spy.addUsedContainer(containerOnDifferentHost);
spy.addUsedContainer(containerOnSameHostWithDifferentPort);

when(taskSchedulerManager.getNumClusterNodes()).thenReturn(10);

spy.onFinish();
// 4 calls to addUsedContainer
verify(spy, times(4)).addUsedContainer(any(Container.class));
// 3 nodes were used: localhost:0, otherhost:0, localhost:1
// localhost:0 and localhost:1 might be on the same physical host, but as long as
// yarn considers them different nodes, we consider them different too
Assert.assertEquals(3,
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name())
.getValue());

Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:0")));
Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("otherhost:0")));
Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:1")));

// 2 distinct node hosts were seen: localhost, otherhost
Assert.assertEquals(2,
spy.getAllCounters().getGroup(DAGCounter.class.getName())
.findCounter(DAGCounter.NODE_HOSTS_USED_COUNT.name())
.getValue());

Assert.assertEquals(10,
spy.getAllCounters().getGroup(DAGCounter.class.getName())
.findCounter(DAGCounter.NODE_TOTAL_COUNT.name())
.getValue());

Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("localhost"));
Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("otherhost"));
}

private DAGImpl getDagSpy() {
initDAG(mrrDag);
dispatcher.await();
startDAG(mrrDag);
dispatcher.await();

DAGImpl spy = spy(mrrDag);
spy.addUsedContainer(mock(ContainerId.class));
spy.addUsedContainer(mock(ContainerId.class));
// needed when onFinish() method is called on a DAGImpl
when(mrrAppContext.getTaskScheduler()).thenReturn(taskSchedulerManager);

spy.onFinish();
// 2 calls to addUsedContainer, obviously, we did it here
verify(spy, times(2)).addUsedContainer(any(ContainerId.class));
// 1 call to setDagCounter, which happened at dag.onFinish
verify(spy).setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, 2);
return spy(mrrDag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void testSimpleAllocate() throws Exception {
assertEquals(priority, assignEvent.getPriority());
assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());

verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(ContainerId.class)); // called on taskAllocated
verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(Container.class)); // called on taskAllocated
}

@Test(timeout = 5000)
Expand Down

0 comments on commit 547c859

Please sign in to comment.