diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java index 69c3e387bc..3842a9b10f 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java @@ -71,7 +71,7 @@ public interface AtlasLineageService { * @param lineageOnDemandRequest lineage on demand request object * @return AtlasLineageInfo */ - AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException; + AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException; /** * @param entityGuid unique ID of the entity diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 976a254546..f09d8ecc50 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -69,7 +69,7 @@ import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE; import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE; -import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED; +import static org.apache.atlas.AtlasErrorCode.*; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*; import static org.apache.atlas.repository.Constants.*; @@ -84,11 +84,25 @@ public class EntityLineageService implements AtlasLineageService { private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; + private static final String OUTPUT_PORT_EDGE = "__Asset.outputPortDataProducts"; + private static final String INPUT_PORT_EDGE = "__Asset.inputPortDataProducts"; + + /** + * String[] => [Input edge Label, Output Edge Label] + */ + public static final HashMap LINEAGE_MAP = new HashMap(){{ + put(DATASET_PROCESS_LINEAGE, new String[]{PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE}); + put(PRODUCT_ASSET_LINEAGE, new String[]{INPUT_PORT_EDGE, OUTPUT_PORT_EDGE}); + }}; private static final String COLUMNS = "columns"; private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000; private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3; private static final String SEPARATOR = "->"; + public static final String IS_DATA_PRODUCT = "isDataProduct"; + public static final String IS_DATASET = "isProcess"; + public static final String PRODUCT_ASSET_LINEAGE = "ProductAssetLineage"; + public static final String DATASET_PROCESS_LINEAGE = "DatasetProcessLineage"; private final AtlasGraph graph; private final AtlasGremlinQueryProvider gremlinQueryProvider; @@ -170,13 +184,13 @@ public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direct @Override @GraphTransaction - public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException { + public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getAtlasLineageInfo"); RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes()); AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry); - boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid); - AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet); + HashMap dataTypeMap = validateAndGetEntityTypeMap(guid); + AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, dataTypeMap, lineageType); appendLineageOnDemandPayload(ret, lineageOnDemandRequest); // filtering out on-demand relations which has input & output nodes within the limit cleanupRelationsOnDemand(ret); @@ -199,20 +213,24 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR return ret; } - private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { + private HashMap validateAndGetEntityTypeMap(String guid) throws AtlasBaseException { String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); if (entityType == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName); } + HashMap dataTypeMap = new HashMap<>(); boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); + boolean isDataProduct = entityType.getTypeName().equals(DATA_PRODUCT_ENTITY_TYPE); + dataTypeMap.put(IS_DATA_PRODUCT, isDataProduct); + dataTypeMap.put(IS_DATASET, !isProcess); if (!isProcess) { boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); if (!isDataSet) { throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); } } - return !isProcess; + return dataTypeMap; } private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) { @@ -277,8 +295,10 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { } } - private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException { + private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, HashMap dataTypeMap, String lineageType) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); + if(StringUtils.isEmpty(lineageType)) + lineageType = DATASET_PROCESS_LINEAGE; LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection(); @@ -294,27 +314,35 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); - if (isDataSet) { - AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); - if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); - if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); - AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); - setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); - ret.getGuidEntityMap().put(guid, baseEntityHeader); - } else { - AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); - // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' - if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); - } - if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); + String[] lineageEdgeLabels = LINEAGE_MAP.get(lineageType); + boolean isConnecterVertex; + + + isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) + ? !dataTypeMap.get(IS_DATA_PRODUCT) + : !dataTypeMap.get(IS_DATASET); + + if (!isConnecterVertex) { + AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); + if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, lineageType); + if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, lineageType); + AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); + setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); + ret.getGuidEntityMap().put(guid, baseEntityHeader); + } else { + AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); + // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' + if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[0]).iterator(); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, lineageType); + } + if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[1]).iterator(); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, lineageType); + } } - } RequestContext.get().endMetricRecord(metricRecorder); return ret; } @@ -325,7 +353,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal baseEntityHeader.setFinishTime(traversalOrder.get()); } - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; int nextLevel = isInput ? level - 1: level + 1; @@ -356,11 +384,11 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; if (depth != 0) { // base condition of recursion for depth @@ -369,16 +397,17 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i int nextLevel = isInput ? level - 1: level + 1; // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); - + String[] edgeDirections = LINEAGE_MAP.get(lineageType); AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + + Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? edgeDirections[1] : edgeDirections[0]).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); while (incomingEdges.hasNext()) { AtlasEdge incomingEdge = incomingEdges.next(); - AtlasVertex processVertex = incomingEdge.getOutVertex(); + AtlasVertex connecterVertex = incomingEdge.getOutVertex(); - if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { + if (!vertexMatchesEvaluation(connecterVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { continue; } @@ -399,7 +428,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + Iterator outgoingEdges = connecterVertex.getEdges(OUT, isInput ? edgeDirections[0] : edgeDirections[1]).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { @@ -410,11 +439,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { + if (checkForOffset(outgoingEdge, connecterVertex, atlasLineageOnDemandContext, ret)) { continue; } if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { - String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); + String processGuid = AtlasGraphUtilsV2.getIdFromVertex(connecterVertex); LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); if (entityOnDemandInfo == null) continue; @@ -430,7 +459,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i setEntityLimitReachedFlag(isInput, ret); } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType); // execute inner depth AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); traversedEntity.setFinishTime(traversalOrder.get()); } @@ -457,7 +486,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Queue traversalQueue = new LinkedList<>(); AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); - boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); + boolean isBaseNodeDataset = validateAndGetEntityTypeMap(baseGuid).get(IS_DATASET); enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); int currentDepth = 0; int currentLevel = isBaseNodeDataset? 0: 1; @@ -479,7 +508,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if (Objects.isNull(currentVertex)) throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID); - boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); + boolean isDataset = validateAndGetEntityTypeMap(currentGUID).get(IS_DATASET); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); continue; diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java index 829b9aaf28..4f8aeb8468 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java @@ -93,7 +93,7 @@ public LineageREST(AtlasTypeRegistry typeRegistry, AtlasLineageService atlasLine @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) @Timed - public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid, + public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid,@QueryParam("lineageType") String lineageType, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException { if (!AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean()) { LOG.warn("LineageREST: "+ AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED.getFormattedErrorMessage(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName())); @@ -110,7 +110,7 @@ public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid, perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageREST.getOnDemandLineageGraph(" + guid + "," + lineageOnDemandRequest + ")"); } - return atlasLineageService.getAtlasLineageInfo(guid, lineageOnDemandRequest); + return atlasLineageService.getAtlasLineageInfo(guid, lineageOnDemandRequest, lineageType); } finally { AtlasPerfTracer.log(perf); }