Skip to content

Commit

Permalink
Merge pull request #3408 from atlanhq/LIN-1079-beta
Browse files Browse the repository at this point in the history
[LIN-1079] immediate downstream and upstream assets in impact report #3403
  • Loading branch information
akshaysw authored Aug 21, 2024
2 parents e448510 + 523ca84 commit d8f6410
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {

private Map<String, AtlasSearchResult> collapse = null;

private List<String> immediateUpstream; // New field
private List<String> immediateDownstream; // New field


public AtlasEntityHeader() {
this(null, null);
Expand Down Expand Up @@ -346,6 +349,22 @@ public void setMeanings(final List<AtlasTermAssignmentHeader> meanings) {
this.meanings = meanings;
}

public List<String> getImmediateUpstream() {
return immediateUpstream;
}

public void setImmediateUpstream(List<String> immediateUpstream) {
this.immediateUpstream = immediateUpstream;
}

public List<String> getImmediateDownstream() {
return immediateDownstream;
}

public void setImmediateDownstream(List<String> immediateDownstream) {
this.immediateDownstream = immediateDownstream;
}

/**
* REST serialization friendly list.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ public class LineageListRequest {
private Set<String> relationAttributes;
private Boolean excludeMeanings;
private Boolean excludeClassifications;
private Boolean immediateNeighbours;

public Boolean getImmediateNeighbours() {
return immediateNeighbours;
}

public void setImmediateNeighbours(Boolean immediateNeighbours) {
this.immediateNeighbours = immediateNeighbours;
}


private String lineageType = "DatasetProcessLineage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class AtlasLineageListContext {
private int currentEntityCounter;
private boolean depthLimitReached;
private boolean hasMoreUpdated;
private Boolean immediateNeighbours;
private String lineageType = "DatasetProcessLineage";

public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) {
Expand All @@ -37,6 +38,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR
this.attributes = lineageListRequest.getAttributes();
this.lineageType = lineageListRequest.getLineageType();
this.relationAttributes = lineageListRequest.getRelationAttributes();
this.immediateNeighbours = lineageListRequest.getImmediateNeighbours();
}

public String getGuid() {
Expand Down Expand Up @@ -200,4 +202,12 @@ public boolean isHasMoreUpdated() {
public void setHasMoreUpdated(boolean hasMoreUpdated) {
this.hasMoreUpdated = hasMoreUpdated;
}

public Boolean getImmediateNeighbours() {
return immediateNeighbours;
}

public void setImmediateNeighbours(Boolean immediateNeighbours) {
this.immediateNeighbours = immediateNeighbours;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,16 +492,23 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
Set<String> skippedVertices = new HashSet<>();
Queue<String> traversalQueue = new LinkedList<>();

Map<String, List<String>> parentMapForNeighbours = new HashMap<>(); // New map to track parent nodes
Map<String, List<String>> childrenMapForNeighbours = new HashMap<>(); // New map to track parent nodes


AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid);
HashMap<String, Boolean> dataTypeMap = validateAndGetEntityTypeMap(baseGuid);

boolean isNotConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE)
? dataTypeMap.get(IS_DATA_PRODUCT)
: dataTypeMap.get(IS_DATASET);
enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours);
int currentDepth = 0;
int currentLevel = isNotConnecterVertex? 0: 1;

// Add the current node and its neighbors to the result
appendToResult(baseVertex, lineageListContext, ret, currentLevel);

while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) {
currentDepth++;

Expand All @@ -521,33 +528,45 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line

HashMap<String, Boolean> currentEntityDataTypeMap = validateAndGetEntityTypeMap(currentGUID);
if (!lineageListContext.evaluateVertexFilter(currentVertex)) {
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours);
continue;
}
if (checkOffsetAndSkipEntity(lineageListContext, ret)) {
skippedVertices.add(currentGUID);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours);
continue;
}

lineageListContext.incrementEntityCount();
// Get the neighbors for the current node
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours);

// Add the current node and its neighbors to the result
appendToResult(currentVertex, lineageListContext, ret, currentLevel);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices);

if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) {
ret.setHasMore(false);
lineageListContext.setHasMoreUpdated(true);
}
}
}

if(lineageListContext.getImmediateNeighbours()){
// update parents for each entity
updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours);
}

if (currentDepth > lineageListContext.getDepth())
lineageListContext.setDepthLimitReached(true);

setPageMetadata(lineageListContext, ret, traversalQueue);
RequestContext.get().endMetricRecord(metricRecorder);
}

private void enqueueNeighbours(AtlasVertex currentVertex, HashMap<String, Boolean> dataTypeMap, AtlasLineageListContext lineageListContext,
Queue<String> traversalQueue, Set<String> visitedVertices, Set<String> skippedVertices) {
private void enqueueNeighbours(AtlasVertex currentVertex, HashMap<String, Boolean> dataTypeMap,
AtlasLineageListContext lineageListContext, Queue<String> traversalQueue,
Set<String> visitedVertices, Set<String> skippedVertices,
Map<String, List<String>> parentMapForNeighbours, Map<String, List<String>> childrenMapForNeighbours) {
AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges");
Iterator<AtlasEdge> edges;
String lineageInputLabel = RequestContext.get().getLineageInputLabel();
Expand All @@ -562,6 +581,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, HashMap<String, Boolea
edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? lineageInputLabel : lineageOutputLabel).iterator();
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges);

List<String> neighbors = new ArrayList<>();
while (edges.hasNext()) {
AtlasEdge currentEdge = edges.next();
if (!lineageListContext.evaluateTraversalFilter(currentEdge))
Expand All @@ -581,10 +601,68 @@ private void enqueueNeighbours(AtlasVertex currentVertex, HashMap<String, Boolea
traversalQueue.add(vertexGuid);
addEntitiesToCache(neighbourVertex);
}

if(lineageListContext.getImmediateNeighbours()){
String vertexDisplayName = getQalifiedName(neighbourVertex);
parentMapForNeighbours
.computeIfAbsent(vertexDisplayName, k -> new ArrayList<>())
.add(getQalifiedName(currentVertex));
childrenMapForNeighbours
.computeIfAbsent(getQalifiedName(currentVertex), k -> new ArrayList<>())
.add(vertexDisplayName);
}
}
}

private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map<String, List<String>> parentMapForNeighbours, Map<String, List<String>> childrenMapForNeighbours){
List<AtlasEntityHeader> entityList = ret.getEntities();
for (AtlasEntityHeader entity : entityList) {
// Check if the entity GUID exists in the parentMapForNeighbours
if (parentMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) {
// Get the list of AtlasVertex from the map
List<String> parentNodes = parentMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME));

List<String> parentNodesOfParent = new ArrayList<>();
for (String parentNode : parentNodes) {
List<String> parentsOfParentNode = parentMapForNeighbours.get(parentNode);
if (parentsOfParentNode != null) {
parentNodesOfParent.addAll(parentsOfParentNode);
}
}

if(isInputDirection(lineageListContext)){
entity.setImmediateDownstream(parentNodesOfParent);
}
else{
entity.setImmediateUpstream(parentNodesOfParent);
}
}

if (childrenMapForNeighbours.containsKey(entity.getAttribute(QUALIFIED_NAME))) {
// Get the list of AtlasVertex from the map
List<String> childrenNodes = childrenMapForNeighbours.get(entity.getAttribute(QUALIFIED_NAME));

List<String> childrenNodesOfChildren = new ArrayList<>();
for (String childNode : childrenNodes) {
// Add all children for the current childNode
List<String> childrenOfChildNode = childrenMapForNeighbours.get(childNode);
if (childrenOfChildNode != null) {
childrenNodesOfChildren.addAll(childrenOfChildNode);
}
}

if(isInputDirection(lineageListContext)){
entity.setImmediateUpstream(childrenNodesOfChildren);
}
else{
entity.setImmediateDownstream(childrenNodesOfChildren);
}
}
}
}

private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException {
private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext,
AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException {
AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(currentVertex, lineageListContext.getAttributes());
entity.setDepth(currentLevel);
ret.getEntities().add(entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,10 @@ public static String getGuid(AtlasVertex vertex) {
return ret;
}

public static String getQalifiedName(AtlasVertex vertex) {
return vertex.<String>getProperty(Constants.QUALIFIED_NAME, String.class);
}

public static String getHomeId(AtlasElement element) {
return element.getProperty(Constants.HOME_ID_KEY, String.class);
}
Expand Down

0 comments on commit d8f6410

Please sign in to comment.