From 2cfd40b51b12a7628b2d829c1adfe29676b2693b Mon Sep 17 00:00:00 2001 From: Rajat Movaliya Date: Fri, 29 Mar 2024 17:24:34 +0530 Subject: [PATCH 1/9] changes to make it work --- .../org/apache/atlas/notification/AbstractNotification.java | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index cca4cb81d8..bed35932a1 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -96,7 +96,7 @@ public void send(NotificationType type, List messages, MessageSource sour createNotificationMessages(messages.get(index), strMessages, source); } - sendInternal(type, strMessages); +// sendInternal(type, strMessages); } @Override diff --git a/pom.xml b/pom.xml index 1cc9aa70dc..3abc39041b 100644 --- a/pom.xml +++ b/pom.xml @@ -717,7 +717,7 @@ 4.4.13 2.12.4 2.12.4 - 0.6.03 + 0.6.04 0.5.3 1 3.1.0 From 379d39453788ce567944ab317474acf7265da8a7 Mon Sep 17 00:00:00 2001 From: Rajat Movaliya Date: Tue, 2 Apr 2024 18:31:59 +0530 Subject: [PATCH 2/9] feat: add node depth in lineage --- .../model/instance/AtlasEntityHeader.java | 10 ++++++++++ .../atlas/discovery/EntityLineageService.java | 18 ++++++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java index be2819d7f2..f5f060349d 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java @@ -71,8 +71,10 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable { private Date createTime = null; private Date updateTime = null; private String deleteHandler = null; + private Integer depth = null; private Map collapse = null; + public AtlasEntityHeader() { this(null, null); } @@ -154,6 +156,14 @@ public void setGuid(String guid) { this.guid = guid; } + public Integer getDepth() { + return depth; + } + + public void setDepth(Integer depth) { + this.depth = depth; + } + public AtlasEntity.Status getStatus() { return status; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 6927b1d1c4..b64739657b 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -336,7 +336,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction)) { break; } else { - addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext); + addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, depth); } String inGuid = AtlasGraphUtilsV2.getIdFromVertex(datasetVertex); @@ -385,7 +385,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext); + addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, depth); } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); @@ -413,7 +413,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext); + addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, depth); entitiesTraversed.incrementAndGet(); if (isEntityTraversalLimitReached(entitiesTraversed)) setEntityLimitReachedFlag(isInput, ret); @@ -874,9 +874,9 @@ private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo, } } - private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext) throws AtlasBaseException { + private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int depth) throws AtlasBaseException { if (!lineageContainsVisitedEdgeV2(lineageInfo, edge)) { - processEdge(edge, lineageInfo, atlasLineageOnDemandContext); + processEdge(edge, lineageInfo, atlasLineageOnDemandContext, depth); } } @@ -1446,11 +1446,11 @@ private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes) throws AtlasBaseException { + private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int depth) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processEdge"); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); @@ -1462,10 +1462,12 @@ private void processEdge(final AtlasEdge edge, final Map Date: Wed, 3 Apr 2024 13:28:11 +0530 Subject: [PATCH 3/9] update lineage level correctly --- .../atlas/discovery/EntityLineageService.java | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index b64739657b..4a6eb70821 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -283,6 +283,7 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection(); + int level = 0; int depth = lineageConstraintsByGuid.getDepth(); AtlasLineageOnDemandInfo ret = initializeLineageOnDemandInfo(guid); @@ -296,21 +297,22 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag if (isDataSet) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed); + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed); if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed); + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); + baseEntityHeader.setDepth(level); ret.getGuidEntityMap().put(guid, baseEntityHeader); } else { AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed); } } RequestContext.get().endMetricRecord(metricRecorder); @@ -318,8 +320,10 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag } - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; + int nextLevel = isInput ? level - 1: level + 1; + while (processEdges.hasNext()) { AtlasEdge processEdge = processEdges.next(); AtlasVertex datasetVertex = processEdge.getInVertex(); @@ -336,7 +340,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction)) { break; } else { - addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, depth); + addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel); } String inGuid = AtlasGraphUtilsV2.getIdFromVertex(datasetVertex); @@ -346,17 +350,17 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; if (depth != 0) { // base condition of recursion for depth AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand"); AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; - + int nextLevel = isInput ? level - 1: level + 1; // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); @@ -385,7 +389,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, depth); + addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level); } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); @@ -413,13 +417,13 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, depth); + addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel); entitiesTraversed.incrementAndGet(); if (isEntityTraversalLimitReached(entitiesTraversed)) setEntityLimitReachedFlag(isInput, ret); } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed); // execute inner depth + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed); // execute inner depth } } } @@ -874,9 +878,9 @@ private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo, } } - private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int depth) throws AtlasBaseException { + private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int level) throws AtlasBaseException { if (!lineageContainsVisitedEdgeV2(lineageInfo, edge)) { - processEdge(edge, lineageInfo, atlasLineageOnDemandContext, depth); + processEdge(edge, lineageInfo, atlasLineageOnDemandContext, level); } } @@ -1446,11 +1450,11 @@ private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int depth) throws AtlasBaseException { + private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int level) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processEdge"); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); @@ -1459,15 +1463,14 @@ private void processEdge(final AtlasEdge edge, final Map Date: Wed, 3 Apr 2024 14:36:30 +0530 Subject: [PATCH 4/9] remove depth from process node --- .../org/apache/atlas/discovery/EntityLineageService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 4a6eb70821..3ec7743906 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -1458,6 +1458,7 @@ private void processEdge(final AtlasEdge edge, final Map Date: Wed, 3 Apr 2024 18:07:53 +0530 Subject: [PATCH 5/9] Revert "changes to make it work" This reverts commit 2cfd40b51b12a7628b2d829c1adfe29676b2693b. --- .../org/apache/atlas/notification/AbstractNotification.java | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index bed35932a1..cca4cb81d8 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -96,7 +96,7 @@ public void send(NotificationType type, List messages, MessageSource sour createNotificationMessages(messages.get(index), strMessages, source); } -// sendInternal(type, strMessages); + sendInternal(type, strMessages); } @Override diff --git a/pom.xml b/pom.xml index 3abc39041b..1cc9aa70dc 100644 --- a/pom.xml +++ b/pom.xml @@ -717,7 +717,7 @@ 4.4.13 2.12.4 2.12.4 - 0.6.04 + 0.6.03 0.5.3 1 3.1.0 From 1cfb2ede80324f359c84abba95c5c01e7523de2b Mon Sep 17 00:00:00 2001 From: Rajat Movaliya Date: Thu, 2 May 2024 18:34:16 +0530 Subject: [PATCH 6/9] add traversalOrder attribute --- .../model/instance/AtlasEntityHeader.java | 21 +++++++-------- .../atlas/discovery/EntityLineageService.java | 26 ++++++++++++------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java index f5f060349d..7cb3f21b1c 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java @@ -72,6 +72,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable { private Date updateTime = null; private String deleteHandler = null; private Integer depth = null; + private Integer traversalOrder = null; private Map collapse = null; @@ -148,21 +149,17 @@ public AtlasEntityHeader(AtlasEntity entity) { } } - public String getGuid() { - return guid; - } + public String getGuid() { return guid; } - public void setGuid(String guid) { - this.guid = guid; - } + public void setGuid(String guid) { this.guid = guid; } - public Integer getDepth() { - return depth; - } + public Integer getDepth() { return depth; } - public void setDepth(Integer depth) { - this.depth = depth; - } + public void setDepth(Integer depth) { this.depth = depth; } + + public Integer getTraversalOrder() { return traversalOrder; } + + public void setTraversalOrder(Integer traversalOrder) { this.traversalOrder = traversalOrder; } public AtlasEntity.Status getStatus() { return status; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 3ec7743906..353232cdbf 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -340,7 +340,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction)) { break; } else { - addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel); + addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, entitiesTraversed); } String inGuid = AtlasGraphUtilsV2.getIdFromVertex(datasetVertex); @@ -389,7 +389,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level); + addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level, entitiesTraversed); } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); @@ -417,7 +417,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel); + addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel, entitiesTraversed); entitiesTraversed.incrementAndGet(); if (isEntityTraversalLimitReached(entitiesTraversed)) setEntityLimitReachedFlag(isInput, ret); @@ -878,9 +878,9 @@ private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo, } } - private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int level) throws AtlasBaseException { + private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int level, AtomicInteger entitiesTraversed) throws AtlasBaseException { if (!lineageContainsVisitedEdgeV2(lineageInfo, edge)) { - processEdge(edge, lineageInfo, atlasLineageOnDemandContext, level); + processEdge(edge, lineageInfo, atlasLineageOnDemandContext, level, entitiesTraversed); } } @@ -1450,11 +1450,11 @@ private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int level) throws AtlasBaseException { + private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int level, AtomicInteger entitiesTraversed) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processEdge"); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); @@ -1466,12 +1466,18 @@ private void processEdge(final AtlasEdge edge, final Map Date: Fri, 3 May 2024 14:31:00 +0530 Subject: [PATCH 7/9] add separate counter for traversalOrder --- .../atlas/discovery/EntityLineageService.java | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 353232cdbf..5bdecd451f 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -294,12 +294,13 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); + AtomicInteger traversalOrder = new AtomicInteger(1); if (isDataSet) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed); + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed); + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); baseEntityHeader.setDepth(level); ret.getGuidEntityMap().put(guid, baseEntityHeader); @@ -308,19 +309,18 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); } } RequestContext.get().endMetricRecord(metricRecorder); return ret; } - - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; int nextLevel = isInput ? level - 1: level + 1; @@ -340,7 +340,8 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction)) { break; } else { - addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, entitiesTraversed); + addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); + traversalOrder.incrementAndGet(); } String inGuid = AtlasGraphUtilsV2.getIdFromVertex(datasetVertex); @@ -350,11 +351,11 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; if (depth != 0) { // base condition of recursion for depth @@ -389,7 +390,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level, entitiesTraversed); + addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level, traversalOrder); } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); @@ -417,13 +418,14 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i else continue; } else { - addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel, entitiesTraversed); + addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); entitiesTraversed.incrementAndGet(); + traversalOrder.incrementAndGet(); if (isEntityTraversalLimitReached(entitiesTraversed)) setEntityLimitReachedFlag(isInput, ret); } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed); // execute inner depth + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth } } } @@ -878,9 +880,9 @@ private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo, } } - private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int level, AtomicInteger entitiesTraversed) throws AtlasBaseException { + private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int level, AtomicInteger traversalOrder) throws AtlasBaseException { if (!lineageContainsVisitedEdgeV2(lineageInfo, edge)) { - processEdge(edge, lineageInfo, atlasLineageOnDemandContext, level, entitiesTraversed); + processEdge(edge, lineageInfo, atlasLineageOnDemandContext, level, traversalOrder); } } @@ -1450,11 +1452,11 @@ private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int level, AtomicInteger entitiesTraversed) throws AtlasBaseException { + private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int level, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processEdge"); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); @@ -1468,7 +1470,7 @@ private void processEdge(final AtlasEdge edge, final Map Date: Wed, 8 May 2024 12:04:19 +0530 Subject: [PATCH 8/9] add traversalOrder for base node --- .../java/org/apache/atlas/discovery/EntityLineageService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 5bdecd451f..22f7039e6e 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -303,6 +303,7 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); baseEntityHeader.setDepth(level); + baseEntityHeader.setTraversalOrder(0); ret.getGuidEntityMap().put(guid, baseEntityHeader); } else { AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); From 9cb3abf619a6e210711183641f626916a86af3a7 Mon Sep 17 00:00:00 2001 From: Rajat Movaliya Date: Wed, 8 May 2024 14:35:08 +0530 Subject: [PATCH 9/9] add support for column lineage --- .../org/apache/atlas/discovery/EntityLineageService.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 638523ad6d..c9c11e4a86 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -1470,7 +1470,14 @@ private void processEdge(final AtlasEdge edge, final Map