From 61ff5fb9f606d46d8878e26ffe66fce0c7e94925 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Tue, 20 Aug 2024 10:39:20 +0530 Subject: [PATCH 01/11] [add] immediate downstream and upstream assets in impact report --- .../model/instance/AtlasEntityHeader.java | 19 ++++++ .../atlas/discovery/EntityLineageService.java | 67 ++++++++++++++++--- 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java index c251811df1..d79e81947e 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java @@ -77,6 +77,9 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable { private Map collapse = null; + private List immediateUpstream; // New field + private List immediateDownstream; // New field + public AtlasEntityHeader() { this(null, null); @@ -346,6 +349,22 @@ public void setMeanings(final List meanings) { this.meanings = meanings; } + public List getImmediateUpstream() { + return immediateUpstream; + } + + public void setImmediateUpstream(List immediateUpstream) { + this.immediateUpstream = immediateUpstream; + } + + public List getImmediateDownstream() { + return immediateDownstream; + } + + public void setImmediateDownstream(List immediateDownstream) { + this.immediateDownstream = immediateDownstream; + } + /** * REST serialization friendly list. 
*/ diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 7e83794113..44557df87f 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -459,12 +459,18 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Set skippedVertices = new HashSet<>(); Queue traversalQueue = new LinkedList<>(); + Map> parentMapForNeighbours = new HashMap<>(); // New map to track parent nodes + AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); - enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + // Get the neighbors for the current node + List childrens = enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); int currentDepth = 0; int currentLevel = isBaseNodeDataset? 
0: 1; + // Add the current node and its neighbors to the result + appendToResult(baseVertex, lineageListContext, ret, currentLevel, childrens); + while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) { currentDepth++; @@ -484,24 +490,32 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); continue; } lineageListContext.incrementEntityCount(); - appendToResult(currentVertex, lineageListContext, ret, currentLevel); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + // Get the neighbors for the current node + childrens = enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + + // Add the current node and its neighbors to the result + appendToResult(currentVertex, lineageListContext, ret, currentLevel, childrens); + if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) { ret.setHasMore(false); lineageListContext.setHasMoreUpdated(true); } } } + + // update parents for each entity + updateParentNodesForEachEntity(lineageListContext, 
ret, parentMapForNeighbours); + if (currentDepth > lineageListContext.getDepth()) lineageListContext.setDepthLimitReached(true); @@ -509,8 +523,10 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line RequestContext.get().endMetricRecord(metricRecorder); } - private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext, - Queue traversalQueue, Set visitedVertices, Set skippedVertices) { + private List enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, + AtlasLineageListContext lineageListContext, Queue traversalQueue, + Set visitedVertices, Set skippedVertices, + Map> parentMapForNeighbours) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; if (isDataset) @@ -519,6 +535,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? 
PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); + List neighbors = new ArrayList<>(); while (edges.hasNext()) { AtlasEdge currentEdge = edges.next(); if (!lineageListContext.evaluateTraversalFilter(currentEdge)) @@ -538,12 +555,46 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl traversalQueue.add(vertexGuid); addEntitiesToCache(neighbourVertex); } + parentMapForNeighbours + .computeIfAbsent(vertexGuid, k -> new ArrayList<>()) + .add(getGuid(currentVertex)); + neighbors.add(vertexGuid); // Add neighbor to the list + } + + return neighbors; // Return the list of neighbors + } + + private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours){ + List entityList = ret.getEntities(); + for (AtlasEntityHeader entity : entityList) { + // Check if the entity GUID exists in the parentMapForNeighbours + if (parentMapForNeighbours.containsKey(entity.getGuid())) { + // Get the list of AtlasVertex from the map + List parentNodes = parentMapForNeighbours.get(entity.getGuid()); + if(isInputDirection(lineageListContext)){ + entity.setImmediateDownstream(parentNodes); + } + else{ + entity.setImmediateUpstream(parentNodes); + } + } } } - private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException { + private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, + AtlasLineageListInfo ret, int currentLevel, List childrenNodes) throws AtlasBaseException { AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(currentVertex, lineageListContext.getAttributes()); entity.setDepth(currentLevel); + + // as per the traversal direction set children and parents + // INPUT = upstream + // OUTPUT = downstream + if(isInputDirection(lineageListContext)){ + 
entity.setImmediateUpstream(childrenNodes); + } + else{ + entity.setImmediateDownstream(childrenNodes); + } ret.getEntities().add(entity); } From e2a9116fe278dfb26df13f4c1919a794a9be8fa6 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Wed, 21 Aug 2024 16:39:01 +0530 Subject: [PATCH 02/11] [fix] logic for handling process --- .../atlas/discovery/EntityLineageService.java | 69 +++++++++++++------ 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 44557df87f..ca5a02491f 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -460,16 +460,18 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Queue traversalQueue = new LinkedList<>(); Map> parentMapForNeighbours = new HashMap<>(); // New map to track parent nodes + Map> childrenMapForNeighbours = new HashMap<>(); // New map to track parent nodes + AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); // Get the neighbors for the current node - List childrens = enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); int currentDepth = 0; int currentLevel = isBaseNodeDataset? 
0: 1; // Add the current node and its neighbors to the result - appendToResult(baseVertex, lineageListContext, ret, currentLevel, childrens); + appendToResult(baseVertex, lineageListContext, ret, currentLevel); while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) { currentDepth++; @@ -490,21 +492,21 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); continue; } lineageListContext.incrementEntityCount(); // Get the neighbors for the current node - childrens = enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); // Add the current node and its neighbors to the result - appendToResult(currentVertex, lineageListContext, ret, currentLevel, childrens); + appendToResult(currentVertex, lineageListContext, ret, currentLevel); if (isLastEntityInLastDepth(lineageListContext.getDepth(), 
currentDepth, entitiesInCurrentDepth, i)) { ret.setHasMore(false); @@ -514,7 +516,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line } // update parents for each entity - updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours); + updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours); if (currentDepth > lineageListContext.getDepth()) lineageListContext.setDepthLimitReached(true); @@ -526,7 +528,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line private List enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext, Queue traversalQueue, Set visitedVertices, Set skippedVertices, - Map> parentMapForNeighbours) { + Map> parentMapForNeighbours, Map> childrenMapForNeighbours) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; if (isDataset) @@ -558,43 +560,66 @@ private List enqueueNeighbours(AtlasVertex currentVertex, boolean isData parentMapForNeighbours .computeIfAbsent(vertexGuid, k -> new ArrayList<>()) .add(getGuid(currentVertex)); + childrenMapForNeighbours + .computeIfAbsent(getGuid(currentVertex), k -> new ArrayList<>()) + .add(vertexGuid); neighbors.add(vertexGuid); // Add neighbor to the list } return neighbors; // Return the list of neighbors } - private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours){ + private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours, Map> childrenMapForNeighbours){ List entityList = ret.getEntities(); for (AtlasEntityHeader entity : entityList) { // Check if the entity GUID exists in the parentMapForNeighbours if (parentMapForNeighbours.containsKey(entity.getGuid())) { 
// Get the list of AtlasVertex from the map List parentNodes = parentMapForNeighbours.get(entity.getGuid()); + + List parentNodesOfParent = new ArrayList<>(); + for (String parentNode : parentNodes) { + List parentsOfParentNode = parentMapForNeighbours.get(parentNode); + if (parentsOfParentNode != null) { + parentNodesOfParent.addAll(parentsOfParentNode); + } + } + + if(isInputDirection(lineageListContext)){ + entity.setImmediateDownstream(parentNodesOfParent); + } + else{ + entity.setImmediateUpstream(parentNodesOfParent); + } + } + + if (childrenMapForNeighbours.containsKey(entity.getGuid())) { + // Get the list of AtlasVertex from the map + List childrenNodes = childrenMapForNeighbours.get(entity.getGuid()); + + List childrenNodesOfChildren = new ArrayList<>(); + for (String childNode : childrenNodes) { + // Add all children for the current childNode + List childrenOfChildNode = childrenMapForNeighbours.get(childNode); + if (childrenOfChildNode != null) { + childrenNodesOfChildren.addAll(childrenOfChildNode); + } + } + if(isInputDirection(lineageListContext)){ - entity.setImmediateDownstream(parentNodes); + entity.setImmediateUpstream(childrenNodesOfChildren); } else{ - entity.setImmediateUpstream(parentNodes); + entity.setImmediateDownstream(childrenNodesOfChildren); } } } } private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, - AtlasLineageListInfo ret, int currentLevel, List childrenNodes) throws AtlasBaseException { + AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException { AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(currentVertex, lineageListContext.getAttributes()); entity.setDepth(currentLevel); - - // as per the traversal direction set children and parents - // INPUT = upstream - // OUTPUT = downstream - if(isInputDirection(lineageListContext)){ - entity.setImmediateUpstream(childrenNodes); - } - else{ - entity.setImmediateDownstream(childrenNodes); - } 
ret.getEntities().add(entity); } From 9a7edc25af301a86f80f27f758db1700d868e5d3 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Wed, 21 Aug 2024 17:23:03 +0530 Subject: [PATCH 03/11] [fix] logic for handling process --- .../atlas/discovery/EntityLineageService.java | 31 +++++++++---------- .../atlas/repository/graph/GraphHelper.java | 4 +++ 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index ca5a02491f..6d19dbefc5 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -72,8 +72,7 @@ import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*; -import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; -import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.*; import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN; import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.OUT; @@ -525,10 +524,10 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line RequestContext.get().endMetricRecord(metricRecorder); } - private List enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, - AtlasLineageListContext lineageListContext, Queue traversalQueue, - Set visitedVertices, Set skippedVertices, - Map> parentMapForNeighbours, Map> childrenMapForNeighbours) { + private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, + AtlasLineageListContext lineageListContext, Queue 
traversalQueue, + Set visitedVertices, Set skippedVertices, + Map> parentMapForNeighbours, Map> childrenMapForNeighbours) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; if (isDataset) @@ -557,25 +556,23 @@ private List enqueueNeighbours(AtlasVertex currentVertex, boolean isData traversalQueue.add(vertexGuid); addEntitiesToCache(neighbourVertex); } + String vertexDisplayName = getQalifiedName(neighbourVertex); parentMapForNeighbours - .computeIfAbsent(vertexGuid, k -> new ArrayList<>()) - .add(getGuid(currentVertex)); + .computeIfAbsent(vertexDisplayName, k -> new ArrayList<>()) + .add(getQalifiedName(currentVertex)); childrenMapForNeighbours - .computeIfAbsent(getGuid(currentVertex), k -> new ArrayList<>()) - .add(vertexGuid); - neighbors.add(vertexGuid); // Add neighbor to the list + .computeIfAbsent(getQalifiedName(currentVertex), k -> new ArrayList<>()) + .add(vertexDisplayName); } - - return neighbors; // Return the list of neighbors } private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours, Map> childrenMapForNeighbours){ List entityList = ret.getEntities(); for (AtlasEntityHeader entity : entityList) { // Check if the entity GUID exists in the parentMapForNeighbours - if (parentMapForNeighbours.containsKey(entity.getGuid())) { + if (parentMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) { // Get the list of AtlasVertex from the map - List parentNodes = parentMapForNeighbours.get(entity.getGuid()); + List parentNodes = parentMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME)); List parentNodesOfParent = new ArrayList<>(); for (String parentNode : parentNodes) { @@ -593,9 +590,9 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC } } - if (childrenMapForNeighbours.containsKey(entity.getGuid())) { + if 
(childrenMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) { // Get the list of AtlasVertex from the map - List childrenNodes = childrenMapForNeighbours.get(entity.getGuid()); + List childrenNodes = childrenMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME)); List childrenNodesOfChildren = new ArrayList<>(); for (String childNode : childrenNodes) { diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index d5926e8a00..fec47a7a36 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -1025,6 +1025,10 @@ public static String getGuid(AtlasVertex vertex) { return ret; } + public static String getQalifiedName(AtlasVertex vertex) { + return vertex.getProperty(Constants.QUALIFIED_NAME, String.class); + } + public static String getHomeId(AtlasElement element) { return element.getProperty(Constants.HOME_ID_KEY, String.class); } From 51ce5f9fc388a87616c3ee455e06ed00768a3c1a Mon Sep 17 00:00:00 2001 From: akshaysw Date: Wed, 21 Aug 2024 18:30:47 +0530 Subject: [PATCH 04/11] [fix] add flag --- .../model/lineage/LineageListRequest.java | 9 ++++++++ .../discovery/AtlasLineageListContext.java | 10 ++++++++ .../atlas/discovery/EntityLineageService.java | 23 +++++++++++-------- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java index e717fda066..9293b1de89 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java @@ -25,6 +25,15 @@ public class LineageListRequest { private Set relationAttributes; private Boolean excludeMeanings; private Boolean 
excludeClassifications; + private Boolean immediateNeighbours; + + public Boolean getImmediateNeighbours() { + return immediateNeighbours; + } + + public void setImmediateNeighbours(Boolean immediateNeighbours) { + this.immediateNeighbours = immediateNeighbours; + } public enum LineageDirection {INPUT, OUTPUT} diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java index 95657aaabe..06a9291b23 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java @@ -23,6 +23,7 @@ public final class AtlasLineageListContext { private int currentEntityCounter; private boolean depthLimitReached; private boolean hasMoreUpdated; + private Boolean immediateNeighbours; public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) { this.guid = lineageListRequest.getGuid(); @@ -35,6 +36,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR this.edgeTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getRelationshipTraversalFilters()); this.attributes = lineageListRequest.getAttributes(); this.relationAttributes = lineageListRequest.getRelationAttributes(); + this.immediateNeighbours = lineageListRequest.getImmediateNeighbours(); } public String getGuid() { @@ -190,4 +192,12 @@ public boolean isHasMoreUpdated() { public void setHasMoreUpdated(boolean hasMoreUpdated) { this.hasMoreUpdated = hasMoreUpdated; } + + public Boolean getImmediateNeighbours() { + return immediateNeighbours; + } + + public void setImmediateNeighbours(Boolean immediateNeighbours) { + this.immediateNeighbours = immediateNeighbours; + } } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java 
b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 6d19dbefc5..c5e1a5750c 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -514,8 +514,10 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line } } - // update parents for each entity - updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours); + if(lineageListContext.getImmediateNeighbours()){ + // update parents for each entity + updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours); + } if (currentDepth > lineageListContext.getDepth()) lineageListContext.setDepthLimitReached(true); @@ -556,13 +558,16 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, traversalQueue.add(vertexGuid); addEntitiesToCache(neighbourVertex); } - String vertexDisplayName = getQalifiedName(neighbourVertex); - parentMapForNeighbours - .computeIfAbsent(vertexDisplayName, k -> new ArrayList<>()) - .add(getQalifiedName(currentVertex)); - childrenMapForNeighbours - .computeIfAbsent(getQalifiedName(currentVertex), k -> new ArrayList<>()) - .add(vertexDisplayName); + + if(lineageListContext.getImmediateNeighbours()){ + String vertexDisplayName = getQalifiedName(neighbourVertex); + parentMapForNeighbours + .computeIfAbsent(vertexDisplayName, k -> new ArrayList<>()) + .add(getQalifiedName(currentVertex)); + childrenMapForNeighbours + .computeIfAbsent(getQalifiedName(currentVertex), k -> new ArrayList<>()) + .add(vertexDisplayName); + } } } From 5a73789f2f8749b589e497ea5bcb438a647973a5 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Wed, 21 Aug 2024 21:59:28 +0530 Subject: [PATCH 05/11] [fix] add default value false for flag --- .../java/org/apache/atlas/model/lineage/LineageListRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java index 9293b1de89..b8274d67ee 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java @@ -25,7 +25,7 @@ public class LineageListRequest { private Set relationAttributes; private Boolean excludeMeanings; private Boolean excludeClassifications; - private Boolean immediateNeighbours; + private Boolean immediateNeighbours=false; public Boolean getImmediateNeighbours() { return immediateNeighbours; From 8da3ef991f147f5e51fba5a584c098d2a9777481 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Wed, 21 Aug 2024 23:05:46 +0530 Subject: [PATCH 06/11] [add] qualified name, guid and name details --- .../model/instance/AtlasEntityHeader.java | 12 +++--- .../atlas/discovery/EntityLineageService.java | 39 +++++++++---------- .../atlas/repository/graph/GraphHelper.java | 10 +++++ 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java index d79e81947e..4b8b80241d 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java @@ -77,8 +77,8 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable { private Map collapse = null; - private List immediateUpstream; // New field - private List immediateDownstream; // New field + private List> immediateUpstream; // New field + private List> immediateDownstream; // New field public AtlasEntityHeader() { @@ -349,19 +349,19 @@ public void setMeanings(final List meanings) { this.meanings = meanings; } - public List getImmediateUpstream() { + public List> getImmediateUpstream() { return 
immediateUpstream; } - public void setImmediateUpstream(List immediateUpstream) { + public void setImmediateUpstream(List> immediateUpstream) { this.immediateUpstream = immediateUpstream; } - public List getImmediateDownstream() { + public List> getImmediateDownstream() { return immediateDownstream; } - public void setImmediateDownstream(List immediateDownstream) { + public void setImmediateDownstream(List> immediateDownstream) { this.immediateDownstream = immediateDownstream; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index c5e1a5750c..d71a6fd118 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -560,13 +560,12 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, } if(lineageListContext.getImmediateNeighbours()){ - String vertexDisplayName = getQalifiedName(neighbourVertex); parentMapForNeighbours - .computeIfAbsent(vertexDisplayName, k -> new ArrayList<>()) - .add(getQalifiedName(currentVertex)); + .computeIfAbsent(vertexGuid, k -> new ArrayList<>()) + .add(getGuid(currentVertex)); childrenMapForNeighbours - .computeIfAbsent(getQalifiedName(currentVertex), k -> new ArrayList<>()) - .add(vertexDisplayName); + .computeIfAbsent(getGuid(currentVertex), k -> new ArrayList<>()) + .add(vertexGuid); } } } @@ -575,44 +574,44 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC List entityList = ret.getEntities(); for (AtlasEntityHeader entity : entityList) { // Check if the entity GUID exists in the parentMapForNeighbours - if (parentMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) { + if (parentMapForNeighbours.containsKey(entity.getGuid())) { // Get the list of AtlasVertex from the map - List parentNodes = 
parentMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME)); + List parentNodes = parentMapForNeighbours.get(entity.getGuid()); - List parentNodesOfParent = new ArrayList<>(); + List> parentNodesOfParentWithDetails = new ArrayList<>(); for (String parentNode : parentNodes) { - List parentsOfParentNode = parentMapForNeighbours.get(parentNode); - if (parentsOfParentNode != null) { - parentNodesOfParent.addAll(parentsOfParentNode); + List parentsOfParentNodes = parentMapForNeighbours.get(parentNode); + for (String parentOfParent : parentsOfParentNodes) { + parentNodesOfParentWithDetails.add(getNodeDetails(AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent))); } } if(isInputDirection(lineageListContext)){ - entity.setImmediateDownstream(parentNodesOfParent); + entity.setImmediateDownstream(parentNodesOfParentWithDetails); } else{ - entity.setImmediateUpstream(parentNodesOfParent); + entity.setImmediateUpstream(parentNodesOfParentWithDetails); } } - if (childrenMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) { + if (childrenMapForNeighbours.containsKey(entity.getGuid())) { // Get the list of AtlasVertex from the map - List childrenNodes = childrenMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME)); + List childrenNodes = childrenMapForNeighbours.get(entity.getGuid()); - List childrenNodesOfChildren = new ArrayList<>(); + List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); for (String childNode : childrenNodes) { // Add all children for the current childNode List childrenOfChildNode = childrenMapForNeighbours.get(childNode); - if (childrenOfChildNode != null) { - childrenNodesOfChildren.addAll(childrenOfChildNode); + for (String childOfChild : childrenOfChildNode) { + childrenNodesOfChildrenWithDetails.add(getNodeDetails(AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild))); } } if(isInputDirection(lineageListContext)){ - entity.setImmediateUpstream(childrenNodesOfChildren); + 
entity.setImmediateUpstream(childrenNodesOfChildrenWithDetails); } else{ - entity.setImmediateDownstream(childrenNodesOfChildren); + entity.setImmediateDownstream(childrenNodesOfChildrenWithDetails); } } } diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index fec47a7a36..1ce0983c26 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -1029,6 +1029,16 @@ public static String getQalifiedName(AtlasVertex vertex) { return vertex.getProperty(Constants.QUALIFIED_NAME, String.class); } + public static List getNodeDetails(AtlasVertex vertex) { + List stringList = new ArrayList<>(); + // Add strings to the list + stringList.add(getGuid(vertex)); + stringList.add(vertex.getProperty(QUALIFIED_NAME, String.class)); + stringList.add(vertex.getProperty(NAME, String.class)); + // Return the ArrayList + return stringList; + } + public static String getHomeId(AtlasElement element) { return element.getProperty(Constants.HOME_ID_KEY, String.class); } From bd3944c9472019fc823de9ccde4fc5d7a6c8ce57 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Thu, 22 Aug 2024 11:45:39 +0530 Subject: [PATCH 07/11] [add] change response format --- .../atlas/model/instance/AtlasEntityHeader.java | 12 ++++++------ .../apache/atlas/discovery/EntityLineageService.java | 4 ++-- .../apache/atlas/repository/graph/GraphHelper.java | 10 +++++----- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java index 4b8b80241d..13820f5255 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java @@ -77,8 +77,8 @@ public 
class AtlasEntityHeader extends AtlasStruct implements Serializable { private Map collapse = null; - private List> immediateUpstream; // New field - private List> immediateDownstream; // New field + private List> immediateUpstream; // New field + private List> immediateDownstream; // New field public AtlasEntityHeader() { @@ -349,19 +349,19 @@ public void setMeanings(final List meanings) { this.meanings = meanings; } - public List> getImmediateUpstream() { + public List> getImmediateUpstream() { return immediateUpstream; } - public void setImmediateUpstream(List> immediateUpstream) { + public void setImmediateUpstream(List> immediateUpstream) { this.immediateUpstream = immediateUpstream; } - public List> getImmediateDownstream() { + public List> getImmediateDownstream() { return immediateDownstream; } - public void setImmediateDownstream(List> immediateDownstream) { + public void setImmediateDownstream(List> immediateDownstream) { this.immediateDownstream = immediateDownstream; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index d71a6fd118..7f870546a3 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -578,7 +578,7 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC // Get the list of AtlasVertex from the map List parentNodes = parentMapForNeighbours.get(entity.getGuid()); - List> parentNodesOfParentWithDetails = new ArrayList<>(); + List> parentNodesOfParentWithDetails = new ArrayList<>(); for (String parentNode : parentNodes) { List parentsOfParentNodes = parentMapForNeighbours.get(parentNode); for (String parentOfParent : parentsOfParentNodes) { @@ -598,7 +598,7 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC // Get the list of AtlasVertex from 
the map List childrenNodes = childrenMapForNeighbours.get(entity.getGuid()); - List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); + List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); for (String childNode : childrenNodes) { // Add all children for the current childNode List childrenOfChildNode = childrenMapForNeighbours.get(childNode); diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 1ce0983c26..50314e5b7b 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -1029,12 +1029,12 @@ public static String getQalifiedName(AtlasVertex vertex) { return vertex.getProperty(Constants.QUALIFIED_NAME, String.class); } - public static List getNodeDetails(AtlasVertex vertex) { - List stringList = new ArrayList<>(); + public static Map getNodeDetails(AtlasVertex vertex) { + Map stringList = new HashMap<>(); // Add strings to the list - stringList.add(getGuid(vertex)); - stringList.add(vertex.getProperty(QUALIFIED_NAME, String.class)); - stringList.add(vertex.getProperty(NAME, String.class)); + stringList.put(ATTRIBUTE_NAME_GUID, getGuid(vertex)); + stringList.put(QUALIFIED_NAME, vertex.getProperty(QUALIFIED_NAME, String.class)); + stringList.put(NAME, vertex.getProperty(NAME, String.class)); // Return the ArrayList return stringList; } From 6cd442b982b3f22efda8a515b6c9c18771eb2c3e Mon Sep 17 00:00:00 2001 From: akshaysw Date: Thu, 22 Aug 2024 13:43:37 +0530 Subject: [PATCH 08/11] [add] address review comments --- .../atlas/discovery/EntityLineageService.java | 42 ++++++++++--------- .../atlas/repository/graph/GraphHelper.java | 22 +++++----- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java 
b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 7f870546a3..63563a5b0e 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -96,6 +96,8 @@ public class EntityLineageService implements AtlasLineageService { private final AtlasTypeRegistry atlasTypeRegistry; private final VertexEdgeCache vertexEdgeCache; + private static final List FETCH_ENTITY_ATTRIBUTES = Arrays.asList(ATTRIBUTE_NAME_GUID, QUALIFIED_NAME, NAME); + @Inject EntityLineageService(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, VertexEdgeCache vertexEdgeCache) { this.graph = atlasGraph; @@ -458,14 +460,14 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Set skippedVertices = new HashSet<>(); Queue traversalQueue = new LinkedList<>(); - Map> parentMapForNeighbours = new HashMap<>(); // New map to track parent nodes - Map> childrenMapForNeighbours = new HashMap<>(); // New map to track parent nodes + Map> lineageParentsForEntityMap = new HashMap<>(); // New map to track parent nodes + Map> lineageChildrenForEntityMap = new HashMap<>(); // New map to track parent nodes AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); // Get the neighbors for the current node - enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); + enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); int currentDepth = 0; int currentLevel = isBaseNodeDataset? 
0: 1; @@ -491,18 +493,18 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; } lineageListContext.incrementEntityCount(); // Get the neighbors for the current node - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); // Add the current node and its neighbors to the result appendToResult(currentVertex, lineageListContext, ret, currentLevel); @@ -516,7 +518,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if(lineageListContext.getImmediateNeighbours()){ // update parents for each entity - updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours); + updateParentNodesForEachEntity(lineageListContext, ret, lineageParentsForEntityMap, lineageChildrenForEntityMap); 
} if (currentDepth > lineageListContext.getDepth()) @@ -529,7 +531,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext, Queue traversalQueue, Set visitedVertices, Set skippedVertices, - Map> parentMapForNeighbours, Map> childrenMapForNeighbours) { + Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; if (isDataset) @@ -560,29 +562,29 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, } if(lineageListContext.getImmediateNeighbours()){ - parentMapForNeighbours + lineageParentsForEntityMap .computeIfAbsent(vertexGuid, k -> new ArrayList<>()) .add(getGuid(currentVertex)); - childrenMapForNeighbours + lineageChildrenForEntityMap .computeIfAbsent(getGuid(currentVertex), k -> new ArrayList<>()) .add(vertexGuid); } } } - private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours, Map> childrenMapForNeighbours){ + private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap){ List entityList = ret.getEntities(); for (AtlasEntityHeader entity : entityList) { - // Check if the entity GUID exists in the parentMapForNeighbours - if (parentMapForNeighbours.containsKey(entity.getGuid())) { + // Check if the entity GUID exists in the lineageParentsForEntityMap + if (lineageParentsForEntityMap.containsKey(entity.getGuid())) { // Get the list of AtlasVertex from the map - List parentNodes = parentMapForNeighbours.get(entity.getGuid()); + List parentNodes = lineageParentsForEntityMap.get(entity.getGuid()); List> 
parentNodesOfParentWithDetails = new ArrayList<>(); for (String parentNode : parentNodes) { - List parentsOfParentNodes = parentMapForNeighbours.get(parentNode); + List parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode); for (String parentOfParent : parentsOfParentNodes) { - parentNodesOfParentWithDetails.add(getNodeDetails(AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent))); + parentNodesOfParentWithDetails.add(fetchAttributes(AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent), FETCH_ENTITY_ATTRIBUTES)); } } @@ -594,16 +596,16 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC } } - if (childrenMapForNeighbours.containsKey(entity.getGuid())) { + if (lineageChildrenForEntityMap.containsKey(entity.getGuid())) { // Get the list of AtlasVertex from the map - List childrenNodes = childrenMapForNeighbours.get(entity.getGuid()); + List childrenNodes = lineageChildrenForEntityMap.get(entity.getGuid()); List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); for (String childNode : childrenNodes) { // Add all children for the current childNode - List childrenOfChildNode = childrenMapForNeighbours.get(childNode); + List childrenOfChildNode = lineageChildrenForEntityMap.get(childNode); for (String childOfChild : childrenOfChildNode) { - childrenNodesOfChildrenWithDetails.add(getNodeDetails(AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild))); + childrenNodesOfChildrenWithDetails.add(fetchAttributes(AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild), FETCH_ENTITY_ATTRIBUTES)); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 50314e5b7b..5af4741c2f 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -1025,18 +1025,20 @@ public static String 
getGuid(AtlasVertex vertex) { return ret; } - public static String getQalifiedName(AtlasVertex vertex) { - return vertex.getProperty(Constants.QUALIFIED_NAME, String.class); - } + public static Map fetchAttributes(AtlasVertex vertex, List attributes) { + Map attributesList = new HashMap<>(); - public static Map getNodeDetails(AtlasVertex vertex) { - Map stringList = new HashMap<>(); - // Add strings to the list - stringList.put(ATTRIBUTE_NAME_GUID, getGuid(vertex)); - stringList.put(QUALIFIED_NAME, vertex.getProperty(QUALIFIED_NAME, String.class)); - stringList.put(NAME, vertex.getProperty(NAME, String.class)); + for (String attr: attributes){ + if (Objects.equals(attr, ATTRIBUTE_NAME_GUID)){ + // always add guid to the list from cache + attributesList.put(ATTRIBUTE_NAME_GUID, getGuid(vertex)); + } + else{ + attributesList.put(attr, vertex.getProperty(attr, String.class)); + } + } // Return the ArrayList - return stringList; + return attributesList; } public static String getHomeId(AtlasElement element) { From 8b3caaf90853ebdd90411b55d8c64d18d7532496 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Thu, 22 Aug 2024 17:46:22 +0530 Subject: [PATCH 09/11] [fix] remove duplicate entries --- .../atlas/discovery/EntityLineageService.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 63563a5b0e..e1640a370c 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -579,12 +579,17 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC if (lineageParentsForEntityMap.containsKey(entity.getGuid())) { // Get the list of AtlasVertex from the map List parentNodes = lineageParentsForEntityMap.get(entity.getGuid()); - + Set seenGuids = new 
HashSet<>(); List> parentNodesOfParentWithDetails = new ArrayList<>(); for (String parentNode : parentNodes) { List parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode); for (String parentOfParent : parentsOfParentNodes) { - parentNodesOfParentWithDetails.add(fetchAttributes(AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent), FETCH_ENTITY_ATTRIBUTES)); + Map details = fetchAttributes(AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent), FETCH_ENTITY_ATTRIBUTES); + // Check if the guid is already in the set + if (!seenGuids.contains(parentOfParent)) { + parentNodesOfParentWithDetails.add(details); + seenGuids.add(parentOfParent); // Add the guid to the set + } } } @@ -599,13 +604,17 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC if (lineageChildrenForEntityMap.containsKey(entity.getGuid())) { // Get the list of AtlasVertex from the map List childrenNodes = lineageChildrenForEntityMap.get(entity.getGuid()); - + Set seenGuids = new HashSet<>(); List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); for (String childNode : childrenNodes) { // Add all children for the current childNode List childrenOfChildNode = lineageChildrenForEntityMap.get(childNode); for (String childOfChild : childrenOfChildNode) { - childrenNodesOfChildrenWithDetails.add(fetchAttributes(AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild), FETCH_ENTITY_ATTRIBUTES)); + Map details = fetchAttributes(AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild), FETCH_ENTITY_ATTRIBUTES); + if (!seenGuids.contains(childOfChild)) { + childrenNodesOfChildrenWithDetails.add(details); + seenGuids.add(childOfChild); // Add the guid to the set + } } } From 39801cec2cbda95912057d0981ce1301e8e1016c Mon Sep 17 00:00:00 2001 From: akshaysw Date: Thu, 22 Aug 2024 20:08:08 +0530 Subject: [PATCH 10/11] [fix] handle null exception --- .../atlas/discovery/EntityLineageService.java | 104 ++++++++++-------- 1 file changed, 61 insertions(+), 43 deletions(-) 
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index e1640a370c..ab17049b96 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -574,56 +574,74 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap){ List entityList = ret.getEntities(); - for (AtlasEntityHeader entity : entityList) { - // Check if the entity GUID exists in the lineageParentsForEntityMap - if (lineageParentsForEntityMap.containsKey(entity.getGuid())) { - // Get the list of AtlasVertex from the map - List parentNodes = lineageParentsForEntityMap.get(entity.getGuid()); - Set seenGuids = new HashSet<>(); - List> parentNodesOfParentWithDetails = new ArrayList<>(); - for (String parentNode : parentNodes) { - List parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode); - for (String parentOfParent : parentsOfParentNodes) { - Map details = fetchAttributes(AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent), FETCH_ENTITY_ATTRIBUTES); - // Check if the guid is already in the set - if (!seenGuids.contains(parentOfParent)) { - parentNodesOfParentWithDetails.add(details); - seenGuids.add(parentOfParent); // Add the guid to the set + if (entityList != null){ + for (AtlasEntityHeader entity : entityList) { + if (entity != null && entity.getGuid() != null) { + // Check if the entity GUID exists in the lineageParentsForEntityMap + if (lineageParentsForEntityMap.containsKey(entity.getGuid())) { + // Get the list of AtlasVertex from the map + List parentNodes = lineageParentsForEntityMap.get(entity.getGuid()); + if (parentNodes != null) { + Set seenGuids 
= new HashSet<>(); + List> parentNodesOfParentWithDetails = new ArrayList<>(); + for (String parentNode : parentNodes) { + List parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode); + if (parentsOfParentNodes != null){ + for (String parentOfParent : parentsOfParentNodes) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent); + if (vertex != null) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); + // Check if the guid is already in the set + if (!seenGuids.contains(parentOfParent)) { + parentNodesOfParentWithDetails.add(details); + seenGuids.add(parentOfParent); // Add the guid to the set + } + } + } + } + } + + if(isInputDirection(lineageListContext)){ + entity.setImmediateDownstream(parentNodesOfParentWithDetails); + } + else{ + entity.setImmediateUpstream(parentNodesOfParentWithDetails); + } } } - } - if(isInputDirection(lineageListContext)){ - entity.setImmediateDownstream(parentNodesOfParentWithDetails); - } - else{ - entity.setImmediateUpstream(parentNodesOfParentWithDetails); - } - } + if (lineageChildrenForEntityMap.containsKey(entity.getGuid())) { + // Get the list of AtlasVertex from the map + List childrenNodes = lineageChildrenForEntityMap.get(entity.getGuid()); + if (childrenNodes != null) { + Set seenGuids = new HashSet<>(); + List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); + for (String childNode : childrenNodes) { + // Add all children for the current childNode + List childrenOfChildNode = lineageChildrenForEntityMap.get(childNode); + if (childrenOfChildNode != null){ + for (String childOfChild : childrenOfChildNode) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild); + if (vertex != null) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); + if (!seenGuids.contains(childOfChild)) { + childrenNodesOfChildrenWithDetails.add(details); + seenGuids.add(childOfChild); // Add the guid to the set + } + } + } + } + } - if 
(lineageChildrenForEntityMap.containsKey(entity.getGuid())) { - // Get the list of AtlasVertex from the map - List childrenNodes = lineageChildrenForEntityMap.get(entity.getGuid()); - Set seenGuids = new HashSet<>(); - List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); - for (String childNode : childrenNodes) { - // Add all children for the current childNode - List childrenOfChildNode = lineageChildrenForEntityMap.get(childNode); - for (String childOfChild : childrenOfChildNode) { - Map details = fetchAttributes(AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild), FETCH_ENTITY_ATTRIBUTES); - if (!seenGuids.contains(childOfChild)) { - childrenNodesOfChildrenWithDetails.add(details); - seenGuids.add(childOfChild); // Add the guid to the set + if(isInputDirection(lineageListContext)){ + entity.setImmediateUpstream(childrenNodesOfChildrenWithDetails); + } + else{ + entity.setImmediateDownstream(childrenNodesOfChildrenWithDetails); + } } } } - - if(isInputDirection(lineageListContext)){ - entity.setImmediateUpstream(childrenNodesOfChildrenWithDetails); - } - else{ - entity.setImmediateDownstream(childrenNodesOfChildrenWithDetails); - } } } } From 3172f8588baffbbbb199592e3d86a74b906f70fd Mon Sep 17 00:00:00 2001 From: akshaysw Date: Thu, 22 Aug 2024 20:32:10 +0530 Subject: [PATCH 11/11] [fix] handle null exception --- .../atlas/discovery/EntityLineageService.java | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index ab17049b96..5cfd10491f 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -585,16 +585,18 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC Set seenGuids = new HashSet<>(); List> 
parentNodesOfParentWithDetails = new ArrayList<>(); for (String parentNode : parentNodes) { - List parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode); - if (parentsOfParentNodes != null){ - for (String parentOfParent : parentsOfParentNodes) { - AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent); - if (vertex != null) { - Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); - // Check if the guid is already in the set - if (!seenGuids.contains(parentOfParent)) { - parentNodesOfParentWithDetails.add(details); - seenGuids.add(parentOfParent); // Add the guid to the set + if(lineageParentsForEntityMap.containsKey(parentNode)){ + List parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode); + if (parentsOfParentNodes != null){ + for (String parentOfParent : parentsOfParentNodes) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent); + if (vertex != null) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); + // Check if the guid is already in the set + if (!seenGuids.contains(parentOfParent)) { + parentNodesOfParentWithDetails.add(details); + seenGuids.add(parentOfParent); // Add the guid to the set + } } } } @@ -617,16 +619,18 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC Set seenGuids = new HashSet<>(); List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); for (String childNode : childrenNodes) { - // Add all children for the current childNode - List childrenOfChildNode = lineageChildrenForEntityMap.get(childNode); - if (childrenOfChildNode != null){ - for (String childOfChild : childrenOfChildNode) { - AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild); - if (vertex != null) { - Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); - if (!seenGuids.contains(childOfChild)) { - childrenNodesOfChildrenWithDetails.add(details); - seenGuids.add(childOfChild); // Add the guid to the set + 
if(lineageChildrenForEntityMap.containsKey(childNode)){ + // Add all children for the current childNode + List childrenOfChildNode = lineageChildrenForEntityMap.get(childNode); + if (childrenOfChildNode != null){ + for (String childOfChild : childrenOfChildNode) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild); + if (vertex != null) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); + if (!seenGuids.contains(childOfChild)) { + childrenNodesOfChildrenWithDetails.add(details); + seenGuids.add(childOfChild); // Add the guid to the set + } } } }