From b040cc4ec142172e3f970837cb1bc14b7a8e4752 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Wed, 24 Jul 2024 20:18:09 +0530 Subject: [PATCH] Improved Code visuals etc. --- .../model/lineage/LineageListRequest.java | 10 ++ .../model/lineage/LineageOnDemandRequest.java | 8 ++ .../discovery/AtlasLineageListContext.java | 10 ++ .../AtlasLineageOnDemandContext.java | 11 ++ .../atlas/discovery/EntityLineageService.java | 127 ++++++++++-------- .../java/org/apache/atlas/RequestContext.java | 26 ++-- .../apache/atlas/web/rest/LineageREST.java | 11 +- 7 files changed, 128 insertions(+), 75 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java index e717fda066..4897c76767 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java @@ -26,6 +26,9 @@ public class LineageListRequest { private Boolean excludeMeanings; private Boolean excludeClassifications; + + private String lineageType = "DatasetProcessLineage"; + public enum LineageDirection {INPUT, OUTPUT} public LineageListRequest() { @@ -81,6 +84,13 @@ public Integer getDepth() { public void setDepth(Integer depth) { this.depth = depth; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } public LineageDirection getDirection() { return direction; diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java index 7fa953373a..772e3dda9c 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java @@ -22,6 +22,7 @@ public class LineageOnDemandRequest { private Set attributes; private Set relationAttributes; private LineageOnDemandBaseParams defaultParams; + private String lineageType = "DatasetProcessLineage"; public LineageOnDemandRequest() { this.attributes = new HashSet<>(); @@ -64,6 +65,13 @@ public void setRelationshipTraversalFilters(SearchParameters.FilterCriteria rela this.relationshipTraversalFilters = relationshipTraversalFilters; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } public Set getAttributes() { return attributes; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java index 95657aaabe..600b496293 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java @@ -23,6 +23,7 @@ public final class AtlasLineageListContext { private int currentEntityCounter; private boolean depthLimitReached; private boolean hasMoreUpdated; + private String lineageType = "DatasetProcessLineage"; public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) { this.guid = lineageListRequest.getGuid(); @@ -34,6 +35,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR this.vertexTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getEntityTraversalFilters()); this.edgeTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getRelationshipTraversalFilters()); this.attributes = lineageListRequest.getAttributes(); + this.lineageType = lineageListRequest.getLineageType(); this.relationAttributes = lineageListRequest.getRelationAttributes(); } @@ -129,6 +131,14 @@ public void setCurrentFromCounter(int currentFromCounter) { this.currentFromCounter = currentFromCounter; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } + public int getCurrentEntityCounter() { return currentEntityCounter; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java index 5509684855..71d877fb22 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java @@ -24,6 +24,9 @@ public class AtlasLineageOnDemandContext { private Set relationAttributes; private LineageOnDemandBaseParams defaultParams; + + private String lineageType = "DatasetProcessLineage"; + public AtlasLineageOnDemandContext(LineageOnDemandRequest lineageOnDemandRequest, AtlasTypeRegistry typeRegistry) { this.constraints = lineageOnDemandRequest.getConstraints(); this.attributes = lineageOnDemandRequest.getAttributes(); @@ -31,6 +34,7 @@ public AtlasLineageOnDemandContext(LineageOnDemandRequest lineageOnDemandRequest this.defaultParams = lineageOnDemandRequest.getDefaultParams(); this.vertexPredicate = constructInMemoryPredicate(typeRegistry, lineageOnDemandRequest.getEntityTraversalFilters()); this.edgePredicate = constructInMemoryPredicate(typeRegistry, lineageOnDemandRequest.getRelationshipTraversalFilters()); + this.lineageType = lineageOnDemandRequest.getLineageType(); } public Map getConstraints() { @@ -56,6 +60,13 @@ public Predicate getEdgePredicate() { public void setEdgePredicate(Predicate edgePredicate) { this.edgePredicate = edgePredicate; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } public Set getAttributes() { return attributes; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index d9f04272e8..36cbe8760c 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -186,8 +186,10 @@ public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direct @GraphTransaction public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getAtlasLineageInfo"); - RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes()); + RequestContext.get().setLineageInputLabel(LINEAGE_MAP.get(lineageOnDemandRequest.getLineageType())[0]); + RequestContext.get().setLineageOutputLabel(LINEAGE_MAP.get(lineageOnDemandRequest.getLineageType())[1]); + AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry); HashMap dataTypeMap = validateAndGetEntityTypeMap(guid); AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, dataTypeMap); @@ -204,7 +206,8 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand @GraphTransaction public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageListInfoOnDemand"); - + RequestContext.get().setLineageInputLabel(LINEAGE_MAP.get(lineageListRequest.getLineageType())[0]); + RequestContext.get().setLineageOutputLabel(LINEAGE_MAP.get(lineageListRequest.getLineageType())[1]); AtlasLineageListInfo ret = new AtlasLineageListInfo(new ArrayList<>()); RequestContext.get().setRelationAttrsForSearch(lineageListRequest.getRelationAttributes()); @@ -299,9 +302,9 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, HashMap dataTypeMap) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); - String lineageType = RequestContext.get().getLineageType(); - if(StringUtils.isEmpty(lineageType)) - lineageType = DATASET_PROCESS_LINEAGE; + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + String lineageType = atlasLineageOnDemandContext.getLineageType(); LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection(); @@ -317,7 +320,7 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); - String[] lineageEdgeLabels = LINEAGE_MAP.get(lineageType); + boolean isConnecterVertex; @@ -338,11 +341,11 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[0]).iterator(); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageInputLabel).iterator(); traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[1]).iterator(); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageOutputLabel).iterator(); traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); } } @@ -358,8 +361,8 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); int nextLevel = isInput ? level - 1: level + 1; - String lineageType = RequestContext.get().getLineageType(); while (processEdges.hasNext()) { AtlasEdge processEdge = processEdges.next(); AtlasVertex datasetVertex = processEdge.getInVertex(); @@ -372,7 +375,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI continue; } - boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); + boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(lineageInputLabel); if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, new HashSet<>())) { break; } else { @@ -394,17 +397,17 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; - String lineageType = RequestContext.get().getLineageType(); if (depth != 0) { // base condition of recursion for depth AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand"); AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); int nextLevel = isInput ? level - 1: level + 1; // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); - String[] edgeDirections = LINEAGE_MAP.get(lineageType); AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? edgeDirections[1] : edgeDirections[0]).iterator(); + Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? lineageOutputLabel : lineageInputLabel).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); while (incomingEdges.hasNext()) { @@ -432,7 +435,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = connecterVertex.getEdges(OUT, isInput ? edgeDirections[0] : edgeDirections[1]).iterator(); + Iterator outgoingEdges = connecterVertex.getEdges(OUT, isInput ? lineageInputLabel : lineageOutputLabel).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { @@ -483,7 +486,7 @@ private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDem private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesUsingBFS"); - String lineageType = RequestContext.get().getLineageType(); + String lineageType = lineageListContext.getLineageType(); Set visitedVertices = new HashSet<>(); visitedVertices.add(baseGuid); Set skippedVertices = new HashSet<>(); @@ -491,9 +494,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); HashMap dataTypeMap = validateAndGetEntityTypeMap(baseGuid); - if (StringUtils.isEmpty(lineageType)){ - lineageType = DATASET_PROCESS_LINEAGE; - } + boolean isNotConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) ? dataTypeMap.get(IS_DATA_PRODUCT) : dataTypeMap.get(IS_DATASET); @@ -549,15 +550,16 @@ private void enqueueNeighbours(AtlasVertex currentVertex, HashMap traversalQueue, Set visitedVertices, Set skippedVertices) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; - String lineageType = RequestContext.get().getLineageType(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + String lineageType = lineageListContext.getLineageType(); boolean isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) ? !dataTypeMap.get(IS_DATA_PRODUCT) : !dataTypeMap.get(IS_DATASET); - String[] edgeDirectionLabels = LINEAGE_MAP.get(lineageType); if (!isConnecterVertex) - edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? edgeDirectionLabels[1] : edgeDirectionLabels[0]).iterator(); + edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? lineageOutputLabel : lineageInputLabel).iterator(); else - edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? edgeDirectionLabels[0] : edgeDirectionLabels[1]).iterator(); + edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? lineageInputLabel : lineageOutputLabel).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); while (edges.hasNext()) { @@ -693,8 +695,8 @@ else if (!isInput && ! isInVertexVisited) } private void setHasDownstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex inVertex, LineageInfoOnDemand inLineageInfo) { - String lineageType = RequestContext.get().getLineageType(); - List filteredEdges = getFilteredAtlasEdges(inVertex, IN, LINEAGE_MAP.get(lineageType)[0], atlasLineageOnDemandContext); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + List filteredEdges = getFilteredAtlasEdges(inVertex, IN, lineageInputLabel, atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { inLineageInfo.setHasDownstream(true); } @@ -702,8 +704,8 @@ private void setHasDownstream(AtlasLineageOnDemandContext atlasLineageOnDemandCo } private void setHasUpstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex outVertex, LineageInfoOnDemand outLineageInfo) { - String lineageType = RequestContext.get().getLineageType(); - List filteredEdges = getFilteredAtlasEdges(outVertex, IN, LINEAGE_MAP.get(lineageType)[1], atlasLineageOnDemandContext); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + List filteredEdges = getFilteredAtlasEdges(outVertex, IN, lineageOutputLabel, atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { outLineageInfo.setHasUpstream(true); } @@ -862,7 +864,8 @@ private AtlasLineageInfo getLineageInfo(AtlasLineageContext lineageContext, Line private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getLineageInfoV2"); - String lineageType = RequestContext.get().getLineageType(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); int depth = lineageContext.getDepth(); String guid = lineageContext.getGuid(); LineageDirection direction = lineageContext.getDirection(); @@ -889,7 +892,7 @@ private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) th // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == INPUT || direction == BOTH) { - Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, LINEAGE_MAP.get(lineageType)[0]).iterator(); + Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, lineageInputLabel).iterator(); List qualifyingEdges = getQualifyingProcessEdges(processEdges, lineageContext); ret.setHasChildrenForDirection(getGuid(processVertex), new LineageChildrenInfo(INPUT, hasMoreChildren(qualifyingEdges))); @@ -904,7 +907,7 @@ private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) th } if (direction == OUTPUT || direction == BOTH) { - Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, LINEAGE_MAP.get(lineageType)[1]).iterator(); + Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, lineageOutputLabel).iterator(); List qualifyingEdges = getQualifyingProcessEdges(processEdges, lineageContext); ret.setHasChildrenForDirection(getGuid(processVertex), new LineageChildrenInfo(OUTPUT, hasMoreChildren(qualifyingEdges))); @@ -954,13 +957,13 @@ private int getLineageMaxNodeAllowedCount() { } private String getEdgeLabel(AtlasEdge edge) { - String lineageType = RequestContext.get().getLineageType(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(lineageInputLabel); if (isLineageOnDemandEnabled()) { return getEdgeLabelFromGuids(isInputEdge, inGuid, outGuid); @@ -1131,10 +1134,11 @@ private List> getUnvisitedProcessEdgesWithOutputVertexId lineageContext.getIgnoredProcesses().contains(processVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class))) { return Collections.emptyList(); } - String lineageType = RequestContext.get().getLineageType(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); List> unvisitedProcessEdgesWithOutputVertexIds = new ArrayList<>(); - Iterable outgoingEdges = vertexEdgeCache.getEdges(processVertex, OUT, isInput ? LINEAGE_MAP.get(lineageType)[0] : LINEAGE_MAP.get(lineageType)[1]); + Iterable outgoingEdges = vertexEdgeCache.getEdges(processVertex, OUT, isInput ? lineageInputLabel : lineageOutputLabel); for (AtlasEdge outgoingEdge : outgoingEdges) { AtlasVertex outputVertex = outgoingEdge.getInVertex(); @@ -1221,8 +1225,9 @@ private void addLimitlessVerticesToResult(boolean isInput, int depth, Set processEdges = vertexEdgeCache.getEdges(currentVertex, IN, isInput ? LINEAGE_MAP.get(lineageType)[1] : LINEAGE_MAP.get(lineageType)[0]); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + List processEdges = vertexEdgeCache.getEdges(currentVertex, IN, isInput ? lineageOutputLabel : lineageInputLabel); // Filter lineages based on ignored process types processEdges = CollectionUtils.isNotEmpty(lineageContext.getIgnoredProcesses()) ? @@ -1243,9 +1248,10 @@ private void processLastLevel(AtlasVertex currentVertex, boolean isInput, AtlasL private boolean childHasOnlySelfCycle(AtlasVertex processVertex, AtlasVertex currentVertex, boolean isInput) { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("childHasSelfCycle"); - String lineageType = RequestContext.get().getLineageType(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); Iterator processEdgeIterator; - processEdgeIterator = processVertex.getEdges(OUT, isInput ? LINEAGE_MAP.get(lineageType)[0] : LINEAGE_MAP.get(lineageType)[1]).iterator(); + processEdgeIterator = processVertex.getEdges(OUT, isInput ? lineageInputLabel : lineageOutputLabel).iterator(); Set processOutputEdges = new HashSet<>(); while (processEdgeIterator.hasNext()) { processOutputEdges.add(processEdgeIterator.next()); @@ -1261,8 +1267,9 @@ private List getEdgesOfProcess(boolean isInput, AtlasLineageContext l lineageContext.getIgnoredProcesses().contains(processVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class))) { return Collections.emptyList(); } - String lineageType = RequestContext.get().getLineageType(); - return vertexEdgeCache.getEdges(processVertex, OUT, isInput ? LINEAGE_MAP.get(lineageType)[0] : LINEAGE_MAP.get(lineageType)[1]) + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + return vertexEdgeCache.getEdges(processVertex, OUT, isInput ? lineageInputLabel : lineageOutputLabel) .stream() .filter(edge -> shouldProcessEdge(lineageContext, edge) && vertexMatchesEvaluation(edge.getInVertex(), lineageContext)) .sorted(Comparator.comparing(edge -> edge.getProperty("_r__guid", String.class))) @@ -1287,9 +1294,10 @@ private boolean shouldProcessEdge(AtlasLineageContext lineageContext, AtlasEdge } private List getEdgesOfCurrentVertex(AtlasVertex currentVertex, boolean isInput, AtlasLineageContext lineageContext) { - String lineageType = RequestContext.get().getLineageType(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); return vertexEdgeCache - .getEdges(currentVertex, IN, isInput ? LINEAGE_MAP.get(lineageType)[1] : LINEAGE_MAP.get(lineageType)[0]) + .getEdges(currentVertex, IN, isInput ? lineageOutputLabel : lineageInputLabel) .stream() .sorted(Comparator.comparing(edge -> edge.getProperty("_r__guid", String.class))) .filter(edge -> shouldProcessEdge(lineageContext, edge)) @@ -1378,7 +1386,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge AtlasLineageContext lineageContext) throws AtlasBaseException { final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); - String lineageType = RequestContext.get().getLineageType(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = incomingEdge.getInVertex(); AtlasVertex outVertex = outgoingEdge.getInVertex(); AtlasVertex processVertex = outgoingEdge.getOutVertex(); @@ -1386,7 +1394,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); String relationGuid = null; - boolean isInputEdge = incomingEdge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); + boolean isInputEdge = incomingEdge.getLabel().equalsIgnoreCase(lineageInputLabel); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex, lineageContext.getAttributes()); @@ -1417,7 +1425,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, AtlasLineageInfo lineageInfo, AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("processEdges"); - String lineageType = RequestContext.get().getLineageType(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); @@ -1445,14 +1453,14 @@ private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, } String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(incomingEdge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - if (incomingEdge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0])) { + if (incomingEdge.getLabel().equalsIgnoreCase(lineageInputLabel)) { relations.add(new LineageRelation(leftGuid, processGuid, relationGuid)); } else { relations.add(new LineageRelation(processGuid, leftGuid, relationGuid)); } relationGuid = AtlasGraphUtilsV2.getEncodedProperty(outgoingEdge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - if (outgoingEdge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0])) { + if (outgoingEdge.getLabel().equalsIgnoreCase(lineageInputLabel)) { relations.add(new LineageRelation(rightGuid, processGuid, relationGuid)); } else { relations.add(new LineageRelation(processGuid, rightGuid, relationGuid)); @@ -1463,16 +1471,16 @@ private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("processEdge"); - String lineageType = RequestContext.get().getLineageType(); + final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); - + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(lineageInputLabel); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes()); @@ -1495,13 +1503,13 @@ private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, AtlasLineageContext lineageContext) throws AtlasBaseException { //Backward compatibility method - String lineageType = RequestContext.get().getLineageType(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(lineageInputLabel); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes()); @@ -1526,10 +1534,10 @@ private void processEdge(final AtlasEdge edge, final AtlasLineageOnDemandInfo li private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int level, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processEdge"); - String lineageType = RequestContext.get().getLineageType(); + AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); - + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); String inTypeName = AtlasGraphUtilsV2.getTypeName(inVertex); AtlasEntityType inEntityType = atlasTypeRegistry.getEntityTypeByName(inTypeName); if (inEntityType == null) { @@ -1540,7 +1548,7 @@ private void processEdge(final AtlasEdge edge, final Map> getRemovedElementsMap() { return removedElementsMap; } + public String getLineageInputLabel() { + return lineageInputLabel; + } + + public void setLineageInputLabel(String lineageInputLabel) { + this.lineageInputLabel = lineageInputLabel; + } + + public String getLineageOutputLabel() { + return lineageOutputLabel; + } + + public void setLineageOutputLabel(String lineageOutputLabel) { + this.lineageOutputLabel = lineageOutputLabel; + } public Map> getNewElementsCreatedMap() { return newElementsCreatedMap; } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java index 05ae292c10..fb979c5f85 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java @@ -94,7 +94,7 @@ public LineageREST(AtlasTypeRegistry typeRegistry, AtlasLineageService atlasLine @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) @Timed - public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid,@QueryParam("lineageType") String lineageType, + public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException { if (!AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean()) { LOG.warn("LineageREST: "+ AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED.getFormattedErrorMessage(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName())); @@ -110,9 +110,6 @@ public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid,@ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageREST.getOnDemandLineageGraph(" + guid + "," + lineageOnDemandRequest + ")"); } - if(Objects.nonNull(lineageType)) { - RequestContext.get().setLineageType(lineageType); - } return atlasLineageService.getAtlasLineageInfo(guid, lineageOnDemandRequest); } finally { AtlasPerfTracer.log(perf); @@ -131,7 +128,7 @@ public AtlasLineageOnDemandInfo getLineageGraph(@PathParam("guid") String guid,@ @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) @Timed - public AtlasLineageListInfo getLineageList(@QueryParam("lineageType") String lineageType, LineageListRequest lineageListRequest) throws AtlasBaseException { + public AtlasLineageListInfo getLineageList(LineageListRequest lineageListRequest) throws AtlasBaseException { lineageListRequestValidator.validate(lineageListRequest); String guid = lineageListRequest.getGuid(); @@ -144,9 +141,7 @@ public AtlasLineageListInfo getLineageList(@QueryParam("lineageType") String lin try { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageREST.getLineageList(" + guid + "," + lineageListRequest + ")"); - if(Objects.nonNull(lineageType)) { - RequestContext.get().setLineageType(lineageType); - } + return atlasLineageService.getLineageListInfoOnDemand(guid, lineageListRequest); } finally { AtlasPerfTracer.log(perf);