diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java index 967f9ed5d6..69c3e387bc 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java @@ -71,13 +71,13 @@ public interface AtlasLineageService { * @param lineageOnDemandRequest lineage on demand request object * @return AtlasLineageInfo */ - AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException; + AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException; /** * @param entityGuid unique ID of the entity * @param lineageListRequest lineage list request object * @return AtlasLineageListInfo */ - AtlasLineageListInfo getLineageListInfoOnDemand(String entityGuid, LineageListRequest lineageListRequest, String lineageType) throws AtlasBaseException; + AtlasLineageListInfo getLineageListInfoOnDemand(String entityGuid, LineageListRequest lineageListRequest) throws AtlasBaseException; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 21028731b8..d9f04272e8 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -92,7 +92,7 @@ public class EntityLineageService implements AtlasLineageService { */ public static final HashMap LINEAGE_MAP = new HashMap(){{ put(DATASET_PROCESS_LINEAGE, new String[]{PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE}); - put(PRODUCT_ASSET_LINEAGE, new String[]{INPUT_PORT_EDGE, OUTPUT_PORT_EDGE}); + put(PRODUCT_ASSET_LINEAGE, new String[]{OUTPUT_PORT_EDGE, INPUT_PORT_EDGE}); }}; private static final String COLUMNS = "columns"; private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); @@ -184,13 +184,13 @@ public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direct @Override @GraphTransaction - public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException { + public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getAtlasLineageInfo"); RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes()); AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry); HashMap dataTypeMap = validateAndGetEntityTypeMap(guid); - AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, dataTypeMap, lineageType); + AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, dataTypeMap); appendLineageOnDemandPayload(ret, lineageOnDemandRequest); // filtering out on-demand relations which has input & output nodes within the limit cleanupRelationsOnDemand(ret); @@ -202,13 +202,13 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand @Override @GraphTransaction - public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest, String lineageType) throws AtlasBaseException { + public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageListInfoOnDemand"); AtlasLineageListInfo ret = new AtlasLineageListInfo(new ArrayList<>()); RequestContext.get().setRelationAttrsForSearch(lineageListRequest.getRelationAttributes()); - traverseEdgesUsingBFS(guid, new AtlasLineageListContext(lineageListRequest, atlasTypeRegistry), ret, lineageType); + traverseEdgesUsingBFS(guid, new AtlasLineageListContext(lineageListRequest, atlasTypeRegistry), ret); ret.setSearchParameters(lineageListRequest); RequestContext.get().endMetricRecord(metricRecorder); @@ -297,8 +297,9 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { } } - private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, HashMap dataTypeMap, String lineageType) throws AtlasBaseException { + private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, HashMap dataTypeMap) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); + String lineageType = RequestContext.get().getLineageType(); if(StringUtils.isEmpty(lineageType)) lineageType = DATASET_PROCESS_LINEAGE; @@ -327,9 +328,9 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag if (!isConnecterVertex) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, lineageType); + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, lineageType); + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); ret.getGuidEntityMap().put(guid, baseEntityHeader); @@ -338,11 +339,11 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[0]).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, lineageType); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[1]).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, lineageType); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); } } RequestContext.get().endMetricRecord(metricRecorder); @@ -355,10 +356,10 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal baseEntityHeader.setFinishTime(traversalOrder.get()); } - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; int nextLevel = isInput ? level - 1: level + 1; - + String lineageType = RequestContext.get().getLineageType(); while (processEdges.hasNext()) { AtlasEdge processEdge = processEdges.next(); AtlasVertex datasetVertex = processEdge.getInVertex(); @@ -371,7 +372,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI continue; } - boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, new HashSet<>())) { break; } else { @@ -386,13 +387,14 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; + String lineageType = RequestContext.get().getLineageType(); if (depth != 0) { // base condition of recursion for depth AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand"); AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; @@ -461,7 +463,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i setEntityLimitReachedFlag(isInput, ret); } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType); // execute inner depth + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); traversedEntity.setFinishTime(traversalOrder.get()); } @@ -479,9 +481,9 @@ private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDem ret.setDownstreamEntityLimitReached(true); } - private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret, String lineageType) throws AtlasBaseException { + private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesUsingBFS"); - + String lineageType = RequestContext.get().getLineageType(); Set visitedVertices = new HashSet<>(); visitedVertices.add(baseGuid); Set skippedVertices = new HashSet<>(); @@ -495,7 +497,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line boolean isNotConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) ? dataTypeMap.get(IS_DATA_PRODUCT) : dataTypeMap.get(IS_DATASET); - enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType); + enqueueNeighbours(baseVertex, dataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); int currentDepth = 0; int currentLevel = isNotConnecterVertex? 0: 1; @@ -518,18 +520,18 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line HashMap currentEntityDataTypeMap = validateAndGetEntityTypeMap(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); continue; } lineageListContext.incrementEntityCount(); appendToResult(currentVertex, lineageListContext, ret, currentLevel); - enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageType); + enqueueNeighbours(currentVertex, currentEntityDataTypeMap, lineageListContext, traversalQueue, visitedVertices, skippedVertices); if (isLastEntityInLastDepth(lineageListContext.getDepth(), currentDepth, entitiesInCurrentDepth, i)) { ret.setHasMore(false); lineageListContext.setHasMoreUpdated(true); @@ -544,9 +546,10 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line } private void enqueueNeighbours(AtlasVertex currentVertex, HashMap dataTypeMap, AtlasLineageListContext lineageListContext, - Queue traversalQueue, Set visitedVertices, Set skippedVertices,String lineageType) { + Queue traversalQueue, Set visitedVertices, Set skippedVertices) { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; + String lineageType = RequestContext.get().getLineageType(); boolean isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE) ? !dataTypeMap.get(IS_DATA_PRODUCT) : !dataTypeMap.get(IS_DATASET); @@ -690,19 +693,21 @@ else if (!isInput && ! isInVertexVisited) } private void setHasDownstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex inVertex, LineageInfoOnDemand inLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(inVertex, IN, PROCESS_INPUTS_EDGE, atlasLineageOnDemandContext); + String lineageType = RequestContext.get().getLineageType(); + List filteredEdges = getFilteredAtlasEdges(inVertex, IN, LINEAGE_MAP.get(lineageType)[0], atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { inLineageInfo.setHasDownstream(true); - inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); } + inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); } private void setHasUpstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex outVertex, LineageInfoOnDemand outLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(outVertex, IN, PROCESS_OUTPUTS_EDGE, atlasLineageOnDemandContext); + String lineageType = RequestContext.get().getLineageType(); + List filteredEdges = getFilteredAtlasEdges(outVertex, IN, LINEAGE_MAP.get(lineageType)[1], atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { outLineageInfo.setHasUpstream(true); - outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); } + outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); } private List getFilteredAtlasEdges(AtlasVertex outVertex, AtlasEdgeDirection direction, String processEdgeLabel, AtlasLineageOnDemandContext atlasLineageOnDemandContext) { @@ -857,7 +862,7 @@ private AtlasLineageInfo getLineageInfo(AtlasLineageContext lineageContext, Line private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getLineageInfoV2"); - + String lineageType = RequestContext.get().getLineageType(); int depth = lineageContext.getDepth(); String guid = lineageContext.getGuid(); LineageDirection direction = lineageContext.getDirection(); @@ -884,7 +889,7 @@ private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) th // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == INPUT || direction == BOTH) { - Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, PROCESS_INPUTS_EDGE).iterator(); + Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, LINEAGE_MAP.get(lineageType)[0]).iterator(); List qualifyingEdges = getQualifyingProcessEdges(processEdges, lineageContext); ret.setHasChildrenForDirection(getGuid(processVertex), new LineageChildrenInfo(INPUT, hasMoreChildren(qualifyingEdges))); @@ -899,7 +904,7 @@ private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) th } if (direction == OUTPUT || direction == BOTH) { - Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, PROCESS_OUTPUTS_EDGE).iterator(); + Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, LINEAGE_MAP.get(lineageType)[1]).iterator(); List qualifyingEdges = getQualifyingProcessEdges(processEdges, lineageContext); ret.setHasChildrenForDirection(getGuid(processVertex), new LineageChildrenInfo(OUTPUT, hasMoreChildren(qualifyingEdges))); @@ -949,12 +954,13 @@ private int getLineageMaxNodeAllowedCount() { } private String getEdgeLabel(AtlasEdge edge) { + String lineageType = RequestContext.get().getLineageType(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); if (isLineageOnDemandEnabled()) { return getEdgeLabelFromGuids(isInputEdge, inGuid, outGuid); @@ -1125,10 +1131,10 @@ private List> getUnvisitedProcessEdgesWithOutputVertexId lineageContext.getIgnoredProcesses().contains(processVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class))) { return Collections.emptyList(); } - + String lineageType = RequestContext.get().getLineageType(); List> unvisitedProcessEdgesWithOutputVertexIds = new ArrayList<>(); - Iterable outgoingEdges = vertexEdgeCache.getEdges(processVertex, OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE); + Iterable outgoingEdges = vertexEdgeCache.getEdges(processVertex, OUT, isInput ? LINEAGE_MAP.get(lineageType)[0] : LINEAGE_MAP.get(lineageType)[1]); for (AtlasEdge outgoingEdge : outgoingEdges) { AtlasVertex outputVertex = outgoingEdge.getInVertex(); @@ -1215,7 +1221,8 @@ private void addLimitlessVerticesToResult(boolean isInput, int depth, Set processEdges = vertexEdgeCache.getEdges(currentVertex, IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE); + String lineageType = RequestContext.get().getLineageType(); + List processEdges = vertexEdgeCache.getEdges(currentVertex, IN, isInput ? LINEAGE_MAP.get(lineageType)[1] : LINEAGE_MAP.get(lineageType)[0]); // Filter lineages based on ignored process types processEdges = CollectionUtils.isNotEmpty(lineageContext.getIgnoredProcesses()) ? @@ -1236,8 +1243,9 @@ private void processLastLevel(AtlasVertex currentVertex, boolean isInput, AtlasL private boolean childHasOnlySelfCycle(AtlasVertex processVertex, AtlasVertex currentVertex, boolean isInput) { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("childHasSelfCycle"); + String lineageType = RequestContext.get().getLineageType(); Iterator processEdgeIterator; - processEdgeIterator = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + processEdgeIterator = processVertex.getEdges(OUT, isInput ? LINEAGE_MAP.get(lineageType)[0] : LINEAGE_MAP.get(lineageType)[1]).iterator(); Set processOutputEdges = new HashSet<>(); while (processEdgeIterator.hasNext()) { processOutputEdges.add(processEdgeIterator.next()); @@ -1253,8 +1261,8 @@ private List getEdgesOfProcess(boolean isInput, AtlasLineageContext l lineageContext.getIgnoredProcesses().contains(processVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class))) { return Collections.emptyList(); } - - return vertexEdgeCache.getEdges(processVertex, OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE) + String lineageType = RequestContext.get().getLineageType(); + return vertexEdgeCache.getEdges(processVertex, OUT, isInput ? LINEAGE_MAP.get(lineageType)[0] : LINEAGE_MAP.get(lineageType)[1]) .stream() .filter(edge -> shouldProcessEdge(lineageContext, edge) && vertexMatchesEvaluation(edge.getInVertex(), lineageContext)) .sorted(Comparator.comparing(edge -> edge.getProperty("_r__guid", String.class))) @@ -1279,8 +1287,9 @@ private boolean shouldProcessEdge(AtlasLineageContext lineageContext, AtlasEdge } private List getEdgesOfCurrentVertex(AtlasVertex currentVertex, boolean isInput, AtlasLineageContext lineageContext) { + String lineageType = RequestContext.get().getLineageType(); return vertexEdgeCache - .getEdges(currentVertex, IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE) + .getEdges(currentVertex, IN, isInput ? LINEAGE_MAP.get(lineageType)[1] : LINEAGE_MAP.get(lineageType)[0]) .stream() .sorted(Comparator.comparing(edge -> edge.getProperty("_r__guid", String.class))) .filter(edge -> shouldProcessEdge(lineageContext, edge)) @@ -1369,7 +1378,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge AtlasLineageContext lineageContext) throws AtlasBaseException { final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); - + String lineageType = RequestContext.get().getLineageType(); AtlasVertex inVertex = incomingEdge.getInVertex(); AtlasVertex outVertex = outgoingEdge.getInVertex(); AtlasVertex processVertex = outgoingEdge.getOutVertex(); @@ -1377,7 +1386,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); String relationGuid = null; - boolean isInputEdge = incomingEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = incomingEdge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex, lineageContext.getAttributes()); @@ -1408,7 +1417,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, AtlasLineageInfo lineageInfo, AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("processEdges"); - + String lineageType = RequestContext.get().getLineageType(); final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); @@ -1436,14 +1445,14 @@ private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, } String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(incomingEdge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - if (incomingEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE)) { + if (incomingEdge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0])) { relations.add(new LineageRelation(leftGuid, processGuid, relationGuid)); } else { relations.add(new LineageRelation(processGuid, leftGuid, relationGuid)); } relationGuid = AtlasGraphUtilsV2.getEncodedProperty(outgoingEdge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - if (outgoingEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE)) { + if (outgoingEdge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0])) { relations.add(new LineageRelation(rightGuid, processGuid, relationGuid)); } else { relations.add(new LineageRelation(processGuid, rightGuid, relationGuid)); @@ -1454,7 +1463,7 @@ private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("processEdge"); - + String lineageType = RequestContext.get().getLineageType(); final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); @@ -1463,7 +1472,7 @@ private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes()); @@ -1486,12 +1495,13 @@ private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, AtlasLineageContext lineageContext) throws AtlasBaseException { //Backward compatibility method + String lineageType = RequestContext.get().getLineageType(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(LINEAGE_MAP.get(lineageType)[0]); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes()); @@ -1516,6 +1526,7 @@ private void processEdge(final AtlasEdge edge, final AtlasLineageOnDemandInfo li private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, final Set visitedEdges, final Set attributes, int level, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processEdge"); + String lineageType = RequestContext.get().getLineageType(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); @@ -1529,7 +1540,7 @@ private void processEdge(final AtlasEdge edge, final Map