Skip to content

Commit

Permalink
Merge pull request #2395 from atlanhq/nb/hasLineage
Browse files Browse the repository at this point in the history
[stag] DG-128 Fix __hasLineage filter issue
  • Loading branch information
nikhilbonte21 authored Oct 3, 2023
2 parents e8a528c + 89462d8 commit a339e74
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class AtlasHasLineageRequest implements Serializable {
public AtlasHasLineageRequest() {
}

private String assetGuid;
private String processGuid;
private String endGuid;
private String label;
Expand Down Expand Up @@ -68,9 +69,18 @@ public void setLabel(String label) {
this.label = label;
}

public String getAssetGuid() {
return assetGuid;
}

public void setAssetGuid(String assetGuid) {
this.assetGuid = assetGuid;
}

@Override
public String toString() {
return "AtlasHasLineageRequest{" +
"assetGuid='" + assetGuid + '\'' +
"processGuid='" + processGuid + '\'' +
", endGuid='" + endGuid + '\'' +
", label='" + label + '\'' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,8 @@ public void resetHasLineageOnInputOutputDelete(Collection<AtlasEdge> removedEdge
private void updateAssetHasLineageStatus(AtlasVertex assetVertex, AtlasEdge currentEdge, Collection<AtlasEdge> removedEdges) {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateAssetHasLineageStatus");

removedEdges.forEach(edge -> RequestContext.get().addToDeletedEdgesIdsForResetHasLineage(edge.getIdForDisplay()));

Iterator<AtlasEdge> edgeIterator = assetVertex.query()
.direction(AtlasEdgeDirection.BOTH)
.label(PROCESS_EDGE_LABELS)
Expand All @@ -1537,7 +1539,7 @@ private void updateAssetHasLineageStatus(AtlasVertex assetVertex, AtlasEdge curr
int processHasLineageCount = 0;
while (edgeIterator.hasNext()) {
AtlasEdge edge = edgeIterator.next();
if (!removedEdges.contains(edge) && !currentEdge.equals(edge)) {
if (!RequestContext.get().getDeletedEdgesIdsForResetHasLineage().contains(edge.getIdForDisplay()) && !currentEdge.equals(edge)) {
AtlasVertex relatedProcessVertex = edge.getOutVertex();
boolean processHasLineage = getEntityHasLineage(relatedProcessVertex);
if (processHasLineage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2443,28 +2443,61 @@ public void repairHasLineage(AtlasHasLineageRequests requests) throws AtlasBaseE
Set<AtlasEdge> inputOutputEdges = new HashSet<>();

for (AtlasHasLineageRequest request : requests.getRequest()) {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getProcessGuid());
AtlasVertex assetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getEndGuid());
AtlasEdge edge = null;
try {
if (processVertex != null && assetVertex != null) {
edge = graphHelper.getEdge(processVertex, assetVertex, request.getLabel());
} else {
LOG.warn("Skipping since vertex is null for processGuid {} and asset Guid {}"
,request.getProcessGuid(),request.getEndGuid() );
if (StringUtils.isNotEmpty(request.getAssetGuid())) {
//only supports repairing scenario mentioned here - https://atlanhq.atlassian.net/browse/DG-128?focusedCommentId=20652
repairHasLineageForAsset(request);

} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getProcessGuid());
AtlasVertex assetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getEndGuid());
AtlasEdge edge = null;
try {
if (processVertex != null && assetVertex != null) {
edge = graphHelper.getEdge(processVertex, assetVertex, request.getLabel());
} else {
LOG.warn("Skipping since vertex is null for processGuid {} and asset Guid {}"
,request.getProcessGuid(),request.getEndGuid() );
}
} catch (RepositoryException re) {
throw new AtlasBaseException(AtlasErrorCode.HAS_LINEAGE_GET_EDGE_FAILED, re);
}
} catch (RepositoryException re) {
throw new AtlasBaseException(AtlasErrorCode.HAS_LINEAGE_GET_EDGE_FAILED, re);
}

if (edge != null) {
inputOutputEdges.add(edge);
if (edge != null) {
inputOutputEdges.add(edge);
}
}
}
repairHasLineageWithAtlasEdges(inputOutputEdges);

if (CollectionUtils.isNotEmpty(inputOutputEdges)) {
repairHasLineageWithAtlasEdges(inputOutputEdges);
}

RequestContext.get().endMetricRecord(metricRecorder);
}

private void repairHasLineageForAsset(AtlasHasLineageRequest request) {
//only supports repairing scenario mentioned here - https://atlanhq.atlassian.net/browse/DG-128?focusedCommentId=20652

AtlasVertex assetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getAssetGuid());

if (getEntityHasLineage(assetVertex)) {
Iterator<AtlasEdge> lineageEdges = assetVertex.getEdges(AtlasEdgeDirection.BOTH, PROCESS_EDGE_LABELS).iterator();
boolean foundActiveRel = false;

while (lineageEdges.hasNext()) {
AtlasEdge edge = lineageEdges.next();
if (getStatus(edge) == ACTIVE) {
foundActiveRel = true;
break;
}
}

if (!foundActiveRel) {
AtlasGraphUtilsV2.setEncodedProperty(assetVertex, HAS_LINEAGE, false);
}
}
}

public void repairHasLineageWithAtlasEdges(Set<AtlasEdge> inputOutputEdges) {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("repairHasLineageWithAtlasEdges");

Expand Down
11 changes: 11 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class RequestContext {
private boolean allowDuplicateDisplayName;
private MetricsRegistry metricsRegistry;
private boolean skipAuthorizationCheck = false;
private Set<String> deletedEdgesIdsForResetHasLineage = new HashSet<>(0);


private RequestContext() {
}
Expand Down Expand Up @@ -147,6 +149,7 @@ public void clearCache() {
this.removedElementsMap.clear();
this.deletedEdgesIds.clear();
this.processGuidIds.clear();
this.deletedEdgesIdsForResetHasLineage.clear();
this.requestContextHeaders.clear();
this.relationshipEndToVertexIdMap.clear();
this.relationshipMutationMap.clear();
Expand Down Expand Up @@ -389,6 +392,14 @@ public Set<String> getDeletedEdgesIds() {
return deletedEdgesIds;
}

public void addToDeletedEdgesIdsForResetHasLineage(String edgeId) {
deletedEdgesIdsForResetHasLineage.add(edgeId);
}

public Set<String> getDeletedEdgesIdsForResetHasLineage() {
return deletedEdgesIdsForResetHasLineage;
}

public Set<String> getProcessGuidIds() {
return processGuidIds;
}
Expand Down

0 comments on commit a339e74

Please sign in to comment.