Skip to content

Commit

Permalink
Merge pull request #2394 from atlanhq/nb/hasLineage
Browse files Browse the repository at this point in the history
DG-128 repair __hasLineage for assets
  • Loading branch information
nikhilbonte21 authored Sep 28, 2023
2 parents 1ff0f10 + 89462d8 commit 0349a48
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class AtlasHasLineageRequest implements Serializable {
public AtlasHasLineageRequest() {
}

private String assetGuid;
private String processGuid;
private String endGuid;
private String label;
Expand Down Expand Up @@ -68,9 +69,18 @@ public void setLabel(String label) {
this.label = label;
}

public String getAssetGuid() {
return assetGuid;
}

public void setAssetGuid(String assetGuid) {
this.assetGuid = assetGuid;
}

@Override
public String toString() {
return "AtlasHasLineageRequest{" +
"assetGuid='" + assetGuid + '\'' +
"processGuid='" + processGuid + '\'' +
", endGuid='" + endGuid + '\'' +
", label='" + label + '\'' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2443,28 +2443,61 @@ public void repairHasLineage(AtlasHasLineageRequests requests) throws AtlasBaseE
Set<AtlasEdge> inputOutputEdges = new HashSet<>();

for (AtlasHasLineageRequest request : requests.getRequest()) {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getProcessGuid());
AtlasVertex assetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getEndGuid());
AtlasEdge edge = null;
try {
if (processVertex != null && assetVertex != null) {
edge = graphHelper.getEdge(processVertex, assetVertex, request.getLabel());
} else {
LOG.warn("Skipping since vertex is null for processGuid {} and asset Guid {}"
,request.getProcessGuid(),request.getEndGuid() );
if (StringUtils.isNotEmpty(request.getAssetGuid())) {
//only supports repairing scenario mentioned here - https://atlanhq.atlassian.net/browse/DG-128?focusedCommentId=20652
repairHasLineageForAsset(request);

} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getProcessGuid());
AtlasVertex assetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getEndGuid());
AtlasEdge edge = null;
try {
if (processVertex != null && assetVertex != null) {
edge = graphHelper.getEdge(processVertex, assetVertex, request.getLabel());
} else {
LOG.warn("Skipping since vertex is null for processGuid {} and asset Guid {}"
,request.getProcessGuid(),request.getEndGuid() );
}
} catch (RepositoryException re) {
throw new AtlasBaseException(AtlasErrorCode.HAS_LINEAGE_GET_EDGE_FAILED, re);
}
} catch (RepositoryException re) {
throw new AtlasBaseException(AtlasErrorCode.HAS_LINEAGE_GET_EDGE_FAILED, re);
}

if (edge != null) {
inputOutputEdges.add(edge);
if (edge != null) {
inputOutputEdges.add(edge);
}
}
}
repairHasLineageWithAtlasEdges(inputOutputEdges);

if (CollectionUtils.isNotEmpty(inputOutputEdges)) {
repairHasLineageWithAtlasEdges(inputOutputEdges);
}

RequestContext.get().endMetricRecord(metricRecorder);
}

private void repairHasLineageForAsset(AtlasHasLineageRequest request) {
//only supports repairing scenario mentioned here - https://atlanhq.atlassian.net/browse/DG-128?focusedCommentId=20652

AtlasVertex assetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getAssetGuid());

if (getEntityHasLineage(assetVertex)) {
Iterator<AtlasEdge> lineageEdges = assetVertex.getEdges(AtlasEdgeDirection.BOTH, PROCESS_EDGE_LABELS).iterator();
boolean foundActiveRel = false;

while (lineageEdges.hasNext()) {
AtlasEdge edge = lineageEdges.next();
if (getStatus(edge) == ACTIVE) {
foundActiveRel = true;
break;
}
}

if (!foundActiveRel) {
AtlasGraphUtilsV2.setEncodedProperty(assetVertex, HAS_LINEAGE, false);
}
}
}

public void repairHasLineageWithAtlasEdges(Set<AtlasEdge> inputOutputEdges) {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("repairHasLineageWithAtlasEdges");

Expand Down

0 comments on commit 0349a48

Please sign in to comment.