From 0a79c9938404be3b813dc1585896ab5d477df6ab Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 20 Feb 2024 17:02:02 +0100 Subject: [PATCH] TEZ-4541: Remove or limit evergrowing DAG collections from DAGAppMaster --- .../apache/tez/dag/api/client/DAGClientHandler.java | 11 +++++------ .../main/java/org/apache/tez/dag/app/AppContext.java | 2 +- .../java/org/apache/tez/dag/app/DAGAppMaster.java | 11 +++++------ .../org/apache/tez/dag/app/TestRecoveryParser.java | 4 ++-- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java index 4ed9d86a34..36b73dcf38 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java @@ -56,10 +56,6 @@ private DAG getCurrentDAG() { return dagAppMaster.getContext().getCurrentDAG(); } - private Set getAllDagIDs() { - return dagAppMaster.getContext().getAllDAGIDs(); - } - public List getAllDAGs() throws TezException { return Collections.singletonList(getCurrentDAG().getID().toString()); } @@ -100,8 +96,11 @@ DAG getDAG(String dagIdStr) throws TezException { final String currentDAGIdStr = currentDAG.getID().toString(); if (!currentDAGIdStr.equals(dagIdStr)) { - if (getAllDagIDs().contains(dagIdStr)) { - LOG.debug("Looking for finished dagId {} current dag is {}", dagIdStr, currentDAGIdStr); + Set recentDagIds = dagAppMaster.getContext().getRecentDAGIDs(); + if (recentDagIds.contains(dagIdStr)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Looking for finished dagId " + dagIdStr + " current dag is " + currentDAGIdStr); + } throw new DAGNotRunningException("DAG " + dagIdStr + " Not running, current dag is " + currentDAGIdStr); } else { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index c9a7083c1d..bc58c598d3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -80,7 +80,7 @@ public interface AppContext { void setDAGRecoveryData(DAGRecoveryData dagRecoveryData); - Set getAllDAGIDs(); + Set getRecentDAGIDs(); @SuppressWarnings("rawtypes") EventHandler getEventHandler(); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 263ac76b4c..1164a49f33 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -44,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.Timer; @@ -59,6 +60,7 @@ import java.util.Objects; import com.google.common.collect.BiMap; +import com.google.common.collect.EvictingQueue; import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; @@ -312,10 +314,7 @@ public class DAGAppMaster extends AbstractService { private final BiMap containerLaunchers = HashBiMap.create(); private final BiMap taskCommunicators = HashBiMap.create(); - /** - * set of already executed dag names. - */ - Set dagIDs = new HashSet(); + Queue dagIDs = EvictingQueue.create(10); protected boolean isLastAMRetry = false; @@ -1522,8 +1521,8 @@ public ListeningExecutorService getExecService() { } @Override - public Set getAllDAGIDs() { - return dagIDs; + public Set getRecentDAGIDs() { + return new HashSet<>(dagIDs); } @Override diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index 57148ee7e9..43b88e4dfa 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; @@ -80,6 +79,7 @@ import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.*; +import com.google.common.collect.EvictingQueue; import com.google.common.collect.Lists; import static org.junit.Assert.*; @@ -108,7 +108,7 @@ public void setUp() throws IllegalArgumentException, IOException { this.recoveryPath = new Path(TEST_ROOT_DIR + "/" + appId + "/recovery"); this.localFS.delete(new Path(TEST_ROOT_DIR), true); mockAppMaster = mock(DAGAppMaster.class); - mockAppMaster.dagIDs = new HashSet(); + mockAppMaster.dagIDs = EvictingQueue.create(10); when(mockAppMaster.getConfig()).thenReturn(new Configuration()); mockDAGImpl = mock(DAGImpl.class); when(mockAppMaster.createDAG(any(), any())).thenReturn(mockDAGImpl);