diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java index 3842a9b10f..967f9ed5d6 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java @@ -78,6 +78,6 @@ public interface AtlasLineageService { * @param lineageListRequest lineage list request object * @return AtlasLineageListInfo */ - AtlasLineageListInfo getLineageListInfoOnDemand(String entityGuid, LineageListRequest lineageListRequest) throws AtlasBaseException; + AtlasLineageListInfo getLineageListInfoOnDemand(String entityGuid, LineageListRequest lineageListRequest, String lineageType) throws AtlasBaseException; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index f09d8ecc50..84431a8e85 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -202,11 +202,11 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand @Override @GraphTransaction - public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest) throws AtlasBaseException { + public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest, String lineageType) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageListInfoOnDemand"); AtlasLineageListInfo ret = new AtlasLineageListInfo(new ArrayList<>()); - traverseEdgesUsingBFS(guid, new AtlasLineageListContext(lineageListRequest, atlasTypeRegistry), ret); + traverseEdgesUsingBFS(guid, new AtlasLineageListContext(lineageListRequest, atlasTypeRegistry), ret, lineageType); ret.setSearchParameters(lineageListRequest); RequestContext.get().endMetricRecord(metricRecorder); @@ -477,7 +477,7 @@ private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDem ret.setDownstreamEntityLimitReached(true); } - private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret) throws AtlasBaseException { + private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, String lineageType) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesUsingBFS"); Set visitedVertices = new HashSet<>(); @@ -486,16 +486,22 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Queue traversalQueue = new LinkedList<>(); AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); - boolean isBaseNodeDataset = validateAndGetEntityTypeMap(baseGuid).get(IS_DATASET); - enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + HashMap dataTypeMap = validateAndGetEntityTypeMap(baseGuid); + if (StringUtils.isEmpty(lineageType)){ + lineageType = DATASET_PROCESS_LINEAGE; + } + boolean isNotConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) + ? dataTypeMap.get(IS_DATA_PRODUCT) + : dataTypeMap.get(IS_DATASET); + enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType); int currentDepth = 0; - int currentLevel = isBaseNodeDataset? 0: 1; + int currentLevel = isNotConnecterVertex? 0: 1; while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) { currentDepth++; // update level at every alternate depth - if ((isBaseNodeDataset && currentDepth % 2 != 0) || (!isBaseNodeDataset && currentDepth % 2 == 0)) + if ((isNotConnecterVertex && currentDepth % 2 != 0) || (!isNotConnecterVertex && currentDepth % 2 == 0)) currentLevel++; int entitiesInCurrentDepth = traversalQueue.size(); @@ -508,20 +514,20 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if (Objects.isNull(currentVertex)) throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID); - boolean isDataset = validateAndGetEntityTypeMap(currentGUID).get(IS_DATASET); + HashMap currentEntityDataTypeMap = validateAndGetEntityTypeMap(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType); continue; } lineageListContext.incrementEntityCount(); appendToResult(currentVertex, lineageListContext, ret, currentLevel); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType); if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) { ret.setHasMore(false); lineageListContext.setHasMoreUpdated(true); @@ -535,14 +541,18 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line RequestContext.get().endMetricRecord(metricRecorder); } - private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext, - Queue traversalQueue, Set visitedVertices, Set skippedVertices) { + private void enqueueNeighbours(AtlasVertex currentVertex, HashMap dataTypeMap, AtlasLineageListContext lineageListContext, + Queue traversalQueue, Set visitedVertices, Set skippedVertices,String lineageType) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; - if (isDataset) - edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + boolean isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) + ? !dataTypeMap.get(IS_DATA_PRODUCT) + : !dataTypeMap.get(IS_DATASET); + String[] edgeDirectionLabels = LINEAGE_MAP.get(lineageType); + if (!isConnecterVertex) + edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? edgeDirectionLabels[1] : edgeDirectionLabels[0]).iterator(); else - edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? edgeDirectionLabels[0] : edgeDirectionLabels[1]).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); while (edges.hasNext()) { @@ -550,7 +560,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl if (!lineageListContext.evaluateTraversalFilter(currentEdge)) continue; AtlasVertex neighbourVertex; - if (isDataset) + if (!isConnecterVertex) neighbourVertex = currentEdge.getOutVertex(); else neighbourVertex = currentEdge.getInVertex(); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java index 4f8aeb8468..aad5799ff2 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java @@ -128,7 +128,7 @@ public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid,@ @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) @Timed - public AtlasLineageListInfo getLineageList(LineageListRequest lineageListRequest) throws AtlasBaseException { + public AtlasLineageListInfo getLineageList(@QueryParam("lineageType") String lineageType, LineageListRequest lineageListRequest) throws AtlasBaseException { lineageListRequestValidator.validate(lineageListRequest); String guid = lineageListRequest.getGuid(); @@ -142,7 +142,7 @@ public AtlasLineageListInfo getLineageList(LineageListRequest lineageListRequest if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageREST.getLineageList(" + guid + "," + lineageListRequest + ")"); - return atlasLineageService.getLineageListInfoOnDemand(guid, lineageListRequest); + return atlasLineageService.getLineageListInfoOnDemand(guid, lineageListRequest, lineageType); } finally { AtlasPerfTracer.log(perf); }