diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java index e717fda066..4897c76767 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java @@ -26,6 +26,9 @@ public class LineageListRequest { private Boolean excludeMeanings; private Boolean excludeClassifications; + + private String lineageType = "DatasetProcessLineage"; + public enum LineageDirection {INPUT, OUTPUT} public LineageListRequest() { @@ -81,6 +84,13 @@ public Integer getDepth() { public void setDepth(Integer depth) { this.depth = depth; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } public LineageDirection getDirection() { return direction; diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java index 7fa953373a..772e3dda9c 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java @@ -22,6 +22,7 @@ public class LineageOnDemandRequest { private Set attributes; private Set relationAttributes; private LineageOnDemandBaseParams defaultParams; + private String lineageType = "DatasetProcessLineage"; public LineageOnDemandRequest() { this.attributes = new HashSet<>(); @@ -64,6 +65,13 @@ public void setRelationshipTraversalFilters(SearchParameters.FilterCriteria rela this.relationshipTraversalFilters = relationshipTraversalFilters; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } public Set getAttributes() { return attributes; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java index 95657aaabe..600b496293 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java @@ -23,6 +23,7 @@ public final class AtlasLineageListContext { private int currentEntityCounter; private boolean depthLimitReached; private boolean hasMoreUpdated; + private String lineageType = "DatasetProcessLineage"; public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) { this.guid = lineageListRequest.getGuid(); @@ -34,6 +35,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR this.vertexTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getEntityTraversalFilters()); this.edgeTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getRelationshipTraversalFilters()); this.attributes = lineageListRequest.getAttributes(); + this.lineageType = lineageListRequest.getLineageType(); this.relationAttributes = lineageListRequest.getRelationAttributes(); } @@ -129,6 +131,14 @@ public void setCurrentFromCounter(int currentFromCounter) { this.currentFromCounter = currentFromCounter; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } + public int getCurrentEntityCounter() { return currentEntityCounter; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java index 5509684855..71d877fb22 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java @@ -24,6 +24,9 @@ public class AtlasLineageOnDemandContext { private Set relationAttributes; private LineageOnDemandBaseParams defaultParams; + + private String lineageType = "DatasetProcessLineage"; + public AtlasLineageOnDemandContext(LineageOnDemandRequest lineageOnDemandRequest, AtlasTypeRegistry typeRegistry) { this.constraints = lineageOnDemandRequest.getConstraints(); this.attributes = lineageOnDemandRequest.getAttributes(); @@ -31,6 +34,7 @@ public AtlasLineageOnDemandContext(LineageOnDemandRequest lineageOnDemandRequest this.defaultParams = lineageOnDemandRequest.getDefaultParams(); this.vertexPredicate = constructInMemoryPredicate(typeRegistry, lineageOnDemandRequest.getEntityTraversalFilters()); this.edgePredicate = constructInMemoryPredicate(typeRegistry, lineageOnDemandRequest.getRelationshipTraversalFilters()); + this.lineageType = lineageOnDemandRequest.getLineageType(); } public Map getConstraints() { @@ -56,6 +60,13 @@ public Predicate getEdgePredicate() { public void setEdgePredicate(Predicate edgePredicate) { this.edgePredicate = edgePredicate; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } public Set getAttributes() { return attributes; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 7e83794113..5a876f0383 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -69,11 +69,10 @@ import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE; import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE; -import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED; +import static org.apache.atlas.AtlasErrorCode.*; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*; -import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; -import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.*; import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN; import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.OUT; @@ -85,11 +84,25 @@ public class EntityLineageService implements AtlasLineageService { private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; + private static final String OUTPUT_PORT_EDGE = "__Asset.outputPortDataProducts"; + private static final String INPUT_PORT_EDGE = "__Asset.inputPortDataProducts"; + + /** + * String[] => [Input edge Label, Output Edge Label] + */ + public static final HashMap LINEAGE_MAP = new HashMap(){{ + put(DATASET_PROCESS_LINEAGE, new String[]{PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE}); + put(PRODUCT_ASSET_LINEAGE, new String[]{OUTPUT_PORT_EDGE, INPUT_PORT_EDGE}); + }}; private static final String COLUMNS = "columns"; private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000; private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3; private static final String SEPARATOR = "->"; + public static final String IS_DATA_PRODUCT = "isDataProduct"; + public static final String IS_DATASET = "isDataset"; + public static final String PRODUCT_ASSET_LINEAGE = "ProductAssetLineage"; + public static final String DATASET_PROCESS_LINEAGE = "DatasetProcessLineage"; private final AtlasGraph graph; private final AtlasGremlinQueryProvider gremlinQueryProvider; @@ -173,11 +186,13 @@ public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direct @GraphTransaction public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getAtlasLineageInfo"); - RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes()); + RequestContext.get().setLineageInputLabel(LINEAGE_MAP.get(lineageOnDemandRequest.getLineageType())[0]); + RequestContext.get().setLineageOutputLabel(LINEAGE_MAP.get(lineageOnDemandRequest.getLineageType())[1]); + AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry); - boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid); - AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet); + HashMap dataTypeMap = validateAndGetEntityTypeMap(guid); + AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, dataTypeMap); appendLineageOnDemandPayload(ret, lineageOnDemandRequest); // filtering out on-demand relations which has input & output nodes within the limit cleanupRelationsOnDemand(ret); @@ -191,7 +206,8 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand @GraphTransaction public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageListInfoOnDemand"); - + RequestContext.get().setLineageInputLabel(LINEAGE_MAP.get(lineageListRequest.getLineageType())[0]); + RequestContext.get().setLineageOutputLabel(LINEAGE_MAP.get(lineageListRequest.getLineageType())[1]); AtlasLineageListInfo ret = new AtlasLineageListInfo(new ArrayList<>()); RequestContext.get().setRelationAttrsForSearch(lineageListRequest.getRelationAttributes()); @@ -202,20 +218,24 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR return ret; } - private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { + private HashMap validateAndGetEntityTypeMap(String guid) throws AtlasBaseException { String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); if (entityType == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName); } + HashMap dataTypeMap = new HashMap<>(); boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); + boolean isDataProduct = entityType.getTypeName().equals(DATA_PRODUCT_ENTITY_TYPE); + dataTypeMap.put(IS_DATA_PRODUCT, isDataProduct); + dataTypeMap.put(IS_DATASET, !isProcess); if (!isProcess) { boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); if (!isDataSet) { throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); } } - return !isProcess; + return dataTypeMap; } private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) { @@ -280,8 +300,11 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { } } - private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException { + private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, HashMap dataTypeMap) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + String lineageType = atlasLineageOnDemandContext.getLineageType(); LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection(); @@ -297,27 +320,35 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); - if (isDataSet) { - AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); - if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); - if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); - AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); - setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); - ret.getGuidEntityMap().put(guid, baseEntityHeader); - } else { - AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); - // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' - if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); - } - if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); + + boolean isConnecterVertex; + + + isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) + ? !dataTypeMap.get(IS_DATA_PRODUCT) + : !dataTypeMap.get(IS_DATASET); + + if (!isConnecterVertex) { + AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); + if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); + if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); + AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); + setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); + ret.getGuidEntityMap().put(guid, baseEntityHeader); + } else { + AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); + // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' + if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageInputLabel).iterator(); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); + } + if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageOutputLabel).iterator(); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); + } } - } RequestContext.get().endMetricRecord(metricRecorder); return ret; } @@ -330,8 +361,8 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); int nextLevel = isInput ? level - 1: level + 1; - while (processEdges.hasNext()) { AtlasEdge processEdge = processEdges.next(); AtlasVertex datasetVertex = processEdge.getInVertex(); @@ -344,7 +375,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI continue; } - boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(lineageInputLabel); if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, new HashSet<>())) { break; } else { @@ -369,19 +400,21 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i if (depth != 0) { // base condition of recursion for depth AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand"); AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); int nextLevel = isInput ? level - 1: level + 1; // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); - AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + + Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? lineageOutputLabel : lineageInputLabel).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); while (incomingEdges.hasNext()) { AtlasEdge incomingEdge = incomingEdges.next(); - AtlasVertex processVertex = incomingEdge.getOutVertex(); + AtlasVertex connecterVertex = incomingEdge.getOutVertex(); - if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { + if (!vertexMatchesEvaluation(connecterVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { continue; } @@ -402,7 +435,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + Iterator outgoingEdges = connecterVertex.getEdges(OUT, isInput ? lineageInputLabel : lineageOutputLabel).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { @@ -413,11 +446,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { + if (checkForOffset(outgoingEdge, connecterVertex, atlasLineageOnDemandContext, ret)) { continue; } if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { - String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); + String processGuid = AtlasGraphUtilsV2.getIdFromVertex(connecterVertex); LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); if (entityOnDemandInfo == null) continue; @@ -453,23 +486,27 @@ private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDem private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesUsingBFS"); - + String lineageType = lineageListContext.getLineageType(); Set visitedVertices = new HashSet<>(); visitedVertices.add(baseGuid); Set skippedVertices = new HashSet<>(); Queue traversalQueue = new LinkedList<>(); AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); - boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); - enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + HashMap dataTypeMap = validateAndGetEntityTypeMap(baseGuid); + + boolean isNotConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) + ? dataTypeMap.get(IS_DATA_PRODUCT) + : dataTypeMap.get(IS_DATASET); + enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); int currentDepth = 0; - int currentLevel = isBaseNodeDataset? 0: 1; + int currentLevel = isNotConnecterVertex? 0: 1; while (!traversalQueue.isEmpty() && !lineageListContext.isEntityLimitReached() && currentDepth < lineageListContext.getDepth()) { currentDepth++; // update level at every alternate depth - if ((isBaseNodeDataset && currentDepth % 2 != 0) || (!isBaseNodeDataset && currentDepth % 2 == 0)) + if ((isNotConnecterVertex && currentDepth % 2 != 0) || (!isNotConnecterVertex && currentDepth % 2 == 0)) currentLevel++; int entitiesInCurrentDepth = traversalQueue.size(); @@ -482,20 +519,20 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if (Objects.isNull(currentVertex)) throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID); - boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); + HashMap currentEntityDataTypeMap = validateAndGetEntityTypeMap(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); continue; } lineageListContext.incrementEntityCount(); appendToResult(currentVertex, lineageListContext, ret, currentLevel); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) { ret.setHasMore(false); lineageListContext.setHasMoreUpdated(true); @@ -509,14 +546,20 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line RequestContext.get().endMetricRecord(metricRecorder); } - private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext, + private void enqueueNeighbours(AtlasVertex currentVertex, HashMap dataTypeMap, AtlasLineageListContext lineageListContext, Queue traversalQueue, Set visitedVertices, Set skippedVertices) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; - if (isDataset) - edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + String lineageType = lineageListContext.getLineageType(); + boolean isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) + ? !dataTypeMap.get(IS_DATA_PRODUCT) + : !dataTypeMap.get(IS_DATASET); + if (!isConnecterVertex) + edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? lineageOutputLabel : lineageInputLabel).iterator(); else - edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? lineageInputLabel : lineageOutputLabel).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); while (edges.hasNext()) { @@ -524,7 +567,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl if (!lineageListContext.evaluateTraversalFilter(currentEdge)) continue; AtlasVertex neighbourVertex; - if (isDataset) + if (!isConnecterVertex) neighbourVertex = currentEdge.getOutVertex(); else neighbourVertex = currentEdge.getInVertex(); @@ -652,19 +695,21 @@ else if (!isInput && ! isInVertexVisited) } private void setHasDownstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex inVertex, LineageInfoOnDemand inLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(inVertex, IN, PROCESS_INPUTS_EDGE, atlasLineageOnDemandContext); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + List filteredEdges = getFilteredAtlasEdges(inVertex, IN, lineageInputLabel, atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { inLineageInfo.setHasDownstream(true); - inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); } + inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); } private void setHasUpstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex outVertex, LineageInfoOnDemand outLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(outVertex, IN, PROCESS_OUTPUTS_EDGE, atlasLineageOnDemandContext); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + List filteredEdges = getFilteredAtlasEdges(outVertex, IN, lineageOutputLabel, atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { outLineageInfo.setHasUpstream(true); - outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); } + outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); } private List getFilteredAtlasEdges(AtlasVertex outVertex, AtlasEdgeDirection direction, String processEdgeLabel, AtlasLineageOnDemandContext atlasLineageOnDemandContext) { @@ -819,7 +864,8 @@ private AtlasLineageInfo getLineageInfo(AtlasLineageContext lineageContext, Line private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getLineageInfoV2"); - + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); int depth = lineageContext.getDepth(); String guid = lineageContext.getGuid(); LineageDirection direction = lineageContext.getDirection(); @@ -846,7 +892,7 @@ private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) th // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == INPUT || direction == BOTH) { - Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, PROCESS_INPUTS_EDGE).iterator(); + Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, lineageInputLabel).iterator(); List qualifyingEdges = getQualifyingProcessEdges(processEdges, lineageContext); ret.setHasChildrenForDirection(getGuid(processVertex), new LineageChildrenInfo(INPUT, hasMoreChildren(qualifyingEdges))); @@ -861,7 +907,7 @@ private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) th } if (direction == OUTPUT || direction == BOTH) { - Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, PROCESS_OUTPUTS_EDGE).iterator(); + Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, lineageOutputLabel).iterator(); List qualifyingEdges = getQualifyingProcessEdges(processEdges, lineageContext); ret.setHasChildrenForDirection(getGuid(processVertex), new LineageChildrenInfo(OUTPUT, hasMoreChildren(qualifyingEdges))); @@ -911,12 +957,13 @@ private int getLineageMaxNodeAllowedCount() { } private String getEdgeLabel(AtlasEdge edge) { + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(lineageInputLabel); if (isLineageOnDemandEnabled()) { return getEdgeLabelFromGuids(isInputEdge, inGuid, outGuid); @@ -1087,10 +1134,11 @@ private List> getUnvisitedProcessEdgesWithOutputVertexId lineageContext.getIgnoredProcesses().contains(processVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class))) { return Collections.emptyList(); } - + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); List> unvisitedProcessEdgesWithOutputVertexIds = new ArrayList<>(); - Iterable outgoingEdges = vertexEdgeCache.getEdges(processVertex, OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE); + Iterable outgoingEdges = vertexEdgeCache.getEdges(processVertex, OUT, isInput ? lineageInputLabel : lineageOutputLabel); for (AtlasEdge outgoingEdge : outgoingEdges) { AtlasVertex outputVertex = outgoingEdge.getInVertex(); @@ -1177,7 +1225,9 @@ private void addLimitlessVerticesToResult(boolean isInput, int depth, Set processEdges = vertexEdgeCache.getEdges(currentVertex, IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + List processEdges = vertexEdgeCache.getEdges(currentVertex, IN, isInput ? lineageOutputLabel : lineageInputLabel); // Filter lineages based on ignored process types processEdges = CollectionUtils.isNotEmpty(lineageContext.getIgnoredProcesses()) ? @@ -1198,8 +1248,10 @@ private void processLastLevel(AtlasVertex currentVertex, boolean isInput, AtlasL private boolean childHasOnlySelfCycle(AtlasVertex processVertex, AtlasVertex currentVertex, boolean isInput) { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("childHasSelfCycle"); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); Iterator processEdgeIterator; - processEdgeIterator = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + processEdgeIterator = processVertex.getEdges(OUT, isInput ? lineageInputLabel : lineageOutputLabel).iterator(); Set processOutputEdges = new HashSet<>(); while (processEdgeIterator.hasNext()) { processOutputEdges.add(processEdgeIterator.next()); @@ -1215,8 +1267,9 @@ private List getEdgesOfProcess(boolean isInput, AtlasLineageContext l lineageContext.getIgnoredProcesses().contains(processVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class))) { return Collections.emptyList(); } - - return vertexEdgeCache.getEdges(processVertex, OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE) + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + return vertexEdgeCache.getEdges(processVertex, OUT, isInput ? lineageInputLabel : lineageOutputLabel) .stream() .filter(edge -> shouldProcessEdge(lineageContext, edge) && vertexMatchesEvaluation(edge.getInVertex(), lineageContext)) .sorted(Comparator.comparing(edge -> edge.getProperty("_r__guid", String.class))) @@ -1241,8 +1294,10 @@ private boolean shouldProcessEdge(AtlasLineageContext lineageContext, AtlasEdge } private List getEdgesOfCurrentVertex(AtlasVertex currentVertex, boolean isInput, AtlasLineageContext lineageContext) { + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); return vertexEdgeCache - .getEdges(currentVertex, IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE) + .getEdges(currentVertex, IN, isInput ? lineageOutputLabel : lineageInputLabel) .stream() .sorted(Comparator.comparing(edge -> edge.getProperty("_r__guid", String.class))) .filter(edge -> shouldProcessEdge(lineageContext, edge)) @@ -1331,7 +1386,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge AtlasLineageContext lineageContext) throws AtlasBaseException { final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); - + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = incomingEdge.getInVertex(); AtlasVertex outVertex = outgoingEdge.getInVertex(); AtlasVertex processVertex = outgoingEdge.getOutVertex(); @@ -1339,7 +1394,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); String relationGuid = null; - boolean isInputEdge = incomingEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = incomingEdge.getLabel().equalsIgnoreCase(lineageInputLabel); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex, lineageContext.getAttributes()); @@ -1370,7 +1425,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, AtlasLineageInfo lineageInfo, AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("processEdges"); - + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); @@ -1398,14 +1453,14 @@ private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, } String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(incomingEdge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - if (incomingEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE)) { + if (incomingEdge.getLabel().equalsIgnoreCase(lineageInputLabel)) { relations.add(new LineageRelation(leftGuid, processGuid, relationGuid)); } else { relations.add(new LineageRelation(processGuid, leftGuid, relationGuid)); } relationGuid = AtlasGraphUtilsV2.getEncodedProperty(outgoingEdge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - if (outgoingEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE)) { + if (outgoingEdge.getLabel().equalsIgnoreCase(lineageInputLabel)) { relations.add(new LineageRelation(rightGuid, processGuid, relationGuid)); } else { relations.add(new LineageRelation(processGuid, rightGuid, relationGuid)); @@ -1419,13 +1474,13 @@ private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); - + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(lineageInputLabel); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes()); @@ -1448,12 +1503,13 @@ private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, AtlasLineageContext lineageContext) throws AtlasBaseException { //Backward compatibility method + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(lineageInputLabel); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes()); @@ -1478,9 +1534,10 @@ private void processEdge(final AtlasEdge edge, final AtlasLineageOnDemandInfo li private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int level, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processEdge"); + AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); - + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); String inTypeName = AtlasGraphUtilsV2.getTypeName(inVertex); AtlasEntityType inEntityType = atlasTypeRegistry.getEntityTypeByName(inTypeName); if (inEntityType == null) { @@ -1491,7 +1548,7 @@ private void processEdge(final AtlasEdge edge, final Map> getRemovedElementsMap() { return removedElementsMap; } + public String getLineageInputLabel() { + return lineageInputLabel; + } + + public void setLineageInputLabel(String lineageInputLabel) { + this.lineageInputLabel = lineageInputLabel; + } + + public String getLineageOutputLabel() { + return lineageOutputLabel; + } + + public void setLineageOutputLabel(String lineageOutputLabel) { + this.lineageOutputLabel = lineageOutputLabel; + } public Map> getNewElementsCreatedMap() { return newElementsCreatedMap; } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java index 829b9aaf28..fb979c5f85 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java @@ -48,6 +48,7 @@ import javax.ws.rs.core.MediaType; import java.util.HashMap; import java.util.Map; +import java.util.Objects; /** * REST interface for an entity's lineage information @@ -109,7 +110,6 @@ public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid, if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageREST.getOnDemandLineageGraph(" + guid + "," + lineageOnDemandRequest + ")"); } - return atlasLineageService.getAtlasLineageInfo(guid, lineageOnDemandRequest); } finally { AtlasPerfTracer.log(perf);