Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1720 Added Lineage support for DataProducts <> Assets #3354

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public interface AtlasLineageService {
* @param lineageOnDemandRequest lineage on demand request object
* @return AtlasLineageInfo
*/
AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException;
AtlasLineageOnDemandInfo getAtlasLineageInfo(String entityGuid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException;

/**
* @param entityGuid unique ID of the entity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@

import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE;
import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED;
import static org.apache.atlas.AtlasErrorCode.*;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*;
import static org.apache.atlas.repository.Constants.*;
Expand All @@ -84,11 +84,25 @@ public class EntityLineageService implements AtlasLineageService {

private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
private static final String OUTPUT_PORT_EDGE = "__Asset.outputPortDataProducts";
private static final String INPUT_PORT_EDGE = "__Asset.inputPortDataProducts";

/**
* String[] => [Input edge Label, Output Edge Label]
*/
public static final HashMap<String, String[]> LINEAGE_MAP = new HashMap<String, String[]>(){{
put(DATASET_PROCESS_LINEAGE, new String[]{PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE});
put(PRODUCT_ASSET_LINEAGE, new String[]{INPUT_PORT_EDGE, OUTPUT_PORT_EDGE});
}};
private static final String COLUMNS = "columns";
private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000;
private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3;
private static final String SEPARATOR = "->";
public static final String IS_DATA_PRODUCT = "isDataProduct";
public static final String IS_DATASET = "isProcess";
public static final String PRODUCT_ASSET_LINEAGE = "ProductAssetLineage";
public static final String DATASET_PROCESS_LINEAGE = "DatasetProcessLineage";

private final AtlasGraph graph;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
Expand Down Expand Up @@ -170,13 +184,13 @@ public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direct

@Override
@GraphTransaction
public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest) throws AtlasBaseException {
public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemandRequest lineageOnDemandRequest, String lineageType) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getAtlasLineageInfo");

RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes());
AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry);
boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid);
AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet);
HashMap<String, Boolean> dataTypeMap = validateAndGetEntityTypeMap(guid);
AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, dataTypeMap, lineageType);
appendLineageOnDemandPayload(ret, lineageOnDemandRequest);
// filtering out on-demand relations which has input & output nodes within the limit
cleanupRelationsOnDemand(ret);
Expand All @@ -199,20 +213,24 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR
return ret;
}

private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException {
private HashMap<String, Boolean> validateAndGetEntityTypeMap(String guid) throws AtlasBaseException {
String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName);
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName);
}
HashMap<String, Boolean> dataTypeMap = new HashMap<>();
boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE);
boolean isDataProduct = entityType.getTypeName().equals(DATA_PRODUCT_ENTITY_TYPE);
dataTypeMap.put(IS_DATA_PRODUCT, isDataProduct);
dataTypeMap.put(IS_DATASET, !isProcess);
if (!isProcess) {
boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE);
if (!isDataSet) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName);
}
}
return !isProcess;
return dataTypeMap;
}

private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) {
Expand Down Expand Up @@ -277,8 +295,10 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) {
}
}

private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException {
private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, HashMap<String, Boolean> dataTypeMap, String lineageType) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand");
if(StringUtils.isEmpty(lineageType))
lineageType = DATASET_PROCESS_LINEAGE;

LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext);
AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection();
Expand All @@ -294,27 +314,35 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag
AtomicInteger inputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger outputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger traversalOrder = new AtomicInteger(1);
if (isDataSet) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder);
AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes());
setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader);
ret.getGuidEntityMap().put(guid, baseEntityHeader);
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder);
}
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder);
String[] lineageEdgeLabels = LINEAGE_MAP.get(lineageType);
boolean isConnecterVertex;


isConnecterVertex = lineageType.equals(PRODUCT_ASSET_LINEAGE)
? !dataTypeMap.get(IS_DATA_PRODUCT)
: !dataTypeMap.get(IS_DATASET);

if (!isConnecterVertex) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, lineageType);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, lineageType);
AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes());
setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader);
ret.getGuidEntityMap().put(guid, baseEntityHeader);
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[0]).iterator();
traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, lineageType);
}
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageEdgeLabels[1]).iterator();
traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, lineageType);
}
}
}
RequestContext.get().endMetricRecord(metricRecorder);
return ret;
}
Expand All @@ -325,7 +353,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal
baseEntityHeader.setFinishTime(traversalOrder.get());
}

private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException {
AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT;
int nextLevel = isInput ? level - 1: level + 1;

Expand Down Expand Up @@ -356,11 +384,11 @@ private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isI
ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains));
}

traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder);
traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType);
}
}

private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, String lineageType) throws AtlasBaseException {
if (isEntityTraversalLimitReached(entitiesTraversed))
return;
if (depth != 0) { // base condition of recursion for depth
Expand All @@ -369,16 +397,17 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
int nextLevel = isInput ? level - 1: level + 1;
// keep track of visited vertices to avoid circular loop
visitedVertices.add(getId(datasetVertex));

String[] edgeDirections = LINEAGE_MAP.get(lineageType);
AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn");
Iterator<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator();

Iterator<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, isInput ? edgeDirections[1] : edgeDirections[0]).iterator();
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn);

while (incomingEdges.hasNext()) {
AtlasEdge incomingEdge = incomingEdges.next();
AtlasVertex processVertex = incomingEdge.getOutVertex();
AtlasVertex connecterVertex = incomingEdge.getOutVertex();

if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) {
if (!vertexMatchesEvaluation(connecterVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) {
continue;
}

Expand All @@ -399,7 +428,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
}

AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut");
Iterator<AtlasEdge> outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
Iterator<AtlasEdge> outgoingEdges = connecterVertex.getEdges(OUT, isInput ? edgeDirections[0] : edgeDirections[1]).iterator();
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut);

while (outgoingEdges.hasNext()) {
Expand All @@ -410,11 +439,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
continue;
}

if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) {
if (checkForOffset(outgoingEdge, connecterVertex, atlasLineageOnDemandContext, ret)) {
continue;
}
if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) {
String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex);
String processGuid = AtlasGraphUtilsV2.getIdFromVertex(connecterVertex);
LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid);
if (entityOnDemandInfo == null)
continue;
Expand All @@ -430,7 +459,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
setEntityLimitReachedFlag(isInput, ret);
}
if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) {
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, lineageType); // execute inner depth
AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex));
traversedEntity.setFinishTime(traversalOrder.get());
}
Expand All @@ -457,7 +486,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
Queue<String> traversalQueue = new LinkedList<>();

AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid);
boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid);
boolean isBaseNodeDataset = validateAndGetEntityTypeMap(baseGuid).get(IS_DATASET);
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
int currentDepth = 0;
int currentLevel = isBaseNodeDataset? 0: 1;
Expand All @@ -479,7 +508,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
if (Objects.isNull(currentVertex))
throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID);

boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID);
boolean isDataset = validateAndGetEntityTypeMap(currentGUID).get(IS_DATASET);
if (!lineageListContext.evaluateVertexFilter(currentVertex)) {
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices);
continue;
Expand Down
Loading
Loading