// NOTE(review): this text is a unified diff (not a compilable .java file) touching
// AtlasEntityHeader, LineageListRequest, AtlasLineageListContext, EntityLineageService
// and GraphHelper. Line breaks are collapsed and generic type arguments look stripped
// (e.g. "List>", "Map>"), presumably by extraction, so the code below is left as-is and
// only comments are added or corrected.
//
// Summary of the change, as far as the hunks show:
//   - AtlasEntityHeader gains immediateUpstream / immediateDownstream with plain accessors.
//   - LineageListRequest gains a Boolean immediateNeighbours flag (initialised to false),
//     which the AtlasLineageListContext constructor copies from the request.
//   - EntityLineageService.enqueueNeighbours records, when the flag is set, parent and child
//     GUIDs per vertex in two maps; updateParentNodesForEachEntity then walks those maps two
//     hops (parents of parents / children of children), fetches the FETCH_ENTITY_ATTRIBUTES
//     properties for each distinct GUID and stores them on the entity header as
//     immediateUpstream / immediateDownstream depending on isInputDirection(...).
//   - GraphHelper.fetchAttributes builds a map of attribute name -> value for a vertex, using
//     getGuid(vertex) for ATTRIBUTE_NAME_GUID and vertex.getProperty(attr, String.class) otherwise.
//
// Points to confirm before merging:
//   1. immediateNeighbours is a boxed Boolean and setImmediateNeighbours accepts null, yet
//      `if(lineageListContext.getImmediateNeighbours())` unboxes it in traverseEdgesUsingBFS
//      and enqueueNeighbours; a null value would throw NullPointerException. Prefer
//      Boolean.TRUE.equals(...) or a primitive boolean in AtlasLineageListContext.
//   2. updateParentNodesForEachEntity calls findByGuid and fetchAttributes before consulting
//      seenGuids, so duplicate GUIDs are looked up and then discarded; testing
//      seenGuids.add(guid) first would avoid the wasted lookups.
//   3. `List neighbors = new ArrayList<>();` in enqueueNeighbours is not used in the visible hunk.
//   4. appendToResult(baseVertex, ...) is now called before the loop without the vertex-filter,
//      offset or incrementEntityCount steps applied to other vertices - confirm the base entity
//      is meant to be part of the returned list.
//   5. The two-hop walk presumably relies on neighbours alternating between dataset and
//      process vertices - TODO confirm against the part of enqueueNeighbours not shown here.
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java index c251811df1..13820f5255 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java @@ -77,6 +77,9 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable { private Map collapse = null; + private List> immediateUpstream; // New field + private List> immediateDownstream; // New field + public AtlasEntityHeader() { this(null, null); @@ -346,6 +349,22 @@ public void setMeanings(final List meanings) { this.meanings = meanings; } + public List> getImmediateUpstream() { + return immediateUpstream; + } + + public void setImmediateUpstream(List> immediateUpstream) { + this.immediateUpstream = immediateUpstream; + } + + public List> getImmediateDownstream() { + return immediateDownstream; + } + + public void setImmediateDownstream(List> immediateDownstream) { + this.immediateDownstream = immediateDownstream; + } + /** * REST serialization friendly list. 
*/ diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java index e717fda066..b8274d67ee 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java @@ -25,6 +25,15 @@ public class LineageListRequest { private Set relationAttributes; private Boolean excludeMeanings; private Boolean excludeClassifications; + private Boolean immediateNeighbours=false; + + public Boolean getImmediateNeighbours() { + return immediateNeighbours; + } + + public void setImmediateNeighbours(Boolean immediateNeighbours) { + this.immediateNeighbours = immediateNeighbours; + } public enum LineageDirection {INPUT, OUTPUT} diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java index 95657aaabe..06a9291b23 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java @@ -23,6 +23,7 @@ public final class AtlasLineageListContext { private int currentEntityCounter; private boolean depthLimitReached; private boolean hasMoreUpdated; + private Boolean immediateNeighbours; public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) { this.guid = lineageListRequest.getGuid(); @@ -35,6 +36,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR this.edgeTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getRelationshipTraversalFilters()); this.attributes = lineageListRequest.getAttributes(); this.relationAttributes = lineageListRequest.getRelationAttributes(); + this.immediateNeighbours = lineageListRequest.getImmediateNeighbours(); } public String getGuid() 
{ @@ -190,4 +192,12 @@ public boolean isHasMoreUpdated() { public void setHasMoreUpdated(boolean hasMoreUpdated) { this.hasMoreUpdated = hasMoreUpdated; } + + public Boolean getImmediateNeighbours() { + return immediateNeighbours; + } + + public void setImmediateNeighbours(Boolean immediateNeighbours) { + this.immediateNeighbours = immediateNeighbours; + } } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 7e83794113..5cfd10491f 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -72,8 +72,7 @@ import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*; -import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; -import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.*; import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN; import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.OUT; @@ -97,6 +96,8 @@ public class EntityLineageService implements AtlasLineageService { private final AtlasTypeRegistry atlasTypeRegistry; private final VertexEdgeCache vertexEdgeCache; + private static final List FETCH_ENTITY_ATTRIBUTES = Arrays.asList(ATTRIBUTE_NAME_GUID, QUALIFIED_NAME, NAME); + @Inject EntityLineageService(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, VertexEdgeCache vertexEdgeCache) { this.graph = atlasGraph; @@ -459,12 +460,20 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Set skippedVertices = new HashSet<>(); Queue 
traversalQueue = new LinkedList<>(); + Map> lineageParentsForEntityMap = new HashMap<>(); // New map to track parent nodes + Map> lineageChildrenForEntityMap = new HashMap<>(); // New map to track child nodes + + AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); - enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + // Get the neighbors for the current node + enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); int currentDepth = 0; int currentLevel = isBaseNodeDataset? 0: 1; + // Add the current node and its neighbors to the result + appendToResult(baseVertex, lineageListContext, ret, currentLevel); + while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) { currentDepth++; @@ -484,24 +493,34 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; } 
lineageListContext.incrementEntityCount(); + // Get the neighbors for the current node + enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); + + // Add the current node and its neighbors to the result appendToResult(currentVertex, lineageListContext, ret, currentLevel); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) { ret.setHasMore(false); lineageListContext.setHasMoreUpdated(true); } } } + + if(lineageListContext.getImmediateNeighbours()){ + // update parents for each entity + updateParentNodesForEachEntity(lineageListContext, ret, lineageParentsForEntityMap, lineageChildrenForEntityMap); + } + if (currentDepth > lineageListContext.getDepth()) lineageListContext.setDepthLimitReached(true); @@ -509,8 +528,10 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line RequestContext.get().endMetricRecord(metricRecorder); } - private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext, - Queue traversalQueue, Set visitedVertices, Set skippedVertices) { + private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, + AtlasLineageListContext lineageListContext, Queue traversalQueue, + Set visitedVertices, Set skippedVertices, + Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; if (isDataset) @@ -519,6 +540,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? 
PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); + List neighbors = new ArrayList<>(); while (edges.hasNext()) { AtlasEdge currentEdge = edges.next(); if (!lineageListContext.evaluateTraversalFilter(currentEdge)) @@ -538,10 +560,98 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl traversalQueue.add(vertexGuid); addEntitiesToCache(neighbourVertex); } + + if(lineageListContext.getImmediateNeighbours()){ + lineageParentsForEntityMap + .computeIfAbsent(vertexGuid, k -> new ArrayList<>()) + .add(getGuid(currentVertex)); + lineageChildrenForEntityMap + .computeIfAbsent(getGuid(currentVertex), k -> new ArrayList<>()) + .add(vertexGuid); + } + } + } + + private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap){ + List entityList = ret.getEntities(); + if (entityList != null){ + for (AtlasEntityHeader entity : entityList) { + if (entity != null && entity.getGuid() != null) { + // Check if the entity GUID exists in the lineageParentsForEntityMap + if (lineageParentsForEntityMap.containsKey(entity.getGuid())) { + // Get the list of parent GUIDs from the map + List parentNodes = lineageParentsForEntityMap.get(entity.getGuid()); + if (parentNodes != null) { + Set seenGuids = new HashSet<>(); + List> parentNodesOfParentWithDetails = new ArrayList<>(); + for (String parentNode : parentNodes) { + if(lineageParentsForEntityMap.containsKey(parentNode)){ + List parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode); + if (parentsOfParentNodes != null){ + for (String parentOfParent : parentsOfParentNodes) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent); + if (vertex != null) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); + // Check if the guid is already in the set + if 
(!seenGuids.contains(parentOfParent)) { + parentNodesOfParentWithDetails.add(details); + seenGuids.add(parentOfParent); // Add the guid to the set + } + } + } + } + } + } + + if(isInputDirection(lineageListContext)){ + entity.setImmediateDownstream(parentNodesOfParentWithDetails); + } + else{ + entity.setImmediateUpstream(parentNodesOfParentWithDetails); + } + } + } + + if (lineageChildrenForEntityMap.containsKey(entity.getGuid())) { + // Get the list of child GUIDs from the map + List childrenNodes = lineageChildrenForEntityMap.get(entity.getGuid()); + if (childrenNodes != null) { + Set seenGuids = new HashSet<>(); + List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); + for (String childNode : childrenNodes) { + if(lineageChildrenForEntityMap.containsKey(childNode)){ + // Add all children for the current childNode + List childrenOfChildNode = lineageChildrenForEntityMap.get(childNode); + if (childrenOfChildNode != null){ + for (String childOfChild : childrenOfChildNode) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild); + if (vertex != null) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); + if (!seenGuids.contains(childOfChild)) { + childrenNodesOfChildrenWithDetails.add(details); + seenGuids.add(childOfChild); // Add the guid to the set + } + } + } + } + } + } + + if(isInputDirection(lineageListContext)){ + entity.setImmediateUpstream(childrenNodesOfChildrenWithDetails); + } + else{ + entity.setImmediateDownstream(childrenNodesOfChildrenWithDetails); + } + } + } + } + } } } - private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException { + private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, + AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException { AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(currentVertex, 
lineageListContext.getAttributes()); entity.setDepth(currentLevel); ret.getEntities().add(entity); diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index d5926e8a00..5af4741c2f 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -1025,6 +1025,22 @@ public static String getGuid(AtlasVertex vertex) { return ret; } + public static Map fetchAttributes(AtlasVertex vertex, List attributes) { + Map attributesList = new HashMap<>(); + + for (String attr: attributes){ + if (Objects.equals(attr, ATTRIBUTE_NAME_GUID)){ + // always add guid to the list from cache + attributesList.put(ATTRIBUTE_NAME_GUID, getGuid(vertex)); + } + else{ + attributesList.put(attr, vertex.getProperty(attr, String.class)); + } + } + // Return the attribute map + return attributesList; + } + public static String getHomeId(AtlasElement element) { return element.getProperty(Constants.HOME_ID_KEY, String.class); }