Skip to content

Commit

Permalink
DG-1697: Adding endpoint for linking/unlink policy
Browse files Browse the repository at this point in the history
DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy

DG-1697: Adding endpoint for linking/unlink policy
  • Loading branch information
arpit-at committed Jul 13, 2024
1 parent 8936e59 commit 83e7c34
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 36 deletions.
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ on:
- development
- master
- lineageondemand
- policyendpointsmaster

jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import javax.inject.Inject;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static java.lang.Boolean.FALSE;
Expand Down Expand Up @@ -1550,8 +1551,8 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean


// Notify the change listeners
// entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
// atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap());
entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap());
if (LOG.isDebugEnabled()) {
LOG.debug("<== createOrUpdate()");
}
Expand Down Expand Up @@ -2739,19 +2740,52 @@ public void repairAccesscontrolAlias(String guid) throws AtlasBaseException {
}

@Override
@GraphTransaction
public void linkBusinessPolicy(String guid, List<String> linkGuids) {
public void linkBusinessPolicy(String guid, List<String> linkGuids) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy");
this.entityGraphMapper.linkBusinessPolicy(guid, linkGuids);
List<AtlasVertex> vertices = this.entityGraphMapper.linkBusinessPolicy(guid, linkGuids);
EntityMutationResponse entityMutationResponse = new EntityMutationResponse();
for (AtlasVertex v : vertices) {
AtlasEntityHeader aeh = entityRetriever.toAtlasEntityHeader(v);
entityMutationResponse.addEntity(UPDATE, aeh);
}
// Send notifications for the entity changes asynchronously
RequestContext clonedContext = RequestContext.cloneCurrentContext();
CompletableFuture.runAsync(() -> {
try {
RequestContext.set(clonedContext);
entityChangeNotifier.onEntitiesMutated(entityMutationResponse, false);
} catch (AtlasBaseException e) {
LOG.error("Error in processing notification for policy link updates for asset ids {} ", linkGuids, e);
}finally {
RequestContext.clear();
}
});
RequestContext.get().endMetricRecord(metric);
}


@Override
@GraphTransaction
public void unlinkBusinessPolicy(String guid, List<String> unlinkGuids) {
public void unlinkBusinessPolicy(String guid, List<String> unlinkGuids) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy");
this.entityGraphMapper.unlinkBusinessPolicy(guid, unlinkGuids);
List<AtlasVertex> vertices = this.entityGraphMapper.unlinkBusinessPolicy(guid, unlinkGuids);
EntityMutationResponse entityMutationResponse = new EntityMutationResponse();
for (AtlasVertex v : vertices) {
AtlasEntityHeader aeh = entityRetriever.toAtlasEntityHeader(v);
entityMutationResponse.addEntity(UPDATE, aeh);
}
// Send notifications for the entity changes asynchronously
RequestContext clonedContext = RequestContext.cloneCurrentContext();
CompletableFuture.runAsync(() -> {
try {
RequestContext.set(clonedContext);
entityChangeNotifier.onEntitiesMutated(entityMutationResponse, false);
} catch (AtlasBaseException e) {
LOG.error("Error in processing notification for policy unlink updates for asset ids {} ", unlinkGuids, e);
} finally {
RequestContext.clear();
}
});
RequestContext.get().endMetricRecord(metric);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -4514,27 +4515,56 @@ public void addHasLineage(Set<AtlasEdge> inputOutputEdges, boolean isRestoreEnti
}
RequestContext.get().endMetricRecord(metricRecorder);
}
public void linkBusinessPolicy(String policyId, List<String> linkGuids) {
for (String guid : linkGuids) {
AtlasVertex ev = AtlasGraphUtilsV2.findByGuid(graph, guid);
if (ev != null) {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
ev.setProperty("assetPolicyGUIDs", policyId);
ev.setProperty("assetPoliciesCount", existingValues.size() + 1);
updateModificationMetadata(ev);
}
}


@GraphTransaction
public List<AtlasVertex> linkBusinessPolicy(String policyId, List<String> linkGuids) {

return linkGuids.stream().map(guid -> AtlasGraphUtilsV2.findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
return !existingValues.contains(policyId);
}).peek(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
existingValues.add(policyId);
ev.setProperty("assetPolicyGUIDs", policyId);
ev.setProperty("assetPoliciesCount", existingValues.size() + 1);

updateModificationMetadata(ev);

cacheDifferentialEntity(ev, existingValues, policyId);
}).collect(Collectors.toList());
}

public void unlinkBusinessPolicy(String policyId, List<String> unlinkGuids) {
for (String guid : unlinkGuids) {
AtlasVertex ev = AtlasGraphUtilsV2.findByGuid(graph, guid);
if (ev != null) {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
ev.removePropertyValue("assetPolicyGUIDs", policyId);
ev.setProperty("assetPoliciesCount", existingValues.size() - 1);
updateModificationMetadata(ev);
}
}

@GraphTransaction
public List<AtlasVertex> unlinkBusinessPolicy(String policyId, List<String> unlinkGuids) {

return unlinkGuids.stream().map(guid -> AtlasGraphUtilsV2.findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
return !existingValues.contains(policyId);
}).peek(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
existingValues.add(policyId);
ev.removePropertyValue("assetPolicyGUIDs", policyId);
ev.setProperty("assetPoliciesCount", existingValues.size() - 1);

updateModificationMetadata(ev);

cacheDifferentialEntity(ev, existingValues, policyId);
}).collect(Collectors.toList());
}
}


private void cacheDifferentialEntity(AtlasVertex ev, Set<String> existingValues, String policyId) {
AtlasEntity diffEntity = new AtlasEntity(ev.getProperty("__typeName", String.class));
diffEntity.setGuid(ev.getProperty("__guid", String.class));
diffEntity.setAttribute("assetPolicyGUIDs", existingValues);
diffEntity.setAttribute("assetPoliciesCount", existingValues.size());
diffEntity.setUpdatedBy(ev.getProperty(MODIFIED_BY_KEY, String.class));
diffEntity.setUpdateTime(new Date(ev.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class)));

RequestContext requestContext = RequestContext.get();
requestContext.cacheDifferentialEntity(diffEntity);
}

}
77 changes: 76 additions & 1 deletion server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class RequestContext {
private String currentTypePatchAction = "";
private AtlasTask currentTask;
private String traceId;
private final Map<AtlasObjectId, Object> relationshipEndToVertexIdMap = new HashMap<>();
private Map<AtlasObjectId, Object> relationshipEndToVertexIdMap = new HashMap<>();
private boolean allowDuplicateDisplayName;
private MetricsRegistry metricsRegistry;
private boolean skipAuthorizationCheck = false;
Expand All @@ -105,10 +105,81 @@ public class RequestContext {
private Map<AtlasClassification, Collection<Object>> deletedClassificationAndVertices = new HashMap<>();
private Map<AtlasClassification, Collection<Object>> addedClassificationAndVertices = new HashMap<>();

private boolean isCloned = false;


private RequestContext() {

}

private RequestContext(RequestContext other) {
this.user = other.user;
this.userGroups = other.userGroups != null ? new HashSet<>(other.userGroups) : null;
this.clientIPAddress = other.clientIPAddress;
this.forwardedAddresses = other.forwardedAddresses != null ? new ArrayList<>(other.forwardedAddresses) : null;
this.deleteType = other.deleteType;
this.isPurgeRequested = other.isPurgeRequested;
this.maxAttempts = other.maxAttempts;
this.attemptCount = other.attemptCount;
this.isImportInProgress = other.isImportInProgress;
this.isInNotificationProcessing = other.isInNotificationProcessing;
this.isInTypePatching = other.isInTypePatching;
this.createShellEntityForNonExistingReference = other.createShellEntityForNonExistingReference;
this.skipFailedEntities = other.skipFailedEntities;
this.allowDeletedRelationsIndexsearch = other.allowDeletedRelationsIndexsearch;
this.includeMeanings = other.includeMeanings;
this.includeClassifications = other.includeClassifications;
this.includeClassificationNames = other.includeClassificationNames;
this.currentTypePatchAction = other.currentTypePatchAction;
this.currentTask = other.currentTask;
this.traceId = other.traceId;
this.relationshipEndToVertexIdMap = new HashMap<>(other.relationshipEndToVertexIdMap);
this.allowDuplicateDisplayName = other.allowDuplicateDisplayName;
this.metricsRegistry = other.metricsRegistry;
this.skipAuthorizationCheck = other.skipAuthorizationCheck;
this.deletedEdgesIdsForResetHasLineage = new HashSet<>(other.deletedEdgesIdsForResetHasLineage);
this.requestUri = other.requestUri;
this.cacheEnabled = other.cacheEnabled;
this.delayTagNotifications = other.delayTagNotifications;
this.deletedClassificationAndVertices = new HashMap<>(other.deletedClassificationAndVertices);
this.addedClassificationAndVertices = new HashMap<>(other.addedClassificationAndVertices);

this.updatedEntities.putAll(other.updatedEntities);
this.deletedEntities.putAll(other.deletedEntities);
this.restoreEntities.putAll(other.restoreEntities);
this.entityCache.putAll(other.entityCache);
this.entityHeaderCache.putAll(other.entityHeaderCache);
this.entityExtInfoCache.putAll(other.entityExtInfoCache);
this.diffEntityCache.putAll(other.diffEntityCache);
this.addedPropagations.putAll(other.addedPropagations);
this.removedPropagations.putAll(other.removedPropagations);
this.requestContextHeaders.putAll(other.requestContextHeaders);
this.deletedEdgesIds.addAll(other.deletedEdgesIds);
this.processGuidIds.addAll(other.processGuidIds);
this.entitiesToSkipUpdate.addAll(other.entitiesToSkipUpdate);
this.onlyCAUpdateEntities.addAll(other.onlyCAUpdateEntities);
this.onlyBAUpdateEntities.addAll(other.onlyBAUpdateEntities);
this.queuedTasks.addAll(other.queuedTasks);
this.relationAttrsForSearch.addAll(other.relationAttrsForSearch);
this.removedElementsMap.putAll(other.removedElementsMap);
this.newElementsCreatedMap.putAll(other.newElementsCreatedMap);
this.relationshipMutationMap.putAll(other.relationshipMutationMap);
}


public static RequestContext cloneCurrentContext() {
RequestContext currentContext = get();
RequestContext requestContext = new RequestContext(currentContext);
requestContext.isCloned = true;
return requestContext;
}

public static void set(RequestContext context) {
CURRENT_CONTEXT.set(context);
}



//To handle gets from background threads where createContext() is not called
//createContext called for every request in the filter
public static RequestContext get() {
Expand Down Expand Up @@ -788,4 +859,8 @@ public void clearMutationContext(String event) {
public Map<String, Set<AtlasRelationship>> getRelationshipMutationMap() {
return relationshipMutationMap;
}

public boolean isCloned() {
return isCloned;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operat
}

}

sendNotifications(operationType, messages);
RequestContext.get().endMetricRecord(metric);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.atlas.notification;

import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.notification.EntityNotification;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
Expand Down Expand Up @@ -95,10 +96,12 @@ public void send(EntityNotification.EntityNotificationV2.OperationType operation
notificationHook = new PostCommitNotificationHook(operationType, notifications);
postCommitNotificationHooks.set(notificationHook);
} else {
if (isRelationshipEvent(operationType))
notificationHook.addRelationshipNotifications(notifications);
else
notificationHook.addNotifications(notifications);
if (isRelationshipEvent(operationType)) notificationHook.addRelationshipNotifications(notifications);
else notificationHook.addNotifications(notifications);
}

if (RequestContext.get().isCloned()) {
notificationHook.onComplete(true);
}
}

Expand Down
12 changes: 10 additions & 2 deletions webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
Original file line number Diff line number Diff line change
Expand Up @@ -1963,15 +1963,19 @@ public void repairAccessControlAlias(@PathParam("guid") String guid) throws Atla
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Timed
public void linkBusinessPolicy(@PathParam("policyId") final String policyId, final LinkBusinessPolicyRequest request) throws AtlasBaseException {
if (ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) {
if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) {
throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy linking");
}
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.linkBusinessPolicy(" + policyId + ")");
}

entitiesStore.linkBusinessPolicy(policyId, request.getLinkGuids());
} catch (AtlasBaseException abe) {
LOG.error("Error in policy linking: ", abe);
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "During Policy linking");
} finally {
AtlasPerfTracer.log(perf);
}
Expand All @@ -1984,15 +1988,19 @@ public void linkBusinessPolicy(@PathParam("policyId") final String policyId, fin
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Timed
public void unlinkBusinessPolicy(@PathParam("policyId") final String policyId, final LinkBusinessPolicyRequest request) throws AtlasBaseException {
if (ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) {
if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) {
throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy unlinking");
}
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.unlinkBusinessPolicy(" + policyId + ")");
}

entitiesStore.unlinkBusinessPolicy(policyId, request.getUnlinkGuids());
} catch (AtlasBaseException abe) {
LOG.error("Error in policy unlinking: ", abe);
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "During Policy unlinking");
} finally {
AtlasPerfTracer.log(perf);
}
Expand Down

0 comments on commit 83e7c34

Please sign in to comment.