diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java index b6376a8342..0bd7304c58 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java @@ -72,8 +72,11 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable { private Date updateTime = null; private String deleteHandler = null; private Integer depth = null; + private Integer traversalOrder = null; + private Map collapse = null; + public AtlasEntityHeader() { this(null, null); } @@ -147,21 +150,17 @@ public AtlasEntityHeader(AtlasEntity entity) { } } - public String getGuid() { - return guid; - } + public String getGuid() { return guid; } - public void setGuid(String guid) { - this.guid = guid; - } + public void setGuid(String guid) { this.guid = guid; } - public Integer getDepth() { - return depth; - } + public Integer getDepth() { return depth; } - public void setDepth(Integer depth) { - this.depth = depth; - } + public void setDepth(Integer depth) { this.depth = depth; } + + public Integer getTraversalOrder() { return traversalOrder; } + + public void setTraversalOrder(Integer traversalOrder) { this.traversalOrder = traversalOrder; } public AtlasEntity.Status getStatus() { return status; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index e7f3a10501..c9c11e4a86 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -283,6 +283,7 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection(); + int level = 0; int depth = lineageConstraintsByGuid.getDepth(); AtlasLineageOnDemandInfo ret = initializeLineageOnDemandInfo(guid); @@ -293,33 +294,37 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); + AtomicInteger traversalOrder = new AtomicInteger(1); if (isDataSet) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed); + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed); + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); + baseEntityHeader.setDepth(level); + baseEntityHeader.setTraversalOrder(0); ret.getGuidEntityMap().put(guid, baseEntityHeader); } else { AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); } } RequestContext.get().endMetricRecord(metricRecorder); return ret; } - - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; + int nextLevel = isInput ? level - 1: level + 1; + while (processEdges.hasNext()) { AtlasEdge processEdge = processEdges.next(); AtlasVertex datasetVertex = processEdge.getInVertex(); @@ -336,7 +341,8 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction)) { break; } else { - addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext); + addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); + traversalOrder.incrementAndGet(); } String inGuid = AtlasGraphUtilsV2.getIdFromVertex(datasetVertex); @@ -346,17 +352,17 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; if (depth != 0) { // base condition of recursion for depth AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand"); AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; - + int nextLevel = isInput ? level - 1: level + 1; // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); @@ -385,7 +391,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext); + addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level, traversalOrder); } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); @@ -413,13 +419,14 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext); + addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); entitiesTraversed.incrementAndGet(); + traversalOrder.incrementAndGet(); if (isEntityTraversalLimitReached(entitiesTraversed)) setEntityLimitReachedFlag(isInput, ret); } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed); // execute inner depth + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth } } } @@ -883,9 +890,9 @@ private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo, } } - private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext) throws AtlasBaseException { + private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int level, AtomicInteger traversalOrder) throws AtlasBaseException { if (!lineageContainsVisitedEdgeV2(lineageInfo, edge)) { - processEdge(edge, lineageInfo, atlasLineageOnDemandContext); + processEdge(edge, lineageInfo, atlasLineageOnDemandContext, level, traversalOrder); } } @@ -1455,26 +1462,41 @@ private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes) throws AtlasBaseException { + private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int level, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processEdge"); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); + + String inTypeName = AtlasGraphUtilsV2.getTypeName(inVertex); + AtlasEntityType inEntityType = atlasTypeRegistry.getEntityTypeByName(inTypeName); + if (inEntityType == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, inTypeName); + } + boolean inIsProcess = inEntityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); + String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); - if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex, attributes); + if (!inIsProcess) { + entityHeader.setDepth(level); + entityHeader.setTraversalOrder(traversalOrder.get()); + } entities.put(inGuid, entityHeader); } if (!entities.containsKey(outGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(outVertex, attributes); + if (inIsProcess) { + entityHeader.setDepth(level); + entityHeader.setTraversalOrder(traversalOrder.get()); + } entities.put(outGuid, entityHeader); } if (isInputEdge) {