Skip to content

Commit

Permalink
Merge pull request #3358 from atlanhq/beta-DG-1720-helper
Browse files Browse the repository at this point in the history
DG-1720 Modified the /lineage/list api for acommadating for dataProduct<>Asset lineage
  • Loading branch information
hr2904 authored Jul 23, 2024
2 parents 1088a2a + 4f2f27d commit 7ebfb1f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ public interface AtlasLineageService {
* @param lineageListRequest lineage list request object
* @return AtlasLineageListInfo
*/
AtlasLineageListInfo getLineageListInfoOnDemand(String entityGuid, LineageListRequest lineageListRequest) throws AtlasBaseException;
AtlasLineageListInfo getLineageListInfoOnDemand(String entityGuid, LineageListRequest lineageListRequest, String lineageType) throws AtlasBaseException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand

@Override
@GraphTransaction
public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest) throws AtlasBaseException {
public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest, String lineageType) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageListInfoOnDemand");

AtlasLineageListInfo ret = new AtlasLineageListInfo(new ArrayList<>());
traverseEdgesUsingBFS(guid, new AtlasLineageListContext(lineageListRequest, atlasTypeRegistry), ret);
traverseEdgesUsingBFS(guid, new AtlasLineageListContext(lineageListRequest, atlasTypeRegistry), ret, lineageType);
ret.setSearchParameters(lineageListRequest);

RequestContext.get().endMetricRecord(metricRecorder);
Expand Down Expand Up @@ -477,7 +477,7 @@ private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDem
ret.setDownstreamEntityLimitReached(true);
}

private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret) throws AtlasBaseException {
private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, String lineageType) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesUsingBFS");

Set<String> visitedVertices = new HashSet<>();
Expand All @@ -486,16 +486,22 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
Queue<String> traversalQueue = new LinkedList<>();

AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid);
boolean isBaseNodeDataset = validateAndGetEntityTypeMap(baseGuid).get(IS_DATASET);
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
HashMap<String, Boolean> dataTypeMap = validateAndGetEntityTypeMap(baseGuid);
if (StringUtils.isEmpty(lineageType)){
lineageType = DATASET_PROCESS_LINEAGE;
}
boolean isNotConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE)
? dataTypeMap.get(IS_DATA_PRODUCT)
: dataTypeMap.get(IS_DATASET);
enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType);
int currentDepth = 0;
int currentLevel = isBaseNodeDataset? 0: 1;
int currentLevel = isNotConnecterVertex? 0: 1;

while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) {
currentDepth++;

// update level at every alternate depth
if ((isBaseNodeDataset && currentDepth % 2 != 0) || (!isBaseNodeDataset && currentDepth % 2 == 0))
if ((isNotConnecterVertex && currentDepth % 2 != 0) || (!isNotConnecterVertex && currentDepth % 2 == 0))
currentLevel++;

int entitiesInCurrentDepth = traversalQueue.size();
Expand All @@ -508,20 +514,20 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
if (Objects.isNull(currentVertex))
throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID);

boolean isDataset = validateAndGetEntityTypeMap(currentGUID).get(IS_DATASET);
HashMap<String, Boolean> currentEntityDataTypeMap = validateAndGetEntityTypeMap(currentGUID);
if (!lineageListContext.evaluateVertexFilter(currentVertex)) {
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType);
continue;
}
if (checkOffsetAndSkipEntity(lineageListContext, ret)) {
skippedVertices.add(currentGUID);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType);
continue;
}

lineageListContext.incrementEntityCount();
appendToResult(currentVertex, lineageListContext, ret, currentLevel);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType);
if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) {
ret.setHasMore(false);
lineageListContext.setHasMoreUpdated(true);
Expand All @@ -535,22 +541,26 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
RequestContext.get().endMetricRecord(metricRecorder);
}

private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext,
Queue<String> traversalQueue, Set<String> visitedVertices, Set<String> skippedVertices) {
private void enqueueNeighbours(AtlasVertex currentVertex, HashMap<String, Boolean> dataTypeMap, AtlasLineageListContext lineageListContext,
Queue<String> traversalQueue, Set<String> visitedVertices, Set<String> skippedVertices,String lineageType) {
AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges");
Iterator<AtlasEdge> edges;
if (isDataset)
edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator();
boolean isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE)
? !dataTypeMap.get(IS_DATA_PRODUCT)
: !dataTypeMap.get(IS_DATASET);
String[] edgeDirectionLabels = LINEAGE_MAP.get(lineageType);
if (!isConnecterVertex)
edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? edgeDirectionLabels[1] : edgeDirectionLabels[0]).iterator();
else
edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? edgeDirectionLabels[0] : edgeDirectionLabels[1]).iterator();
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges);

while (edges.hasNext()) {
AtlasEdge currentEdge = edges.next();
if (!lineageListContext.evaluateTraversalFilter(currentEdge))
continue;
AtlasVertex neighbourVertex;
if (isDataset)
if (!isConnecterVertex)
neighbourVertex = currentEdge.getOutVertex();
else
neighbourVertex = currentEdge.getInVertex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid,@
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
@Timed
public AtlasLineageListInfo getLineageList(LineageListRequest lineageListRequest) throws AtlasBaseException {
public AtlasLineageListInfo getLineageList(@QueryParam("lineageType") String lineageType, LineageListRequest lineageListRequest) throws AtlasBaseException {
lineageListRequestValidator.validate(lineageListRequest);

String guid = lineageListRequest.getGuid();
Expand All @@ -142,7 +142,7 @@ public AtlasLineageListInfo getLineageList(LineageListRequest lineageListRequest
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG))
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageREST.getLineageList(" + guid + "," + lineageListRequest + ")");

return atlasLineageService.getLineageListInfoOnDemand(guid, lineageListRequest);
return atlasLineageService.getLineageListInfoOnDemand(guid, lineageListRequest, lineageType);
} finally {
AtlasPerfTracer.log(perf);
}
Expand Down

0 comments on commit 7ebfb1f

Please sign in to comment.