From 61c9945d3b63b9ebdfc516ef53070d75316c59fe Mon Sep 17 00:00:00 2001 From: akshaysw Date: Fri, 23 Aug 2024 08:58:00 +0530 Subject: [PATCH 1/3] [fix] optimise code --- .../java/org/apache/atlas/discovery/EntityLineageService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 5cfd10491f..801f3d85f3 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -591,9 +591,9 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC for (String parentOfParent : parentsOfParentNodes) { AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent); if (vertex != null) { - Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); // Check if the guid is already in the set if (!seenGuids.contains(parentOfParent)) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); parentNodesOfParentWithDetails.add(details); seenGuids.add(parentOfParent); // Add the guid to the set } @@ -626,8 +626,8 @@ private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListC for (String childOfChild : childrenOfChildNode) { AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild); if (vertex != null) { - Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); if (!seenGuids.contains(childOfChild)) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); childrenNodesOfChildrenWithDetails.add(details); seenGuids.add(childOfChild); // Add the guid to the set } From 76a7ab7cd4534da88e72cf5e493551d01d8cc680 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Fri, 23 Aug 2024 09:07:03 +0530 Subject: [PATCH 2/3] [fix] resturcutre code --- .../atlas/discovery/EntityLineageService.java | 112 +++++++----------- 1 file changed, 42 insertions(+), 70 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 801f3d85f3..1c7cfd262d 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -518,7 +518,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if(lineageListContext.getImmediateNeighbours()){ // update parents for each entity - updateParentNodesForEachEntity(lineageListContext, ret, lineageParentsForEntityMap, lineageChildrenForEntityMap); + updateNeighbourNodesForEachEntity(lineageListContext, ret, lineageParentsForEntityMap, lineageChildrenForEntityMap); } if (currentDepth > lineageListContext.getDepth()) @@ -572,82 +572,54 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, } } - private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap){ + private void updateNeighbourNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, + Map> lineageParentsForEntityMap, + Map> lineageChildrenForEntityMap) { List entityList = ret.getEntities(); - if (entityList != null){ - for (AtlasEntityHeader entity : entityList) { - if (entity != null && entity.getGuid() != null) { - // Check if the entity GUID exists in the lineageParentsForEntityMap - if (lineageParentsForEntityMap.containsKey(entity.getGuid())) { - // Get the list of AtlasVertex from the map - List parentNodes = lineageParentsForEntityMap.get(entity.getGuid()); - if (parentNodes != null) { - Set seenGuids = new HashSet<>(); - List> parentNodesOfParentWithDetails = new ArrayList<>(); - for (String parentNode : parentNodes) { - if(lineageParentsForEntityMap.containsKey(parentNode)){ - List parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode); - if (parentsOfParentNodes != null){ - for (String parentOfParent : parentsOfParentNodes) { - AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent); - if (vertex != null) { - // Check if the guid is already in the set - if (!seenGuids.contains(parentOfParent)) { - Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); - parentNodesOfParentWithDetails.add(details); - seenGuids.add(parentOfParent); // Add the guid to the set - } - } - } - } - } - } + if (entityList == null) return; - if(isInputDirection(lineageListContext)){ - entity.setImmediateDownstream(parentNodesOfParentWithDetails); - } - else{ - entity.setImmediateUpstream(parentNodesOfParentWithDetails); - } - } - } + for (AtlasEntityHeader entity : entityList) { + if (entity == null || entity.getGuid() == null) continue; - if (lineageChildrenForEntityMap.containsKey(entity.getGuid())) { - // Get the list of AtlasVertex from the map - List childrenNodes = lineageChildrenForEntityMap.get(entity.getGuid()); - if (childrenNodes != null) { - Set seenGuids = new HashSet<>(); - List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); - for (String childNode : childrenNodes) { - if(lineageChildrenForEntityMap.containsKey(childNode)){ - // Add all children for the current childNode - List childrenOfChildNode = lineageChildrenForEntityMap.get(childNode); - if (childrenOfChildNode != null){ - for (String childOfChild : childrenOfChildNode) { - AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild); - if (vertex != null) { - if (!seenGuids.contains(childOfChild)) { - Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); - childrenNodesOfChildrenWithDetails.add(details); - seenGuids.add(childOfChild); // Add the guid to the set - } - } - } - } - } - } + updateLineageForEntity(entity, lineageParentsForEntityMap, true, lineageListContext); + updateLineageForEntity(entity, lineageChildrenForEntityMap, false, lineageListContext); + } + } - if(isInputDirection(lineageListContext)){ - entity.setImmediateUpstream(childrenNodesOfChildrenWithDetails); - } - else{ - entity.setImmediateDownstream(childrenNodesOfChildrenWithDetails); - } - } - } + private void updateLineageForEntity(AtlasEntityHeader entity, Map> lineageMap, + boolean isParentMap, AtlasLineageListContext lineageListContext) { + List relatedNodes = lineageMap.get(entity.getGuid()); + if (relatedNodes == null) return; + + Set seenGuids = new HashSet<>(); + List> relatedNodesWithDetails = new ArrayList<>(); + + for (String node : relatedNodes) { + List subNodes = lineageMap.get(node); + if (subNodes == null) continue; + + for (String subNode : subNodes) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, subNode); + if (vertex != null && seenGuids.add(subNode)) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); + relatedNodesWithDetails.add(details); } } } + + if (isParentMap) { + if (isInputDirection(lineageListContext)) { + entity.setImmediateDownstream(relatedNodesWithDetails); + } else { + entity.setImmediateUpstream(relatedNodesWithDetails); + } + } else { + if (isInputDirection(lineageListContext)) { + entity.setImmediateUpstream(relatedNodesWithDetails); + } else { + entity.setImmediateDownstream(relatedNodesWithDetails); + } + } } private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, From 68df2b70379e64cc78888c5907024ad8f19fe099 Mon Sep 17 00:00:00 2001 From: akshaysw Date: Tue, 27 Aug 2024 11:58:16 +0530 Subject: [PATCH 3/3] [fix] add perf metric and rename --- .../atlas/discovery/EntityLineageService.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 1c7cfd262d..fe788235fe 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -540,7 +540,6 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); - List neighbors = new ArrayList<>(); while (edges.hasNext()) { AtlasEdge currentEdge = edges.next(); if (!lineageListContext.evaluateTraversalFilter(currentEdge)) @@ -575,6 +574,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, private void updateNeighbourNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap) { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("updateNeighbourNodesForEachEntity"); List entityList = ret.getEntities(); if (entityList == null) return; @@ -584,17 +584,18 @@ private void updateNeighbourNodesForEachEntity(AtlasLineageListContext lineageLi updateLineageForEntity(entity, lineageParentsForEntityMap, true, lineageListContext); updateLineageForEntity(entity, lineageChildrenForEntityMap, false, lineageListContext); } + RequestContext.get().endMetricRecord(metric); } private void updateLineageForEntity(AtlasEntityHeader entity, Map> lineageMap, boolean isParentMap, AtlasLineageListContext lineageListContext) { - List relatedNodes = lineageMap.get(entity.getGuid()); - if (relatedNodes == null) return; + List relatedProcessNodes = lineageMap.get(entity.getGuid()); + if (relatedProcessNodes == null) return; Set seenGuids = new HashSet<>(); - List> relatedNodesWithDetails = new ArrayList<>(); + List> relatedDatasetNodes = new ArrayList<>(); - for (String node : relatedNodes) { + for (String node : relatedProcessNodes) { List subNodes = lineageMap.get(node); if (subNodes == null) continue; @@ -602,22 +603,22 @@ private void updateLineageForEntity(AtlasEntityHeader entity, Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); - relatedNodesWithDetails.add(details); + relatedDatasetNodes.add(details); } } } if (isParentMap) { if (isInputDirection(lineageListContext)) { - entity.setImmediateDownstream(relatedNodesWithDetails); + entity.setImmediateDownstream(relatedDatasetNodes); } else { - entity.setImmediateUpstream(relatedNodesWithDetails); + entity.setImmediateUpstream(relatedDatasetNodes); } } else { if (isInputDirection(lineageListContext)) { - entity.setImmediateUpstream(relatedNodesWithDetails); + entity.setImmediateUpstream(relatedDatasetNodes); } else { - entity.setImmediateDownstream(relatedNodesWithDetails); + entity.setImmediateDownstream(relatedDatasetNodes); } } }