Skip to content

Commit

Permalink
Merge pull request #3356 from atlanhq/revert-3355-revert-3354-beta-DG…
Browse files Browse the repository at this point in the history
…-1720-helper

DG-1720  Revert "Revert "DG-1720 Added Lineage support for DataProducts <> Assets""
  • Loading branch information
hr2904 authored Jul 23, 2024
2 parents c3e6ca6 + d859a77 commit 653c606
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public interface AtlasLineageService {
* @param lineageOnDemandRequest lineage on demand request object
* @return AtlasLineageOnDemandInfo
*/
AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException;
AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException;

/**
* @param entityGuid unique ID of the entity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@

import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE;
import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED;
import static org.apache.atlas.AtlasErrorCode.*;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*;
import static org.apache.atlas.repository.Constants.*;
Expand All @@ -84,11 +84,25 @@ public class EntityLineageService implements AtlasLineageService {

// Graph edge labels registered in LINEAGE_MAP for the dataset/process lineage type:
// index 0 (input) and index 1 (output) respectively.
private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
// Graph edge labels registered in LINEAGE_MAP for the product/asset lineage type.
// Note the declaration order here is output-then-input; LINEAGE_MAP stores them as [input, output].
private static final String OUTPUT_PORT_EDGE = "__Asset.outputPortDataProducts";
private static final String INPUT_PORT_EDGE = "__Asset.inputPortDataProducts";

/**
 * Edge labels to traverse for each supported lineage type.
 * <p>
 * Key: lineage type name ({@code DATASET_PROCESS_LINEAGE} or {@code PRODUCT_ASSET_LINEAGE}).
 * Value: a two-element array, {@code [0]} = input edge label, {@code [1]} = output edge label.
 * <p>
 * Any other key yields {@code null} from {@code get}; the lookups visible in this class index the
 * result without a null check. NOTE(review): confirm the lineage type is validated before it
 * reaches those lookups, otherwise an unrecognised value ends in a NullPointerException.
 * <p>
 * The map is a public, mutable {@link HashMap} built through an anonymous subclass
 * ("double-brace" initialisation). NOTE(review): the key constants are declared further down in
 * the class; rewriting this as a plain static initializer would presumably need qualified names
 * to avoid an illegal forward reference - verify before refactoring.
 */
public static final HashMap<String, String[]> LINEAGE_MAP = new HashMap<String, String[]>(){{
put(DATASET_PROCESS_LINEAGE, new String[]{PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE});
put(PRODUCT_ASSET_LINEAGE, new String[]{INPUT_PORT_EDGE, OUTPUT_PORT_EDGE});
}};
private static final String COLUMNS = "columns";
private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000;
private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3;
private static final String SEPARATOR = "->";
// Keys of the per-entity flag map returned by validateAndGetEntityTypeMap.
// IS_DATA_PRODUCT -> true when the entity's type name equals DATA_PRODUCT_ENTITY_TYPE.
public static final String IS_DATA_PRODUCT = "isDataProduct";
// IS_DATASET -> true when the entity is NOT a Process (the map stores `!isProcess` under this key).
// NOTE(review): the literal reads "isProcess", the opposite of what the flag means. The visible code
// only ever uses the constant, so lookups are consistent, but the value is misleading if it is ever
// logged or serialised - consider renaming once all consumers are confirmed.
public static final String IS_DATASET = "isProcess";
// Lineage type names; these are the keys of LINEAGE_MAP.
// DATASET_PROCESS_LINEAGE is the fallback when getLineageInfoOnDemand receives an empty lineage type.
public static final String PRODUCT_ASSET_LINEAGE = "ProductAssetLineage";
public static final String DATASET_PROCESS_LINEAGE = "DatasetProcessLineage";

private final AtlasGraph graph;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
Expand Down Expand Up @@ -170,13 +184,13 @@ public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direct

@Override
@GraphTransaction
public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException {
public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getAtlasLineageInfo");

RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes());
AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry);
boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid);
AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet);
HashMap<String, Boolean> dataTypeMap = validateAndGetEntityTypeMap(guid);
AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, dataTypeMap, lineageType);
appendLineageOnDemandPayload(ret, lineageOnDemandRequest);
// filter out on-demand relations whose input & output nodes are within the limit
cleanupRelationsOnDemand(ret);
Expand All @@ -199,20 +213,24 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR
return ret;
}

private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException {
private HashMap<String, Boolean> validateAndGetEntityTypeMap(String guid) throws AtlasBaseException {
String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName);
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName);
}
HashMap<String, Boolean> dataTypeMap = new HashMap<>();
boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE);
boolean isDataProduct = entityType.getTypeName().equals(DATA_PRODUCT_ENTITY_TYPE);
dataTypeMap.put(IS_DATA_PRODUCT, isDataProduct);
dataTypeMap.put(IS_DATASET, !isProcess);
if (!isProcess) {
boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE);
if (!isDataSet) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName);
}
}
return !isProcess;
return dataTypeMap;
}

private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) {
Expand Down Expand Up @@ -277,8 +295,10 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) {
}
}

private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException {
private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, HashMap<String, Boolean> dataTypeMap, String lineageType) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand");
if(StringUtils.isEmpty(lineageType))
lineageType = DATASET_PROCESS_LINEAGE;

LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext);
AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection();
Expand All @@ -294,27 +314,35 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag
AtomicInteger inputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger outputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger traversalOrder = new AtomicInteger(1);
if (isDataSet) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder);
AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes());
setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader);
ret.getGuidEntityMap().put(guid, baseEntityHeader);
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder);
}
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder);
String[] lineageEdgeLabels = LINEAGE_MAP.get(lineageType);
boolean isConnecterVertex;


isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE)
? !dataTypeMap.get(IS_DATA_PRODUCT)
: !dataTypeMap.get(IS_DATASET);

if (!isConnecterVertex) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, lineageType);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, lineageType);
AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes());
setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader);
ret.getGuidEntityMap().put(guid, baseEntityHeader);
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[0]).iterator();
traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, lineageType);
}
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[1]).iterator();
traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, lineageType);
}
}
}
RequestContext.get().endMetricRecord(metricRecorder);
return ret;
}
Expand All @@ -325,7 +353,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal
baseEntityHeader.setFinishTime(traversalOrder.get());
}

private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException {
AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT;
int nextLevel = isInput ? level - 1: level + 1;

Expand Down Expand Up @@ -356,11 +384,11 @@ private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isI
ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains));
}

traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder);
traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType);
}
}

private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException {
if (isEntityTraversalLimitReached(entitiesTraversed))
return;
if (depth != 0) { // base condition of recursion for depth
Expand All @@ -369,16 +397,17 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
int nextLevel = isInput ? level - 1: level + 1;
// keep track of visited vertices to avoid circular loop
visitedVertices.add(getId(datasetVertex));

String[] edgeDirections = LINEAGE_MAP.get(lineageType);
AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn");
Iterator<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator();

Iterator<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, isInput ? edgeDirections[1] : edgeDirections[0]).iterator();
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn);

while (incomingEdges.hasNext()) {
AtlasEdge incomingEdge = incomingEdges.next();
AtlasVertex processVertex = incomingEdge.getOutVertex();
AtlasVertex connecterVertex = incomingEdge.getOutVertex();

if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) {
if (!vertexMatchesEvaluation(connecterVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) {
continue;
}

Expand All @@ -399,7 +428,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
}

AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut");
Iterator<AtlasEdge> outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
Iterator<AtlasEdge> outgoingEdges = connecterVertex.getEdges(OUT, isInput ? edgeDirections[0] : edgeDirections[1]).iterator();
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut);

while (outgoingEdges.hasNext()) {
Expand All @@ -410,11 +439,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
continue;
}

if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) {
if (checkForOffset(outgoingEdge, connecterVertex, atlasLineageOnDemandContext, ret)) {
continue;
}
if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) {
String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex);
String processGuid = AtlasGraphUtilsV2.getIdFromVertex(connecterVertex);
LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid);
if (entityOnDemandInfo == null)
continue;
Expand All @@ -430,7 +459,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
setEntityLimitReachedFlag(isInput, ret);
}
if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) {
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType); // execute inner depth
AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex));
traversedEntity.setFinishTime(traversalOrder.get());
}
Expand All @@ -457,7 +486,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
Queue<String> traversalQueue = new LinkedList<>();

AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid);
boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid);
boolean isBaseNodeDataset = validateAndGetEntityTypeMap(baseGuid).get(IS_DATASET);
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
int currentDepth = 0;
int currentLevel = isBaseNodeDataset? 0: 1;
Expand All @@ -479,7 +508,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
if (Objects.isNull(currentVertex))
throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID);

boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID);
boolean isDataset = validateAndGetEntityTypeMap(currentGUID).get(IS_DATASET);
if (!lineageListContext.evaluateVertexFilter(currentVertex)) {
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
continue;
Expand Down
Loading

0 comments on commit 653c606

Please sign in to comment.