diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 5cfd10491f..fe788235fe 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -518,7 +518,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if(lineageListContext.getImmediateNeighbours()){ // update parents for each entity - updateParentNodesForEachEntity(lineageListContext, ret, lineageParentsForEntityMap, lineageChildrenForEntityMap); + updateNeighbourNodesForEachEntity(lineageListContext, ret, lineageParentsForEntityMap, lineageChildrenForEntityMap); } if (currentDepth > lineageListContext.getDepth()) @@ -540,7 +540,6 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); - List neighbors = new ArrayList<>(); while (edges.hasNext()) { AtlasEdge currentEdge = edges.next(); if (!lineageListContext.evaluateTraversalFilter(currentEdge)) @@ -572,82 +571,56 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, } } - private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap){ + private void updateNeighbourNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, + Map> lineageParentsForEntityMap, + Map> lineageChildrenForEntityMap) { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("updateNeighbourNodesForEachEntity"); List entityList = ret.getEntities(); - if (entityList != null){ - for (AtlasEntityHeader entity : entityList) { - if (entity != null && entity.getGuid() != null) { - // Check if the entity GUID exists in the lineageParentsForEntityMap - if (lineageParentsForEntityMap.containsKey(entity.getGuid())) { - // Get the list of AtlasVertex from the map - List parentNodes = lineageParentsForEntityMap.get(entity.getGuid()); - if (parentNodes != null) { - Set seenGuids = new HashSet<>(); - List> parentNodesOfParentWithDetails = new ArrayList<>(); - for (String parentNode : parentNodes) { - if(lineageParentsForEntityMap.containsKey(parentNode)){ - List parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode); - if (parentsOfParentNodes != null){ - for (String parentOfParent : parentsOfParentNodes) { - AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent); - if (vertex != null) { - Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); - // Check if the guid is already in the set - if (!seenGuids.contains(parentOfParent)) { - parentNodesOfParentWithDetails.add(details); - seenGuids.add(parentOfParent); // Add the guid to the set - } - } - } - } - } - } + if (entityList == null) return; - if(isInputDirection(lineageListContext)){ - entity.setImmediateDownstream(parentNodesOfParentWithDetails); - } - else{ - entity.setImmediateUpstream(parentNodesOfParentWithDetails); - } - } - } + for (AtlasEntityHeader entity : entityList) { + if (entity == null || entity.getGuid() == null) continue; - if (lineageChildrenForEntityMap.containsKey(entity.getGuid())) { - // Get the list of AtlasVertex from the map - List childrenNodes = lineageChildrenForEntityMap.get(entity.getGuid()); - if (childrenNodes != null) { - Set seenGuids = new HashSet<>(); - List> childrenNodesOfChildrenWithDetails = new ArrayList<>(); - for (String childNode : childrenNodes) { - if(lineageChildrenForEntityMap.containsKey(childNode)){ - // Add all children for the current childNode - List childrenOfChildNode = lineageChildrenForEntityMap.get(childNode); - if (childrenOfChildNode != null){ - for (String childOfChild : childrenOfChildNode) { - AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild); - if (vertex != null) { - Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); - if (!seenGuids.contains(childOfChild)) { - childrenNodesOfChildrenWithDetails.add(details); - seenGuids.add(childOfChild); // Add the guid to the set - } - } - } - } - } - } + updateLineageForEntity(entity, lineageParentsForEntityMap, true, lineageListContext); + updateLineageForEntity(entity, lineageChildrenForEntityMap, false, lineageListContext); + } + RequestContext.get().endMetricRecord(metric); + } - if(isInputDirection(lineageListContext)){ - entity.setImmediateUpstream(childrenNodesOfChildrenWithDetails); - } - else{ - entity.setImmediateDownstream(childrenNodesOfChildrenWithDetails); - } - } - } + private void updateLineageForEntity(AtlasEntityHeader entity, Map> lineageMap, + boolean isParentMap, AtlasLineageListContext lineageListContext) { + List relatedProcessNodes = lineageMap.get(entity.getGuid()); + if (relatedProcessNodes == null) return; + + Set seenGuids = new HashSet<>(); + List> relatedDatasetNodes = new ArrayList<>(); + + for (String node : relatedProcessNodes) { + List subNodes = lineageMap.get(node); + if (subNodes == null) continue; + + for (String subNode : subNodes) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, subNode); + if (vertex != null && seenGuids.add(subNode)) { + Map details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES); + relatedDatasetNodes.add(details); } } } + + if (isParentMap) { + if (isInputDirection(lineageListContext)) { + entity.setImmediateDownstream(relatedDatasetNodes); + } else { + entity.setImmediateUpstream(relatedDatasetNodes); + } + } else { + if (isInputDirection(lineageListContext)) { + entity.setImmediateUpstream(relatedDatasetNodes); + } else { + entity.setImmediateDownstream(relatedDatasetNodes); + } + } } private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext,