Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1720 Modified the /lineage/list api for acommadating for dataProduct<>Asset lineage #3358

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ public interface AtlasLineageService {
* @param lineageListRequest lineage list request object
* @return AtlasLineageListInfo
*/
AtlasLineageListInfo getLineageListInfoOnDemand(String entityGuid, LineageListRequest lineageListRequest) throws AtlasBaseException;
AtlasLineageListInfo getLineageListInfoOnDemand(String entityGuid, LineageListRequest lineageListRequest, String lineageType) throws AtlasBaseException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand

@Override
@GraphTransaction
public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest) throws AtlasBaseException {
public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest, String lineageType) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageListInfoOnDemand");

AtlasLineageListInfo ret = new AtlasLineageListInfo(new ArrayList<>());
traverseEdgesUsingBFS(guid, new AtlasLineageListContext(lineageListRequest, atlasTypeRegistry), ret);
traverseEdgesUsingBFS(guid, new AtlasLineageListContext(lineageListRequest, atlasTypeRegistry), ret, lineageType);
ret.setSearchParameters(lineageListRequest);

RequestContext.get().endMetricRecord(metricRecorder);
Expand Down Expand Up @@ -477,7 +477,7 @@ private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDem
ret.setDownstreamEntityLimitReached(true);
}

private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret) throws AtlasBaseException {
private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, String lineageType) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesUsingBFS");

Set<String> visitedVertices = new HashSet<>();
Expand All @@ -486,16 +486,22 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
Queue<String> traversalQueue = new LinkedList<>();

AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid);
boolean isBaseNodeDataset = validateAndGetEntityTypeMap(baseGuid).get(IS_DATASET);
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
HashMap<String, Boolean> dataTypeMap = validateAndGetEntityTypeMap(baseGuid);
if (StringUtils.isEmpty(lineageType)){
lineageType = DATASET_PROCESS_LINEAGE;
}
boolean isNotConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE)
? dataTypeMap.get(IS_DATA_PRODUCT)
: dataTypeMap.get(IS_DATASET);
enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType);
int currentDepth = 0;
int currentLevel = isBaseNodeDataset? 0: 1;
int currentLevel = isNotConnecterVertex? 0: 1;

while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) {
currentDepth++;

// update level at every alternate depth
if ((isBaseNodeDataset && currentDepth % 2 != 0) || (!isBaseNodeDataset && currentDepth % 2 == 0))
if ((isNotConnecterVertex && currentDepth % 2 != 0) || (!isNotConnecterVertex && currentDepth % 2 == 0))
currentLevel++;

int entitiesInCurrentDepth = traversalQueue.size();
Expand All @@ -508,20 +514,20 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
if (Objects.isNull(currentVertex))
throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID);

boolean isDataset = validateAndGetEntityTypeMap(currentGUID).get(IS_DATASET);
HashMap<String, Boolean> currentEntityDataTypeMap = validateAndGetEntityTypeMap(currentGUID);
if (!lineageListContext.evaluateVertexFilter(currentVertex)) {
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType);
continue;
}
if (checkOffsetAndSkipEntity(lineageListContext, ret)) {
skippedVertices.add(currentGUID);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType);
continue;
}

lineageListContext.incrementEntityCount();
appendToResult(currentVertex, lineageListContext, ret, currentLevel);
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType);
if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) {
ret.setHasMore(false);
lineageListContext.setHasMoreUpdated(true);
Expand All @@ -535,22 +541,26 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
RequestContext.get().endMetricRecord(metricRecorder);
}

private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext,
Queue<String> traversalQueue, Set<String> visitedVertices, Set<String> skippedVertices) {
private void enqueueNeighbours(AtlasVertex currentVertex, HashMap<String, Boolean> dataTypeMap, AtlasLineageListContext lineageListContext,
Queue<String> traversalQueue, Set<String> visitedVertices, Set<String> skippedVertices,String lineageType) {
AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges");
Iterator<AtlasEdge> edges;
if (isDataset)
edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator();
boolean isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE)
? !dataTypeMap.get(IS_DATA_PRODUCT)
: !dataTypeMap.get(IS_DATASET);
String[] edgeDirectionLabels = LINEAGE_MAP.get(lineageType);
if (!isConnecterVertex)
edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? edgeDirectionLabels[1] : edgeDirectionLabels[0]).iterator();
else
edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? edgeDirectionLabels[0] : edgeDirectionLabels[1]).iterator();
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges);

while (edges.hasNext()) {
AtlasEdge currentEdge = edges.next();
if (!lineageListContext.evaluateTraversalFilter(currentEdge))
continue;
AtlasVertex neighbourVertex;
if (isDataset)
if (!isConnecterVertex)
neighbourVertex = currentEdge.getOutVertex();
else
neighbourVertex = currentEdge.getInVertex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid,@
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
@Timed
public AtlasLineageListInfo getLineageList(LineageListRequest lineageListRequest) throws AtlasBaseException {
public AtlasLineageListInfo getLineageList(@QueryParam("lineageType") String lineageType, LineageListRequest lineageListRequest) throws AtlasBaseException {
lineageListRequestValidator.validate(lineageListRequest);

String guid = lineageListRequest.getGuid();
Expand All @@ -142,7 +142,7 @@ public AtlasLineageListInfo getLineageList(LineageListRequest lineageListRequest
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG))
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageREST.getLineageList(" + guid + "," + lineageListRequest + ")");

return atlasLineageService.getLineageListInfoOnDemand(guid, lineageListRequest);
return atlasLineageService.getLineageListInfoOnDemand(guid, lineageListRequest, lineageType);
} finally {
AtlasPerfTracer.log(perf);
}
Expand Down
Loading