From a239c6a7f1175e247e615534ce5b009964cf84f6 Mon Sep 17 00:00:00 2001 From: Rajat Movaliya Date: Mon, 24 Jun 2024 18:32:17 +0530 Subject: [PATCH 1/2] add horizontal flag count --- .../java/org/apache/atlas/AtlasConfiguration.java | 2 +- .../apache/atlas/discovery/EntityLineageService.java | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 6aaf2439bf..a701510a4e 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -93,7 +93,7 @@ public enum AtlasConfiguration { GRAPH_TRAVERSAL_PARALLELISM("atlas.graph.traverse.bucket.size",10), LINEAGE_ON_DEMAND_ENABLED("atlas.lineage.on.demand.enabled", true), LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT("atlas.lineage.on.demand.default.node.count", 3), - LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 1000), + LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 100), SUPPORTED_RELATIONSHIP_EVENTS("atlas.notification.relationships.filter", "asset_readme,asset_links"), diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 8ef41cf231..db51ef407e 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -646,15 +646,19 @@ else if (!isInput && ! isInVertexVisited) } private void setHasDownstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex inVertex, LineageInfoOnDemand inLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(inVertex, IN, PROCESS_INPUTS_EDGE, atlasLineageOnDemandContext, true); - if (!filteredEdges.isEmpty()) + List filteredEdges = getFilteredAtlasEdges(inVertex, IN, PROCESS_INPUTS_EDGE, atlasLineageOnDemandContext, false); + if (!filteredEdges.isEmpty()) { inLineageInfo.setHasDownstream(true); + inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); + } } private void setHasUpstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex outVertex, LineageInfoOnDemand outLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(outVertex, IN, PROCESS_OUTPUTS_EDGE, atlasLineageOnDemandContext, true); - if (!filteredEdges.isEmpty()) + List filteredEdges = getFilteredAtlasEdges(outVertex, IN, PROCESS_OUTPUTS_EDGE, atlasLineageOnDemandContext, false); + if (!filteredEdges.isEmpty()) { outLineageInfo.setHasUpstream(true); + outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); + } } private List getFilteredAtlasEdges(AtlasVertex outVertex, AtlasEdgeDirection direction, String processEdgeLabel, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean hasAnyCheck) { From 2f633b97b9333b7327441c6198aa1d8b9e359695 Mon Sep 17 00:00:00 2001 From: Rajat Movaliya Date: Mon, 24 Jun 2024 18:41:56 +0530 Subject: [PATCH 2/2] revert vertical pagination count logic --- .../atlas/discovery/EntityLineageService.java | 21 ++----------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index db51ef407e..f3fc72e4e7 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -592,7 +592,7 @@ private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, bo setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, inVertex, inGuid, outVertex, outGuid, inLineageInfo, outLineageInfo); - boolean hasRelationsLimitReached = setVerticalPaginationFlags(depth, atlasLineageOnDemandContext, isInput, entitiesTraversed, inLineageInfo, outLineageInfo, inVertex, outVertex); + boolean hasRelationsLimitReached = setVerticalPaginationFlags(entitiesTraversed, inLineageInfo, outLineageInfo); if (!hasRelationsLimitReached) { ret.getRelationsOnDemand().put(inGuid, inLineageInfo); ret.getRelationsOnDemand().put(outGuid, outLineageInfo); @@ -602,29 +602,12 @@ private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, bo return hasRelationsLimitReached; } - private boolean setVerticalPaginationFlags(int depth, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isInput, AtomicInteger entitiesTraversed, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, AtlasVertex inVertex, AtlasVertex outVertex) { + private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo) { boolean hasRelationsLimitReached = false; if (inLineageInfo.isInputRelationsReachedLimit() || outLineageInfo.isOutputRelationsReachedLimit() || isEntityTraversalLimitReached(entitiesTraversed)) { inLineageInfo.setHasMoreInputs(true); outLineageInfo.setHasMoreOutputs(true); hasRelationsLimitReached = true; - - if (depth >= 0) { - boolean inIsProcess = Objects.equals(AtlasGraphUtilsV2.getTypeName(inVertex), PROCESS_SUPER_TYPE); - AtlasEdgeDirection inDirection = inIsProcess ? OUT: IN; // Process Node edges always points outwards - AtlasEdgeDirection outDirection = !inIsProcess ? OUT: IN; // Data Node edges always points inwards - String edgeLabel = isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE; - - if (outLineageInfo.getTotalOutputRelationsCount() == 0) { - List outFilteredEdges = getFilteredAtlasEdges(outVertex, outDirection, edgeLabel, atlasLineageOnDemandContext, false); - outLineageInfo.setTotalOutputRelationsCount(outFilteredEdges.size()); - } - - if (inLineageInfo.getTotalInputRelationsCount() == 0) { - List inFilteredEdges = getFilteredAtlasEdges(inVertex, inDirection, edgeLabel, atlasLineageOnDemandContext, false); - inLineageInfo.setTotalInputRelationsCount(inFilteredEdges.size()); - } - } } if (!hasRelationsLimitReached) {