Skip to content

Commit

Permalink
[fix] resturcutre code
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaysw committed Aug 23, 2024
1 parent 50c2e13 commit 5f5d092
Showing 1 changed file with 42 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line

if(lineageListContext.getImmediateNeighbours()){
// update parents for each entity
updateParentNodesForEachEntity(lineageListContext, ret, lineageParentsForEntityMap, lineageChildrenForEntityMap);
updateNeighbourNodesForEachEntity(lineageListContext, ret, lineageParentsForEntityMap, lineageChildrenForEntityMap);
}

if (currentDepth > lineageListContext.getDepth())
Expand Down Expand Up @@ -615,82 +615,54 @@ private void enqueueNeighbours(AtlasVertex currentVertex, HashMap<String, Boolea
}
}

private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map<String, List<String>> lineageParentsForEntityMap, Map<String, List<String>> lineageChildrenForEntityMap){
private void updateNeighbourNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret,
Map<String, List<String>> lineageParentsForEntityMap,
Map<String, List<String>> lineageChildrenForEntityMap) {
List<AtlasEntityHeader> entityList = ret.getEntities();
if (entityList != null){
for (AtlasEntityHeader entity : entityList) {
if (entity != null && entity.getGuid() != null) {
// Check if the entity GUID exists in the lineageParentsForEntityMap
if (lineageParentsForEntityMap.containsKey(entity.getGuid())) {
// Get the list of AtlasVertex from the map
List<String> parentNodes = lineageParentsForEntityMap.get(entity.getGuid());
if (parentNodes != null) {
Set<String> seenGuids = new HashSet<>();
List<Map<String,String>> parentNodesOfParentWithDetails = new ArrayList<>();
for (String parentNode : parentNodes) {
if(lineageParentsForEntityMap.containsKey(parentNode)){
List<String> parentsOfParentNodes = lineageParentsForEntityMap.get(parentNode);
if (parentsOfParentNodes != null){
for (String parentOfParent : parentsOfParentNodes) {
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent);
if (vertex != null) {
// Check if the guid is already in the set
if (!seenGuids.contains(parentOfParent)) {
Map<String, String> details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES);
parentNodesOfParentWithDetails.add(details);
seenGuids.add(parentOfParent); // Add the guid to the set
}
}
}
}
}
}
if (entityList == null) return;

if(isInputDirection(lineageListContext)){
entity.setImmediateDownstream(parentNodesOfParentWithDetails);
}
else{
entity.setImmediateUpstream(parentNodesOfParentWithDetails);
}
}
}
for (AtlasEntityHeader entity : entityList) {
if (entity == null || entity.getGuid() == null) continue;

if (lineageChildrenForEntityMap.containsKey(entity.getGuid())) {
// Get the list of AtlasVertex from the map
List<String> childrenNodes = lineageChildrenForEntityMap.get(entity.getGuid());
if (childrenNodes != null) {
Set<String> seenGuids = new HashSet<>();
List<Map<String,String>> childrenNodesOfChildrenWithDetails = new ArrayList<>();
for (String childNode : childrenNodes) {
if(lineageChildrenForEntityMap.containsKey(childNode)){
// Add all children for the current childNode
List<String> childrenOfChildNode = lineageChildrenForEntityMap.get(childNode);
if (childrenOfChildNode != null){
for (String childOfChild : childrenOfChildNode) {
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild);
if (vertex != null) {
if (!seenGuids.contains(childOfChild)) {
Map<String, String> details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES);
childrenNodesOfChildrenWithDetails.add(details);
seenGuids.add(childOfChild); // Add the guid to the set
}
}
}
}
}
}
updateLineageForEntity(entity, lineageParentsForEntityMap, true, lineageListContext);
updateLineageForEntity(entity, lineageChildrenForEntityMap, false, lineageListContext);
}
}

if(isInputDirection(lineageListContext)){
entity.setImmediateUpstream(childrenNodesOfChildrenWithDetails);
}
else{
entity.setImmediateDownstream(childrenNodesOfChildrenWithDetails);
}
}
}
private void updateLineageForEntity(AtlasEntityHeader entity, Map<String, List<String>> lineageMap,
boolean isParentMap, AtlasLineageListContext lineageListContext) {
List<String> relatedNodes = lineageMap.get(entity.getGuid());
if (relatedNodes == null) return;

Set<String> seenGuids = new HashSet<>();
List<Map<String, String>> relatedNodesWithDetails = new ArrayList<>();

for (String node : relatedNodes) {
List<String> subNodes = lineageMap.get(node);
if (subNodes == null) continue;

for (String subNode : subNodes) {
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, subNode);
if (vertex != null && seenGuids.add(subNode)) {
Map<String, String> details = fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES);
relatedNodesWithDetails.add(details);
}
}
}

if (isParentMap) {
if (isInputDirection(lineageListContext)) {
entity.setImmediateDownstream(relatedNodesWithDetails);
} else {
entity.setImmediateUpstream(relatedNodesWithDetails);
}
} else {
if (isInputDirection(lineageListContext)) {
entity.setImmediateUpstream(relatedNodesWithDetails);
} else {
entity.setImmediateDownstream(relatedNodesWithDetails);
}
}
}

private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext,
Expand Down

0 comments on commit 5f5d092

Please sign in to comment.