From 61ff5fb9f606d46d8878e26ffe66fce0c7e94925 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Tue, 20 Aug 2024 10:39:20 +0530 Subject: [PATCH 1/4] [add] immediate downstream and upstream assets in impact report --- .../model/instance/AtlasEntityHeader.java | 19 ++++++ .../atlas/discovery/EntityLineageService.java | 67 ++++++++++++++++--- 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java index c251811df1..d79e81947e 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java @@ -77,6 +77,9 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable { private Map collapse = null; + private List immediateUpstream; // New field + private List immediateDownstream; // New field + public AtlasEntityHeader() { this(null, null); @@ -346,6 +349,22 @@ public void setMeanings(final List meanings) { this.meanings = meanings; } + public List getImmediateUpstream() { + return immediateUpstream; + } + + public void setImmediateUpstream(List immediateUpstream) { + this.immediateUpstream = immediateUpstream; + } + + public List getImmediateDownstream() { + return immediateDownstream; + } + + public void setImmediateDownstream(List immediateDownstream) { + this.immediateDownstream = immediateDownstream; + } + /** * REST serialization friendly list. */ diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 7e83794113..44557df87f 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -459,12 +459,18 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Set skippedVertices = new HashSet<>(); Queue traversalQueue = new LinkedList<>(); + Map> parentMapForNeighbours = new HashMap<>(); // New map to track parent nodes + AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); - enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + // Get the neighbors for the current node + List childrens = enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); int currentDepth = 0; int currentLevel = isBaseNodeDataset? 0: 1; + // Add the current node and its neighbors to the result + appendToResult(baseVertex, lineageListContext, ret, currentLevel, childrens); + while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) { currentDepth++; @@ -484,24 +490,32 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); continue; } lineageListContext.incrementEntityCount(); - appendToResult(currentVertex, lineageListContext, ret, currentLevel); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + // Get the neighbors for the current node + childrens = enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + + // Add the current node and its neighbors to the result + appendToResult(currentVertex, lineageListContext, ret, currentLevel, childrens); + if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) { ret.setHasMore(false); lineageListContext.setHasMoreUpdated(true); } } } + + // update parents for each entity + updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours); + if (currentDepth > lineageListContext.getDepth()) lineageListContext.setDepthLimitReached(true); @@ -509,8 +523,10 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line RequestContext.get().endMetricRecord(metricRecorder); } - private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext, - Queue traversalQueue, Set visitedVertices, Set skippedVertices) { + private List enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, + AtlasLineageListContext lineageListContext, Queue traversalQueue, + Set visitedVertices, Set skippedVertices, + Map> parentMapForNeighbours) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; if (isDataset) @@ -519,6 +535,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); + List neighbors = new ArrayList<>(); while (edges.hasNext()) { AtlasEdge currentEdge = edges.next(); if (!lineageListContext.evaluateTraversalFilter(currentEdge)) @@ -538,12 +555,46 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl traversalQueue.add(vertexGuid); addEntitiesToCache(neighbourVertex); } + parentMapForNeighbours + .computeIfAbsent(vertexGuid, k -> new ArrayList<>()) + .add(getGuid(currentVertex)); + neighbors.add(vertexGuid); // Add neighbor to the list + } + + return neighbors; // Return the list of neighbors + } + + private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours){ + List entityList = ret.getEntities(); + for (AtlasEntityHeader entity : entityList) { + // Check if the entity GUID exists in the parentMapForNeighbours + if (parentMapForNeighbours.containsKey(entity.getGuid())) { + // Get the list of AtlasVertex from the map + List parentNodes = parentMapForNeighbours.get(entity.getGuid()); + if(isInputDirection(lineageListContext)){ + entity.setImmediateDownstream(parentNodes); + } + else{ + entity.setImmediateUpstream(parentNodes); + } + } } } - private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException { + private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, + AtlasLineageListInfo ret, int currentLevel, List childrenNodes) throws AtlasBaseException { AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(currentVertex, lineageListContext.getAttributes()); entity.setDepth(currentLevel); + + // as per the traversal direction set children and parents + // INPUT = upstream + // OUTPUT = downstream + if(isInputDirection(lineageListContext)){ + entity.setImmediateUpstream(childrenNodes); + } + else{ + entity.setImmediateDownstream(childrenNodes); + } ret.getEntities().add(entity); } From e2a9116fe278dfb26df13f4c1919a794a9be8fa6 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Wed, 21 Aug 2024 16:39:01 +0530 Subject: [PATCH 2/4] [fix] logic for handling process --- .../atlas/discovery/EntityLineageService.java | 69 +++++++++++++------ 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 44557df87f..ca5a02491f 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -460,16 +460,18 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Queue traversalQueue = new LinkedList<>(); Map> parentMapForNeighbours = new HashMap<>(); // New map to track parent nodes + Map> childrenMapForNeighbours = new HashMap<>(); // New map to track parent nodes + AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); // Get the neighbors for the current node - List childrens = enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); int currentDepth = 0; int currentLevel = isBaseNodeDataset? 0: 1; // Add the current node and its neighbors to the result - appendToResult(baseVertex, lineageListContext, ret, currentLevel, childrens); + appendToResult(baseVertex, lineageListContext, ret, currentLevel); while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) { currentDepth++; @@ -490,21 +492,21 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); continue; } lineageListContext.incrementEntityCount(); // Get the neighbors for the current node - childrens = enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); // Add the current node and its neighbors to the result - appendToResult(currentVertex, lineageListContext, ret, currentLevel, childrens); + appendToResult(currentVertex, lineageListContext, ret, currentLevel); if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) { ret.setHasMore(false); @@ -514,7 +516,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line } // update parents for each entity - updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours); + updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours); if (currentDepth > lineageListContext.getDepth()) lineageListContext.setDepthLimitReached(true); @@ -526,7 +528,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line private List enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext, Queue traversalQueue, Set visitedVertices, Set skippedVertices, - Map> parentMapForNeighbours) { + Map> parentMapForNeighbours, Map> childrenMapForNeighbours) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; if (isDataset) @@ -558,43 +560,66 @@ private List enqueueNeighbours(AtlasVertex currentVertex, boolean isData parentMapForNeighbours .computeIfAbsent(vertexGuid, k -> new ArrayList<>()) .add(getGuid(currentVertex)); + childrenMapForNeighbours + .computeIfAbsent(getGuid(currentVertex), k -> new ArrayList<>()) + .add(vertexGuid); neighbors.add(vertexGuid); // Add neighbor to the list } return neighbors; // Return the list of neighbors } - private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours){ + private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours, Map> childrenMapForNeighbours){ List entityList = ret.getEntities(); for (AtlasEntityHeader entity : entityList) { // Check if the entity GUID exists in the parentMapForNeighbours if (parentMapForNeighbours.containsKey(entity.getGuid())) { // Get the list of AtlasVertex from the map List parentNodes = parentMapForNeighbours.get(entity.getGuid()); + + List parentNodesOfParent = new ArrayList<>(); + for (String parentNode : parentNodes) { + List parentsOfParentNode = parentMapForNeighbours.get(parentNode); + if (parentsOfParentNode != null) { + parentNodesOfParent.addAll(parentsOfParentNode); + } + } + + if(isInputDirection(lineageListContext)){ + entity.setImmediateDownstream(parentNodesOfParent); + } + else{ + entity.setImmediateUpstream(parentNodesOfParent); + } + } + + if (childrenMapForNeighbours.containsKey(entity.getGuid())) { + // Get the list of AtlasVertex from the map + List childrenNodes = childrenMapForNeighbours.get(entity.getGuid()); + + List childrenNodesOfChildren = new ArrayList<>(); + for (String childNode : childrenNodes) { + // Add all children for the current childNode + List childrenOfChildNode = childrenMapForNeighbours.get(childNode); + if (childrenOfChildNode != null) { + childrenNodesOfChildren.addAll(childrenOfChildNode); + } + } + if(isInputDirection(lineageListContext)){ - entity.setImmediateDownstream(parentNodes); + entity.setImmediateUpstream(childrenNodesOfChildren); } else{ - entity.setImmediateUpstream(parentNodes); + entity.setImmediateDownstream(childrenNodesOfChildren); } } } } private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, - AtlasLineageListInfo ret, int currentLevel, List childrenNodes) throws AtlasBaseException { + AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException { AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(currentVertex, lineageListContext.getAttributes()); entity.setDepth(currentLevel); - - // as per the traversal direction set children and parents - // INPUT = upstream - // OUTPUT = downstream - if(isInputDirection(lineageListContext)){ - entity.setImmediateUpstream(childrenNodes); - } - else{ - entity.setImmediateDownstream(childrenNodes); - } ret.getEntities().add(entity); } From 9a7edc25af301a86f80f27f758db1700d868e5d3 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Wed, 21 Aug 2024 17:23:03 +0530 Subject: [PATCH 3/4] [fix] logic for handling process --- .../atlas/discovery/EntityLineageService.java | 31 +++++++++---------- .../atlas/repository/graph/GraphHelper.java | 4 +++ 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index ca5a02491f..6d19dbefc5 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -72,8 +72,7 @@ import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*; -import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; -import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.*; import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN; import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.OUT; @@ -525,10 +524,10 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line RequestContext.get().endMetricRecord(metricRecorder); } - private List enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, - AtlasLineageListContext lineageListContext, Queue traversalQueue, - Set visitedVertices, Set skippedVertices, - Map> parentMapForNeighbours, Map> childrenMapForNeighbours) { + private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, + AtlasLineageListContext lineageListContext, Queue traversalQueue, + Set visitedVertices, Set skippedVertices, + Map> parentMapForNeighbours, Map> childrenMapForNeighbours) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; if (isDataset) @@ -557,25 +556,23 @@ private List enqueueNeighbours(AtlasVertex currentVertex, boolean isData traversalQueue.add(vertexGuid); addEntitiesToCache(neighbourVertex); } + String vertexDisplayName = getQalifiedName(neighbourVertex); parentMapForNeighbours - .computeIfAbsent(vertexGuid, k -> new ArrayList<>()) - .add(getGuid(currentVertex)); + .computeIfAbsent(vertexDisplayName, k -> new ArrayList<>()) + .add(getQalifiedName(currentVertex)); childrenMapForNeighbours - .computeIfAbsent(getGuid(currentVertex), k -> new ArrayList<>()) - .add(vertexGuid); - neighbors.add(vertexGuid); // Add neighbor to the list + .computeIfAbsent(getQalifiedName(currentVertex), k -> new ArrayList<>()) + .add(vertexDisplayName); } - - return neighbors; // Return the list of neighbors } private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours, Map> childrenMapForNeighbours){ List entityList = ret.getEntities(); for (AtlasEntityHeader entity : entityList) { // Check if the entity GUID exists in the parentMapForNeighbours - if (parentMapForNeighbours.containsKey(entity.getGuid())) { + if (parentMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) { // Get the list of AtlasVertex from the map - List parentNodes = parentMapForNeighbours.get(entity.getGuid()); + List parentNodes = parentMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME)); List parentNodesOfParent = new ArrayList<>(); for (String parentNode : parentNodes) { @@ -593,9 +590,9 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC } } - if (childrenMapForNeighbours.containsKey(entity.getGuid())) { + if (childrenMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) { // Get the list of AtlasVertex from the map - List childrenNodes = childrenMapForNeighbours.get(entity.getGuid()); + List childrenNodes = childrenMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME)); List childrenNodesOfChildren = new ArrayList<>(); for (String childNode : childrenNodes) { diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index d5926e8a00..fec47a7a36 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -1025,6 +1025,10 @@ public static String getGuid(AtlasVertex vertex) { return ret; } + public static String getQalifiedName(AtlasVertex vertex) { + return vertex.getProperty(Constants.QUALIFIED_NAME, String.class); + } + public static String getHomeId(AtlasElement element) { return element.getProperty(Constants.HOME_ID_KEY, String.class); } From 51ce5f9fc388a87616c3ee455e06ed00768a3c1a Mon Sep 17 00:00:00 2001 From: akshaysw Date: Wed, 21 Aug 2024 18:30:47 +0530 Subject: [PATCH 4/4] [fix] add flag --- .../model/lineage/LineageListRequest.java | 9 ++++++++ .../discovery/AtlasLineageListContext.java | 10 ++++++++ .../atlas/discovery/EntityLineageService.java | 23 +++++++++++-------- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java index e717fda066..9293b1de89 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java @@ -25,6 +25,15 @@ public class LineageListRequest { private Set relationAttributes; private Boolean excludeMeanings; private Boolean excludeClassifications; + private Boolean immediateNeighbours; + + public Boolean getImmediateNeighbours() { + return immediateNeighbours; + } + + public void setImmediateNeighbours(Boolean immediateNeighbours) { + this.immediateNeighbours = immediateNeighbours; + } public enum LineageDirection {INPUT, OUTPUT} diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java index 95657aaabe..06a9291b23 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java @@ -23,6 +23,7 @@ public final class AtlasLineageListContext { private int currentEntityCounter; private boolean depthLimitReached; private boolean hasMoreUpdated; + private Boolean immediateNeighbours; public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) { this.guid = lineageListRequest.getGuid(); @@ -35,6 +36,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR this.edgeTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getRelationshipTraversalFilters()); this.attributes = lineageListRequest.getAttributes(); this.relationAttributes = lineageListRequest.getRelationAttributes(); + this.immediateNeighbours = lineageListRequest.getImmediateNeighbours(); } public String getGuid() { @@ -190,4 +192,12 @@ public boolean isHasMoreUpdated() { public void setHasMoreUpdated(boolean hasMoreUpdated) { this.hasMoreUpdated = hasMoreUpdated; } + + public Boolean getImmediateNeighbours() { + return immediateNeighbours; + } + + public void setImmediateNeighbours(Boolean immediateNeighbours) { + this.immediateNeighbours = immediateNeighbours; + } } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 6d19dbefc5..c5e1a5750c 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -514,8 +514,10 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line } } - // update parents for each entity - updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours); + if(lineageListContext.getImmediateNeighbours()){ + // update parents for each entity + updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours); + } if (currentDepth > lineageListContext.getDepth()) lineageListContext.setDepthLimitReached(true); @@ -556,13 +558,16 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, traversalQueue.add(vertexGuid); addEntitiesToCache(neighbourVertex); } - String vertexDisplayName = getQalifiedName(neighbourVertex); - parentMapForNeighbours - .computeIfAbsent(vertexDisplayName, k -> new ArrayList<>()) - .add(getQalifiedName(currentVertex)); - childrenMapForNeighbours - .computeIfAbsent(getQalifiedName(currentVertex), k -> new ArrayList<>()) - .add(vertexDisplayName); + + if(lineageListContext.getImmediateNeighbours()){ + String vertexDisplayName = getQalifiedName(neighbourVertex); + parentMapForNeighbours + .computeIfAbsent(vertexDisplayName, k -> new ArrayList<>()) + .add(getQalifiedName(currentVertex)); + childrenMapForNeighbours + .computeIfAbsent(getQalifiedName(currentVertex), k -> new ArrayList<>()) + .add(vertexDisplayName); + } } }