Skip to content

Commit

Permalink
DG-1697: Adding endpoint for linking/unlink policy
Browse files Browse the repository at this point in the history
  • Loading branch information
arpit-at committed Jul 12, 2024
1 parent 12f7cb7 commit 8241f31
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2739,10 +2739,16 @@ public void repairAccesscontrolAlias(String guid) throws AtlasBaseException {
}

@Override
@GraphTransaction
public void linkBusinessPolicy(String guid, List<String> linkGuids) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy");
this.entityGraphMapper.linkBusinessPolicy(guid, linkGuids);
List<AtlasVertex> vertices = this.entityGraphMapper.linkBusinessPolicy(guid, linkGuids);
EntityMutationResponse entityMutationResponse = new EntityMutationResponse();
for (AtlasVertex v : vertices) {
AtlasEntityHeader aeh = null;
aeh = entityRetriever.toAtlasEntityHeader(v);
entityMutationResponse.addEntity(UPDATE, aeh);
}
entityChangeNotifier.onEntitiesMutated(entityMutationResponse, false);
RequestContext.get().endMetricRecord(metric);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4516,38 +4516,40 @@ public void addHasLineage(Set<AtlasEdge> inputOutputEdges, boolean isRestoreEnti
RequestContext.get().endMetricRecord(metricRecorder);
}

public void linkBusinessPolicy(String policyId, List<String> linkGuids) throws AtlasBaseException {
EntityMutationResponse entityMutationResponse = new EntityMutationResponse();

for (String guid : linkGuids) {
AtlasVertex ev = AtlasGraphUtilsV2.findByGuid(graph, guid);

if (ev != null) {
// Retrieve existing policy GUIDs and add the new policy ID
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
existingValues.add(policyId);
ev.setProperty("assetPolicyGUIDs", existingValues);
ev.setProperty("assetPoliciesCount", existingValues.size());

// Update the modification metadata
updateModificationMetadata(ev);
@GraphTransaction
public List<AtlasVertex> linkBusinessPolicy(String policyId, List<String> linkGuids) throws AtlasBaseException {

return linkGuids.stream()
.map(guid -> AtlasGraphUtilsV2.findByGuid(graph, guid))
.filter(Objects::nonNull)
.filter(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
return !existingValues.contains(policyId);
})
.peek(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
existingValues.add(policyId);
ev.setProperty("assetPolicyGUIDs", policyId);
ev.setProperty("assetPoliciesCount", existingValues.size()+1);

updateModificationMetadata(ev);

cacheDifferentialEntity(ev, existingValues, policyId);
})
.collect(Collectors.toList());
}

// Create the AtlasEntityHeader and add it to the mutation response
AtlasEntityHeader aeh = entityRetriever.toAtlasEntityHeader(ev);
entityMutationResponse.addEntity(UPDATE, aeh);
}
}
private void cacheDifferentialEntity(AtlasVertex ev, Set<String> existingValues, String policyId) {
AtlasEntity diffEntity = new AtlasEntity(ev.getProperty("__typeName", String.class));
diffEntity.setGuid(ev.getProperty("__guid", String.class));
diffEntity.setAttribute("assetPolicyGUIDs", existingValues);
diffEntity.setAttribute("assetPoliciesCount", existingValues.size());
diffEntity.setUpdatedBy(ev.getProperty(MODIFIED_BY_KEY, String.class));
diffEntity.setUpdateTime(new Date(ev.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class)));

entityChangeNotifier.onEntitiesMutated(entityMutationResponse, false);
// Send notifications for the entity changes asynchronously
/* CompletableFuture.runAsync(() -> {
try {
entityChangeNotifier.onEntitiesMutated(entityMutationResponse, false);
} catch (AtlasBaseException e) {
// Handle exception
LOG.error("Error in processing notification for policy link updates for asset ids {} ", linkGuids, e);
}
});*/
RequestContext requestContext = RequestContext.get();
requestContext.cacheDifferentialEntity(diffEntity);
}

public void unlinkBusinessPolicy(String policyId, List<String> unlinkGuids) throws AtlasBaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1971,7 +1971,11 @@ public void linkBusinessPolicy(@PathParam("policyId") final String policyId, fin
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.linkBusinessPolicy(" + policyId + ")");
}

entitiesStore.linkBusinessPolicy(policyId, request.getLinkGuids());
} catch (AtlasBaseException abe) {
LOG.error("Error in policy linking: ", abe);
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "During Policy linking");
} finally {
AtlasPerfTracer.log(perf);
}
Expand Down

0 comments on commit 8241f31

Please sign in to comment.