From d32bb9f51514340bb89baf08b94e12c66f83008d Mon Sep 17 00:00:00 2001 From: Hritik Rai <162408549+hr2904@users.noreply.github.com> Date: Tue, 23 Jul 2024 14:24:44 +0530 Subject: [PATCH] Revert "DG-1720 Added Lineage support for DataProducts <> Assets" --- .../atlas/discovery/AtlasLineageService.java | 2 +- .../atlas/discovery/EntityLineageService.java | 109 +++++++----------- .../apache/atlas/web/rest/LineageREST.java | 4 +- 3 files changed, 43 insertions(+), 72 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java index 3842a9b10f..69c3e387bc 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java @@ -71,7 +71,7 @@ public interface AtlasLineageService { * @param lineageOnDemandRequest lineage on demand request object * @return AtlasLineageInfo */ - AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException; + AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException; /** * @param entityGuid unique ID of the entity diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index f09d8ecc50..976a254546 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -69,7 +69,7 @@ import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE; import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE; -import static org.apache.atlas.AtlasErrorCode.*; +import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*; import static org.apache.atlas.repository.Constants.*; @@ -84,25 +84,11 @@ public class EntityLineageService implements AtlasLineageService { private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; - private static final String OUTPUT_PORT_EDGE = "__Asset.outputPortDataProducts"; - private static final String INPUT_PORT_EDGE = "__Asset.inputPortDataProducts"; - - /** - * String[] => [Input edge Label, Output Edge Label] - */ - public static final HashMap LINEAGE_MAP = new HashMap(){{ - put(DATASET_PROCESS_LINEAGE, new String[]{PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE}); - put(PRODUCT_ASSET_LINEAGE, new String[]{INPUT_PORT_EDGE, OUTPUT_PORT_EDGE}); - }}; private static final String COLUMNS = "columns"; private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000; private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3; private static final String SEPARATOR = "->"; - public static final String IS_DATA_PRODUCT = "isDataProduct"; - public static final String IS_DATASET = "isProcess"; - public static final String PRODUCT_ASSET_LINEAGE = "ProductAssetLineage"; - public static final String DATASET_PROCESS_LINEAGE = "DatasetProcessLineage"; private final AtlasGraph graph; private final AtlasGremlinQueryProvider gremlinQueryProvider; @@ -184,13 +170,13 @@ public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direct @Override @GraphTransaction - public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException { + public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getAtlasLineageInfo"); RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes()); AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry); - HashMap dataTypeMap = validateAndGetEntityTypeMap(guid); - AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, dataTypeMap, lineageType); + boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid); + AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet); appendLineageOnDemandPayload(ret, lineageOnDemandRequest); // filtering out on-demand relations which has input & output nodes within the limit cleanupRelationsOnDemand(ret); @@ -213,24 +199,20 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR return ret; } - private HashMap validateAndGetEntityTypeMap(String guid) throws AtlasBaseException { + private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); if (entityType == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName); } - HashMap dataTypeMap = new HashMap<>(); boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); - boolean isDataProduct = entityType.getTypeName().equals(DATA_PRODUCT_ENTITY_TYPE); - dataTypeMap.put(IS_DATA_PRODUCT, isDataProduct); - dataTypeMap.put(IS_DATASET, !isProcess); if (!isProcess) { boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); if (!isDataSet) { throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); } } - return dataTypeMap; + return !isProcess; } private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) { @@ -295,10 +277,8 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { } } - private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, HashMap dataTypeMap, String lineageType) throws AtlasBaseException { + private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); - if(StringUtils.isEmpty(lineageType)) - lineageType = DATASET_PROCESS_LINEAGE; LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection(); @@ -314,35 +294,27 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); - String[] lineageEdgeLabels = LINEAGE_MAP.get(lineageType); - boolean isConnecterVertex; - - - isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) - ? !dataTypeMap.get(IS_DATA_PRODUCT) - : !dataTypeMap.get(IS_DATASET); - - if (!isConnecterVertex) { - AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); - if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, lineageType); - if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, lineageType); - AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); - setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); - ret.getGuidEntityMap().put(guid, baseEntityHeader); - } else { - AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); - // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' - if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[0]).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, lineageType); - } - if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[1]).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, lineageType); - } + if (isDataSet) { + AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); + if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); + if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); + AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); + setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); + ret.getGuidEntityMap().put(guid, baseEntityHeader); + } else { + AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); + // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' + if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); + } + if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); } + } RequestContext.get().endMetricRecord(metricRecorder); return ret; } @@ -353,7 +325,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal baseEntityHeader.setFinishTime(traversalOrder.get()); } - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; int nextLevel = isInput ? level - 1: level + 1; @@ -384,11 +356,11 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; if (depth != 0) { // base condition of recursion for depth @@ -397,17 +369,16 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i int nextLevel = isInput ? level - 1: level + 1; // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); - String[] edgeDirections = LINEAGE_MAP.get(lineageType); - AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? edgeDirections[1] : edgeDirections[0]).iterator(); + AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); + Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); while (incomingEdges.hasNext()) { AtlasEdge incomingEdge = incomingEdges.next(); - AtlasVertex connecterVertex = incomingEdge.getOutVertex(); + AtlasVertex processVertex = incomingEdge.getOutVertex(); - if (!vertexMatchesEvaluation(connecterVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { + if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { continue; } @@ -428,7 +399,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = connecterVertex.getEdges(OUT, isInput ? edgeDirections[0] : edgeDirections[1]).iterator(); + Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { @@ -439,11 +410,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (checkForOffset(outgoingEdge, connecterVertex, atlasLineageOnDemandContext, ret)) { + if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { continue; } if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { - String processGuid = AtlasGraphUtilsV2.getIdFromVertex(connecterVertex); + String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); if (entityOnDemandInfo == null) continue; @@ -459,7 +430,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i setEntityLimitReachedFlag(isInput, ret); } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType); // execute inner depth + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); traversedEntity.setFinishTime(traversalOrder.get()); } @@ -486,7 +457,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line Queue traversalQueue = new LinkedList<>(); AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); - boolean isBaseNodeDataset = validateAndGetEntityTypeMap(baseGuid).get(IS_DATASET); + boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); int currentDepth = 0; int currentLevel = isBaseNodeDataset? 0: 1; @@ -508,7 +479,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if (Objects.isNull(currentVertex)) throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID); - boolean isDataset = validateAndGetEntityTypeMap(currentGUID).get(IS_DATASET); + boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices); continue; diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java index 4f8aeb8468..829b9aaf28 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java @@ -93,7 +93,7 @@ public LineageREST(AtlasTypeRegistry typeRegistry, AtlasLineageService atlasLine @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) @Timed - public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid,@QueryParam("lineageType") String lineageType, + public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException { if (!AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean()) { LOG.warn("LineageREST: "+ AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED.getFormattedErrorMessage(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName())); @@ -110,7 +110,7 @@ public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid,@ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageREST.getOnDemandLineageGraph(" + guid + "," + lineageOnDemandRequest + ")"); } - return atlasLineageService.getAtlasLineageInfo(guid, lineageOnDemandRequest, lineageType); + return atlasLineageService.getAtlasLineageInfo(guid, lineageOnDemandRequest); } finally { AtlasPerfTracer.log(perf); }