Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIN-1079 immediate downstream and upstream assets in impact report #3412

Merged
merged 11 commits into from
Aug 22, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {

private Map<String, AtlasSearchResult> collapse = null;

private List<Map<String,String>> immediateUpstream; // New field
private List<Map<String,String>> immediateDownstream; // New field


public AtlasEntityHeader() {
this(null, null);
Expand Down Expand Up @@ -346,6 +349,22 @@ public void setMeanings(final List<AtlasTermAssignmentHeader> meanings) {
this.meanings = meanings;
}

public List<Map<String,String>> getImmediateUpstream() {
return immediateUpstream;
}

public void setImmediateUpstream(List<Map<String,String>> immediateUpstream) {
this.immediateUpstream = immediateUpstream;
}

public List<Map<String,String>> getImmediateDownstream() {
return immediateDownstream;
}

public void setImmediateDownstream(List<Map<String,String>> immediateDownstream) {
this.immediateDownstream = immediateDownstream;
}

/**
* REST serialization friendly list.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ public class LineageListRequest {
private Set<String> relationAttributes;
private Boolean excludeMeanings;
private Boolean excludeClassifications;
private Boolean immediateNeighbours=false;
akshaysw marked this conversation as resolved.
Show resolved Hide resolved

public Boolean getImmediateNeighbours() {
return immediateNeighbours;
}

public void setImmediateNeighbours(Boolean immediateNeighbours) {
this.immediateNeighbours = immediateNeighbours;
}

public enum LineageDirection {INPUT, OUTPUT}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class AtlasLineageListContext {
private int currentEntityCounter;
private boolean depthLimitReached;
private boolean hasMoreUpdated;
private Boolean immediateNeighbours;

public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) {
this.guid = lineageListRequest.getGuid();
Expand All @@ -35,6 +36,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR
this.edgeTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getRelationshipTraversalFilters());
this.attributes = lineageListRequest.getAttributes();
this.relationAttributes = lineageListRequest.getRelationAttributes();
this.immediateNeighbours = lineageListRequest.getImmediateNeighbours();
}

public String getGuid() {
Expand Down Expand Up @@ -190,4 +192,12 @@ public boolean isHasMoreUpdated() {
public void setHasMoreUpdated(boolean hasMoreUpdated) {
this.hasMoreUpdated = hasMoreUpdated;
}

public Boolean getImmediateNeighbours() {
return immediateNeighbours;
}

public void setImmediateNeighbours(Boolean immediateNeighbours) {
this.immediateNeighbours = immediateNeighbours;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@
import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*;
import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN;
import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.OUT;
Expand Down Expand Up @@ -459,12 +458,20 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
Set<String> skippedVertices = new HashSet<>();
Queue<String> traversalQueue = new LinkedList<>();

Map<String, List<String>> parentMapForNeighbours = new HashMap<>(); // New map to track parent nodes
akshaysw marked this conversation as resolved.
Show resolved Hide resolved
Map<String, List<String>> childrenMapForNeighbours = new HashMap<>(); // New map to track parent nodes


AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid);
boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid);
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
// Get the neighbors for the current node
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours);
int currentDepth = 0;
int currentLevel = isBaseNodeDataset? 0: 1;

// Add the current node and its neighbors to the result
appendToResult(baseVertex, lineageListContext, ret, currentLevel);

while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) {
currentDepth++;

Expand All @@ -484,33 +491,45 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line

boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID);
if (!lineageListContext.evaluateVertexFilter(currentVertex)) {
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours);
continue;
}
if (checkOffsetAndSkipEntity(lineageListContext, ret)) {
skippedVertices.add(currentGUID);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours);
continue;
}

lineageListContext.incrementEntityCount();
// Get the neighbors for the current node
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, parentMapForNeighbours, childrenMapForNeighbours);

// Add the current node and its neighbors to the result
appendToResult(currentVertex, lineageListContext, ret, currentLevel);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);

if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) {
ret.setHasMore(false);
lineageListContext.setHasMoreUpdated(true);
}
}
}

if(lineageListContext.getImmediateNeighbours()){
// update parents for each entity
updateParentNodesForEachEntity(lineageListContext, ret, parentMapForNeighbours, childrenMapForNeighbours);
}

if (currentDepth > lineageListContext.getDepth())
lineageListContext.setDepthLimitReached(true);

setPageMetadata(lineageListContext, ret, traversalQueue);
RequestContext.get().endMetricRecord(metricRecorder);
}

private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext,
Queue<String> traversalQueue, Set<String> visitedVertices, Set<String> skippedVertices) {
private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset,
AtlasLineageListContext lineageListContext, Queue<String> traversalQueue,
Set<String> visitedVertices, Set<String> skippedVertices,
Map<String, List<String>> parentMapForNeighbours, Map<String, List<String>> childrenMapForNeighbours) {
AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges");
Iterator<AtlasEdge> edges;
if (isDataset)
Expand All @@ -519,6 +538,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl
edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges);

List<String> neighbors = new ArrayList<>();
while (edges.hasNext()) {
AtlasEdge currentEdge = edges.next();
if (!lineageListContext.evaluateTraversalFilter(currentEdge))
Expand All @@ -538,10 +558,67 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl
traversalQueue.add(vertexGuid);
addEntitiesToCache(neighbourVertex);
}

if(lineageListContext.getImmediateNeighbours()){
parentMapForNeighbours
.computeIfAbsent(vertexGuid, k -> new ArrayList<>())
.add(getGuid(currentVertex));
childrenMapForNeighbours
.computeIfAbsent(getGuid(currentVertex), k -> new ArrayList<>())
.add(vertexGuid);
}
}
}

private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map<String, List<String>> parentMapForNeighbours, Map<String, List<String>> childrenMapForNeighbours){
List<AtlasEntityHeader> entityList = ret.getEntities();
for (AtlasEntityHeader entity : entityList) {
// Check if the entity GUID exists in the parentMapForNeighbours
if (parentMapForNeighbours.containsKey(entity.getGuid())) {
// Get the list of AtlasVertex from the map
List<String> parentNodes = parentMapForNeighbours.get(entity.getGuid());

List<Map<String,String>> parentNodesOfParentWithDetails = new ArrayList<>();
for (String parentNode : parentNodes) {
List<String> parentsOfParentNodes = parentMapForNeighbours.get(parentNode);
for (String parentOfParent : parentsOfParentNodes) {
parentNodesOfParentWithDetails.add(getNodeDetails(AtlasGraphUtilsV2.findByGuid(this.graph, parentOfParent)));
}
}

if(isInputDirection(lineageListContext)){
entity.setImmediateDownstream(parentNodesOfParentWithDetails);
}
else{
entity.setImmediateUpstream(parentNodesOfParentWithDetails);
}
}

if (childrenMapForNeighbours.containsKey(entity.getGuid())) {
// Get the list of AtlasVertex from the map
List<String> childrenNodes = childrenMapForNeighbours.get(entity.getGuid());

List<Map<String,String>> childrenNodesOfChildrenWithDetails = new ArrayList<>();
for (String childNode : childrenNodes) {
// Add all children for the current childNode
List<String> childrenOfChildNode = childrenMapForNeighbours.get(childNode);
for (String childOfChild : childrenOfChildNode) {
childrenNodesOfChildrenWithDetails.add(getNodeDetails(AtlasGraphUtilsV2.findByGuid(this.graph, childOfChild)));
}
}

if(isInputDirection(lineageListContext)){
entity.setImmediateUpstream(childrenNodesOfChildrenWithDetails);
}
else{
entity.setImmediateDownstream(childrenNodesOfChildrenWithDetails);
}
}
}
}

private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException {
private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext,
AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException {
AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(currentVertex, lineageListContext.getAttributes());
entity.setDepth(currentLevel);
ret.getEntities().add(entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,20 @@ public static String getGuid(AtlasVertex vertex) {
return ret;
}

public static String getQalifiedName(AtlasVertex vertex) {
akshaysw marked this conversation as resolved.
Show resolved Hide resolved
return vertex.<String>getProperty(Constants.QUALIFIED_NAME, String.class);
}

public static Map<String,String> getNodeDetails(AtlasVertex vertex) {
akshaysw marked this conversation as resolved.
Show resolved Hide resolved
Map<String,String> stringList = new HashMap<>();
akshaysw marked this conversation as resolved.
Show resolved Hide resolved
// Add strings to the list
stringList.put(ATTRIBUTE_NAME_GUID, getGuid(vertex));
stringList.put(QUALIFIED_NAME, vertex.<String>getProperty(QUALIFIED_NAME, String.class));
stringList.put(NAME, vertex.<String>getProperty(NAME, String.class));
// Return the ArrayList
return stringList;
}

public static String getHomeId(AtlasElement element) {
return element.getProperty(Constants.HOME_ID_KEY, String.class);
}
Expand Down
Loading