diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java index c251811df1..d79e81947e 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java @@ -77,6 +77,9 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable { private Map collapse = null; + private List immediateUpstream; // New field + private List immediateDownstream; // New field + public AtlasEntityHeader() { this(null, null); @@ -346,6 +349,22 @@ public void setMeanings(final List meanings) { this.meanings = meanings; } + public List getImmediateUpstream() { + return immediateUpstream; + } + + public void setImmediateUpstream(List immediateUpstream) { + this.immediateUpstream = immediateUpstream; + } + + public List getImmediateDownstream() { + return immediateDownstream; + } + + public void setImmediateDownstream(List immediateDownstream) { + this.immediateDownstream = immediateDownstream; + } + /** * REST serialization friendly list. */ diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java index 4897c76767..4ecff112d6 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java @@ -25,6 +25,15 @@ public class LineageListRequest { private Set relationAttributes; private Boolean excludeMeanings; private Boolean excludeClassifications; + private Boolean immediateNeighbours; + + public Boolean getImmediateNeighbours() { + return immediateNeighbours; + } + + public void setImmediateNeighbours(Boolean immediateNeighbours) { + this.immediateNeighbours = immediateNeighbours; + } private String lineageType = "DatasetProcessLineage"; diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java index 600b496293..4cb729c8be 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java @@ -23,6 +23,7 @@ public final class AtlasLineageListContext { private int currentEntityCounter; private boolean depthLimitReached; private boolean hasMoreUpdated; + private Boolean immediateNeighbours; private String lineageType = "DatasetProcessLineage"; public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) { @@ -37,6 +38,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR this.attributes = lineageListRequest.getAttributes(); this.lineageType = lineageListRequest.getLineageType(); this.relationAttributes = lineageListRequest.getRelationAttributes(); + this.immediateNeighbours = lineageListRequest.getImmediateNeighbours(); } public String getGuid() { @@ -200,4 +202,12 @@ public boolean isHasMoreUpdated() { public void setHasMoreUpdated(boolean hasMoreUpdated) { this.hasMoreUpdated = hasMoreUpdated; } + + public Boolean getImmediateNeighbours() { + return immediateNeighbours; + } + + public void setImmediateNeighbours(Boolean immediateNeighbours) { + this.immediateNeighbours = immediateNeighbours; + } } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 9c9927902c..f379192fa7 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -492,16 +492,23 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Set skippedVertices = new HashSet<>(); Queue traversalQueue = new LinkedList<>(); + Map> parentMapForNeighbours = new HashMap<>(); // New map to track parent nodes + Map> childrenMapForNeighbours = new HashMap<>(); // New map to track parent nodes + + AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); HashMap dataTypeMap = validateAndGetEntityTypeMap(baseGuid); boolean isNotConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) ? dataTypeMap.get(IS_DATA_PRODUCT) : dataTypeMap.get(IS_DATASET); - enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); int currentDepth = 0; int currentLevel = isNotConnecterVertex? 0: 1; + // Add the current node and its neighbors to the result + appendToResult(baseVertex, lineageListContext, ret, currentLevel); + while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) { currentDepth++; @@ -521,24 +528,34 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line HashMap currentEntityDataTypeMap = validateAndGetEntityTypeMap(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); continue; } lineageListContext.incrementEntityCount(); + // Get the neighbors for the current node + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours); + + // Add the current node and its neighbors to the result appendToResult(currentVertex, lineageListContext, ret, currentLevel); - enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) { ret.setHasMore(false); lineageListContext.setHasMoreUpdated(true); } } } + + if(lineageListContext.getImmediateNeighbours()){ + // update parents for each entity + updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours); + } + if (currentDepth > lineageListContext.getDepth()) lineageListContext.setDepthLimitReached(true); @@ -546,8 +563,10 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line RequestContext.get().endMetricRecord(metricRecorder); } - private void enqueueNeighbours(AtlasVertex currentVertex, HashMap dataTypeMap, AtlasLineageListContext lineageListContext, - Queue traversalQueue, Set visitedVertices, Set skippedVertices) { + private void enqueueNeighbours(AtlasVertex currentVertex, HashMap dataTypeMap, + AtlasLineageListContext lineageListContext, Queue traversalQueue, + Set visitedVertices, Set skippedVertices, + Map> parentMapForNeighbours, Map> childrenMapForNeighbours) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; String lineageInputLabel = RequestContext.get().getLineageInputLabel(); @@ -562,6 +581,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, HashMap neighbors = new ArrayList<>(); while (edges.hasNext()) { AtlasEdge currentEdge = edges.next(); if (!lineageListContext.evaluateTraversalFilter(currentEdge)) @@ -581,10 +601,68 @@ private void enqueueNeighbours(AtlasVertex currentVertex, HashMap new ArrayList<>()) + .add(getQalifiedName(currentVertex)); + childrenMapForNeighbours + .computeIfAbsent(getQalifiedName(currentVertex), k -> new ArrayList<>()) + .add(vertexDisplayName); + } + } + } + + private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> parentMapForNeighbours, Map> childrenMapForNeighbours){ + List entityList = ret.getEntities(); + for (AtlasEntityHeader entity : entityList) { + // Check if the entity GUID exists in the parentMapForNeighbours + if (parentMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) { + // Get the list of AtlasVertex from the map + List parentNodes = parentMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME)); + + List parentNodesOfParent = new ArrayList<>(); + for (String parentNode : parentNodes) { + List parentsOfParentNode = parentMapForNeighbours.get(parentNode); + if (parentsOfParentNode != null) { + parentNodesOfParent.addAll(parentsOfParentNode); + } + } + + if(isInputDirection(lineageListContext)){ + entity.setImmediateDownstream(parentNodesOfParent); + } + else{ + entity.setImmediateUpstream(parentNodesOfParent); + } + } + + if (childrenMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) { + // Get the list of AtlasVertex from the map + List childrenNodes = childrenMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME)); + + List childrenNodesOfChildren = new ArrayList<>(); + for (String childNode : childrenNodes) { + // Add all children for the current childNode + List childrenOfChildNode = childrenMapForNeighbours.get(childNode); + if (childrenOfChildNode != null) { + childrenNodesOfChildren.addAll(childrenOfChildNode); + } + } + + if(isInputDirection(lineageListContext)){ + entity.setImmediateUpstream(childrenNodesOfChildren); + } + else{ + entity.setImmediateDownstream(childrenNodesOfChildren); + } + } } } - private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException { + private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, + AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException { AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(currentVertex, lineageListContext.getAttributes()); entity.setDepth(currentLevel); ret.getEntities().add(entity); diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 3e4ee2bd7a..942d4c9cdf 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -1040,6 +1040,10 @@ public static String getGuid(AtlasVertex vertex) { return ret; } + public static String getQalifiedName(AtlasVertex vertex) { + return vertex.getProperty(Constants.QUALIFIED_NAME, String.class); + } + public static String getHomeId(AtlasElement element) { return element.getProperty(Constants.HOME_ID_KEY, String.class); }