Skip to content

Commit

Permalink
TEZ-4541: Remove or limit evergrowing DAG collections from DAGAppMaster
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Apr 16, 2024
1 parent f080031 commit 0a79c99
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ private DAG getCurrentDAG() {
return dagAppMaster.getContext().getCurrentDAG();
}

private Set<String> getAllDagIDs() {
return dagAppMaster.getContext().getAllDAGIDs();
}

public List<String> getAllDAGs() throws TezException {
return Collections.singletonList(getCurrentDAG().getID().toString());
}
Expand Down Expand Up @@ -100,8 +96,11 @@ DAG getDAG(String dagIdStr) throws TezException {

final String currentDAGIdStr = currentDAG.getID().toString();
if (!currentDAGIdStr.equals(dagIdStr)) {
if (getAllDagIDs().contains(dagIdStr)) {
LOG.debug("Looking for finished dagId {} current dag is {}", dagIdStr, currentDAGIdStr);
Set<String> recentDagIds = dagAppMaster.getContext().getRecentDAGIDs();
if (recentDagIds.contains(dagIdStr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking for finished dagId " + dagIdStr + " current dag is " + currentDAGIdStr);
}
throw new DAGNotRunningException("DAG " + dagIdStr + " Not running, current dag is " +
currentDAGIdStr);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public interface AppContext {

void setDAGRecoveryData(DAGRecoveryData dagRecoveryData);

Set<String> getAllDAGIDs();
Set<String> getRecentDAGIDs();

@SuppressWarnings("rawtypes")
EventHandler getEventHandler();
Expand Down
11 changes: 5 additions & 6 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
Expand All @@ -59,6 +60,7 @@
import java.util.Objects;

import com.google.common.collect.BiMap;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;

Expand Down Expand Up @@ -312,10 +314,7 @@ public class DAGAppMaster extends AbstractService {
private final BiMap<String, Integer> containerLaunchers = HashBiMap.create();
private final BiMap<String, Integer> taskCommunicators = HashBiMap.create();

/**
* set of already executed dag names.
*/
Set<String> dagIDs = new HashSet<String>();
Queue<String> dagIDs = EvictingQueue.create(10);

protected boolean isLastAMRetry = false;

Expand Down Expand Up @@ -1522,8 +1521,8 @@ public ListeningExecutorService getExecService() {
}

@Override
public Set<String> getAllDAGIDs() {
return dagIDs;
public Set<String> getRecentDAGIDs() {
return new HashSet<>(dagIDs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -80,6 +79,7 @@
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.*;

import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Lists;

import static org.junit.Assert.*;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void setUp() throws IllegalArgumentException, IOException {
this.recoveryPath = new Path(TEST_ROOT_DIR + "/" + appId + "/recovery");
this.localFS.delete(new Path(TEST_ROOT_DIR), true);
mockAppMaster = mock(DAGAppMaster.class);
mockAppMaster.dagIDs = new HashSet<String>();
mockAppMaster.dagIDs = EvictingQueue.create(10);
when(mockAppMaster.getConfig()).thenReturn(new Configuration());
mockDAGImpl = mock(DAGImpl.class);
when(mockAppMaster.createDAG(any(), any())).thenReturn(mockDAGImpl);
Expand Down

0 comments on commit 0a79c99

Please sign in to comment.