Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIN-1079 immediate downstream and upstream assets in impact report #3412

Merged
merged 11 commits into from
Aug 22, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {

private Map<String, AtlasSearchResult> collapse = null;

private List<Map<String,String>> immediateUpstream; // New field
private List<Map<String,String>> immediateDownstream; // New field


public AtlasEntityHeader() {
this(null, null);
Expand Down Expand Up @@ -346,6 +349,22 @@ public void setMeanings(final List<AtlasTermAssignmentHeader> meanings) {
this.meanings = meanings;
}

/**
 * Returns the immediate-upstream entries attached to this header.
 *
 * @return the list last assigned through {@link #setImmediateUpstream(List)};
 *         the field is declared without an initializer, so callers should be
 *         prepared for {@code null}
 */
public List<Map<String,String>> getImmediateUpstream() {
    return this.immediateUpstream;
}

/**
 * Stores the immediate-upstream entries for this header.
 *
 * @param immediateUpstream list of string-to-string maps to keep; the reference
 *                          is stored as given, without copying
 */
public void setImmediateUpstream(final List<Map<String,String>> immediateUpstream) {
    this.immediateUpstream = immediateUpstream;
}

/**
 * Returns the immediate-downstream entries attached to this header.
 *
 * @return the list last assigned through {@link #setImmediateDownstream(List)};
 *         the field is declared without an initializer, so callers should be
 *         prepared for {@code null}
 */
public List<Map<String,String>> getImmediateDownstream() {
    return this.immediateDownstream;
}

/**
 * Stores the immediate-downstream entries for this header.
 *
 * @param immediateDownstream list of string-to-string maps to keep; the reference
 *                            is stored as given, without copying
 */
public void setImmediateDownstream(final List<Map<String,String>> immediateDownstream) {
    this.immediateDownstream = immediateDownstream;
}

/**
* REST serialization friendly list.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ public class LineageListRequest {
private Set<String> relationAttributes;
private Boolean excludeMeanings;
private Boolean excludeClassifications;
private Boolean immediateNeighbours=false;
akshaysw marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Returns the "immediate neighbours" flag of this lineage list request.
 *
 * @return the stored flag; the field is initialised to {@code false} and then
 *         holds whatever was last passed to the setter
 */
public Boolean getImmediateNeighbours() {
    return this.immediateNeighbours;
}

/**
 * Sets the "immediate neighbours" flag of this lineage list request.
 *
 * <p>A {@code null} argument is stored as {@link Boolean#FALSE}, which matches the
 * field's declared default. Consumers evaluate the flag in a plain {@code if (...)}
 * condition, where a {@code null} {@code Boolean} would fail on unboxing.
 *
 * @param immediateNeighbours the requested flag value; {@code null} is treated as
 *                            {@code false}
 */
public void setImmediateNeighbours(Boolean immediateNeighbours) {
    this.immediateNeighbours = (immediateNeighbours != null) ? immediateNeighbours : Boolean.FALSE;
}

public enum LineageDirection {INPUT, OUTPUT}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class AtlasLineageListContext {
private int currentEntityCounter;
private boolean depthLimitReached;
private boolean hasMoreUpdated;
private Boolean immediateNeighbours;

public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) {
this.guid = lineageListRequest.getGuid();
Expand All @@ -35,6 +36,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR
this.edgeTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getRelationshipTraversalFilters());
this.attributes = lineageListRequest.getAttributes();
this.relationAttributes = lineageListRequest.getRelationAttributes();
this.immediateNeighbours = lineageListRequest.getImmediateNeighbours();
}

public String getGuid() {
Expand Down Expand Up @@ -190,4 +192,12 @@ public boolean isHasMoreUpdated() {
public void setHasMoreUpdated(boolean hasMoreUpdated) {
this.hasMoreUpdated = hasMoreUpdated;
}

/**
 * Returns whether immediate upstream/downstream neighbours were requested.
 *
 * <p>The backing field is a boxed {@code Boolean} copied from the request and may be
 * {@code null}. Callers use the result directly as an {@code if} condition, so this
 * getter never returns {@code null}: an unset flag is reported as {@code false}.
 *
 * @return {@code Boolean.TRUE} only when the stored flag is {@code true};
 *         {@code Boolean.FALSE} otherwise, including when it is {@code null}
 */
public Boolean getImmediateNeighbours() {
    return Boolean.TRUE.equals(immediateNeighbours);
}

/**
 * Overrides the "immediate neighbours" flag held by this context.
 *
 * @param immediateNeighbours the new flag value; stored as given
 */
public void setImmediateNeighbours(Boolean immediateNeighbours) {
this.immediateNeighbours = immediateNeighbours;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@
import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*;
import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN;
import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.OUT;
Expand All @@ -97,6 +96,8 @@ public class EntityLineageService implements AtlasLineageService {
private final AtlasTypeRegistry atlasTypeRegistry;
private final VertexEdgeCache vertexEdgeCache;

private static final List<String> FETCH_ENTITY_ATTRIBUTES = Arrays.asList(ATTRIBUTE_NAME_GUID, QUALIFIED_NAME, NAME);

@Inject
EntityLineageService(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, VertexEdgeCache vertexEdgeCache) {
this.graph = atlasGraph;
Expand Down Expand Up @@ -459,12 +460,20 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
Set<String> skippedVertices = new HashSet<>();
Queue<String> traversalQueue = new LinkedList<>();

Map<String, List<String>> lineageParentsForEntityMap = new HashMap<>(); // New map to track parent nodes
Map<String, List<String>> lineageChildrenForEntityMap = new HashMap<>(); // New map to track child nodes


AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid);
boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid);
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
// Get the neighbors for the current node
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap);
int currentDepth = 0;
int currentLevel = isBaseNodeDataset? 0: 1;

// Add the current node and its neighbors to the result
appendToResult(baseVertex, lineageListContext, ret, currentLevel);

while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) {
currentDepth++;

Expand All @@ -484,33 +493,45 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line

boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID);
if (!lineageListContext.evaluateVertexFilter(currentVertex)) {
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap);
continue;
}
if (checkOffsetAndSkipEntity(lineageListContext, ret)) {
skippedVertices.add(currentGUID);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap);
continue;
}

lineageListContext.incrementEntityCount();
// Get the neighbors for the current node
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap);

// Add the current node and its neighbors to the result
appendToResult(currentVertex, lineageListContext, ret, currentLevel);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);

if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) {
ret.setHasMore(false);
lineageListContext.setHasMoreUpdated(true);
}
}
}

if(lineageListContext.getImmediateNeighbours()){
// update parents for each entity
updateParentNodesForEachEntity(lineageListContext, ret, lineageParentsForEntityMap, lineageChildrenForEntityMap);
}

if (currentDepth > lineageListContext.getDepth())
lineageListContext.setDepthLimitReached(true);

setPageMetadata(lineageListContext, ret, traversalQueue);
RequestContext.get().endMetricRecord(metricRecorder);
}

private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext,
Queue<String> traversalQueue, Set<String> visitedVertices, Set<String> skippedVertices) {
private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset,
AtlasLineageListContext lineageListContext, Queue<String> traversalQueue,
Set<String> visitedVertices, Set<String> skippedVertices,
Map<String, List<String>> lineageParentsForEntityMap, Map<String, List<String>> lineageChildrenForEntityMap) {
AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges");
Iterator<AtlasEdge> edges;
if (isDataset)
Expand All @@ -519,6 +540,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl
edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges);

List<String> neighbors = new ArrayList<>();
while (edges.hasNext()) {
AtlasEdge currentEdge = edges.next();
if (!lineageListContext.evaluateTraversalFilter(currentEdge))
Expand All @@ -538,10 +560,98 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl
traversalQueue.add(vertexGuid);
addEntitiesToCache(neighbourVertex);
}

if(lineageListContext.getImmediateNeighbours()){
lineageParentsForEntityMap
.computeIfAbsent(vertexGuid, k -> new ArrayList<>())
.add(getGuid(currentVertex));
lineageChildrenForEntityMap
.computeIfAbsent(getGuid(currentVertex), k -> new ArrayList<>())
.add(vertexGuid);
}
}
}

/**
 * Populates {@code immediateUpstream} / {@code immediateDownstream} on every entity
 * header in {@code ret}, using the adjacency recorded during traversal.
 *
 * <p>Both maps are keyed by entity GUID and hold lists of GUID strings. For each
 * result entity the method looks two hops away: the parents of its parents, and the
 * children of its children. NOTE(review): presumably the two hops skip the
 * intermediate process node between two datasets; confirm against the traversal.
 *
 * <p>Which setter receives which list depends on the traversal direction:
 * <ul>
 *   <li>input direction: parents-of-parents go to {@code immediateDownstream},
 *       children-of-children go to {@code immediateUpstream};</li>
 *   <li>otherwise: parents-of-parents go to {@code immediateUpstream},
 *       children-of-children go to {@code immediateDownstream}.</li>
 * </ul>
 *
 * <p>An entity with no entry in a map keeps its current value for that side. An
 * entity with an entry but no second-hop nodes receives an empty list.
 *
 * @param lineageListContext          request context, used to determine the direction
 * @param ret                         result whose entity headers are updated in place
 * @param lineageParentsForEntityMap  GUID to GUIDs recorded as its traversal parents
 * @param lineageChildrenForEntityMap GUID to GUIDs recorded as its traversal children
 */
private void updateParentNodesForEachEntity(AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, Map<String, List<String>> lineageParentsForEntityMap, Map<String, List<String>> lineageChildrenForEntityMap){
    List<AtlasEntityHeader> entityList = ret.getEntities();
    if (entityList == null) {
        return;
    }

    // The direction is a property of the request, so evaluate it once.
    boolean inputDirection = isInputDirection(lineageListContext);

    for (AtlasEntityHeader entity : entityList) {
        if (entity == null || entity.getGuid() == null) {
            continue;
        }
        String entityGuid = entity.getGuid();

        // A single get() covers both the containsKey() and the null-value check.
        List<String> parentGuids = lineageParentsForEntityMap.get(entityGuid);
        if (parentGuids != null) {
            List<Map<String, String>> parentsOfParents = collectSecondHopDetails(parentGuids, lineageParentsForEntityMap);
            if (inputDirection) {
                entity.setImmediateDownstream(parentsOfParents);
            } else {
                entity.setImmediateUpstream(parentsOfParents);
            }
        }

        List<String> childGuids = lineageChildrenForEntityMap.get(entityGuid);
        if (childGuids != null) {
            List<Map<String, String>> childrenOfChildren = collectSecondHopDetails(childGuids, lineageChildrenForEntityMap);
            if (inputDirection) {
                entity.setImmediateUpstream(childrenOfChildren);
            } else {
                entity.setImmediateDownstream(childrenOfChildren);
            }
        }
    }
}

/**
 * Collects the attribute details of every distinct GUID reachable from
 * {@code firstHopGuids} through one more step in {@code adjacency}.
 *
 * <p>Each GUID is resolved at most once and first-seen order is preserved. GUIDs
 * whose vertex cannot be found are omitted.
 *
 * @param firstHopGuids GUIDs directly adjacent to the entity being processed
 * @param adjacency     GUID to adjacent GUIDs, in the same direction as the first hop
 * @return one attribute map per distinct second-hop vertex; never {@code null}
 */
private List<Map<String, String>> collectSecondHopDetails(List<String> firstHopGuids, Map<String, List<String>> adjacency) {
    Set<String> seenGuids = new HashSet<>();
    List<Map<String, String>> details = new ArrayList<>();

    for (String firstHopGuid : firstHopGuids) {
        List<String> secondHopGuids = adjacency.get(firstHopGuid);
        if (secondHopGuids == null) {
            continue;
        }
        for (String secondHopGuid : secondHopGuids) {
            // De-duplicate before touching the graph so repeated GUIDs cost no extra lookup.
            if (!seenGuids.add(secondHopGuid)) {
                continue;
            }
            AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(this.graph, secondHopGuid);
            if (vertex != null) {
                details.add(fetchAttributes(vertex, FETCH_ENTITY_ATTRIBUTES));
            }
        }
    }
    return details;
}

private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException {
private void appendToResult(AtlasVertex currentVertex, AtlasLineageListContext lineageListContext,
AtlasLineageListInfo ret, int currentLevel) throws AtlasBaseException {
AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(currentVertex, lineageListContext.getAttributes());
entity.setDepth(currentLevel);
ret.getEntities().add(entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,22 @@ public static String getGuid(AtlasVertex vertex) {
return ret;
}

/**
 * Reads the requested attributes of a vertex into a map keyed by attribute name.
 *
 * <p>The GUID attribute ({@code ATTRIBUTE_NAME_GUID}) is obtained through
 * {@link #getGuid(AtlasVertex)}. Every other attribute is read as a {@code String}
 * property of the vertex, and the map holds whatever that lookup returns.
 *
 * @param vertex     vertex to read from
 * @param attributes names of the attributes to fetch; {@code null} yields an empty map
 * @return a new mutable map with one entry per requested attribute name
 */
public static Map<String,String> fetchAttributes(AtlasVertex vertex, List<String> attributes) {
    Map<String, String> attributeValues = new HashMap<>();

    if (attributes == null) {
        return attributeValues;
    }

    for (String attr : attributes) {
        if (Objects.equals(attr, ATTRIBUTE_NAME_GUID)) {
            // The GUID goes through the dedicated accessor, not a plain property read.
            attributeValues.put(ATTRIBUTE_NAME_GUID, getGuid(vertex));
        } else {
            attributeValues.put(attr, vertex.getProperty(attr, String.class));
        }
    }

    return attributeValues;
}

/**
 * Reads the {@code Constants.HOME_ID_KEY} property of a graph element as a string.
 *
 * @param element the graph element to read from
 * @return the value returned by the property lookup
 */
public static String getHomeId(AtlasElement element) {
    final String homeId = element.getProperty(Constants.HOME_ID_KEY, String.class);
    return homeId;
}
Expand Down
Loading