From 83e7c349b6e9bde42a1c381ffff2a26ce26bc2cc Mon Sep 17 00:00:00 2001 From: arpit-at Date: Wed, 3 Jul 2024 20:02:07 +0530 Subject: [PATCH] DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy DG-1697: Adding endpoint for linking/unlink policy --- .github/workflows/maven.yml | 1 + .../store/graph/v2/AtlasEntityStoreV2.java | 48 ++++++++++-- .../store/graph/v2/EntityGraphMapper.java | 72 ++++++++++++----- .../java/org/apache/atlas/RequestContext.java | 77 ++++++++++++++++++- .../EntityNotificationListenerV2.java | 1 - .../EntityNotificationSender.java | 11 ++- .../org/apache/atlas/web/rest/EntityREST.java | 12 ++- 7 files changed, 186 insertions(+), 36 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 0977cb36a25..1070038c39e 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -26,6 +26,7 @@ on: - development - master - lineageondemand + - policyendpointsmaster jobs: build: diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index a227e5d36ca..0dce89e9ad0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -104,6 +104,7 @@ import javax.inject.Inject; import java.io.InputStream; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static java.lang.Boolean.FALSE; @@ -1550,8 +1551,8 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean // Notify the change listeners - // entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress()); - // atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap()); + entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress()); + atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap()); if (LOG.isDebugEnabled()) { LOG.debug("<== createOrUpdate()"); } @@ -2739,19 +2740,52 @@ public void repairAccesscontrolAlias(String guid) throws AtlasBaseException { } @Override - @GraphTransaction - public void linkBusinessPolicy(String guid, List linkGuids) { + public void linkBusinessPolicy(String guid, List linkGuids) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy"); - this.entityGraphMapper.linkBusinessPolicy(guid, linkGuids); + List vertices = this.entityGraphMapper.linkBusinessPolicy(guid, linkGuids); + EntityMutationResponse entityMutationResponse = new EntityMutationResponse(); + for (AtlasVertex v : vertices) { + AtlasEntityHeader aeh = entityRetriever.toAtlasEntityHeader(v); + entityMutationResponse.addEntity(UPDATE, aeh); + } + // Send notifications for the entity changes asynchronously + RequestContext clonedContext = RequestContext.cloneCurrentContext(); + CompletableFuture.runAsync(() -> { + try { + RequestContext.set(clonedContext); + entityChangeNotifier.onEntitiesMutated(entityMutationResponse, false); + } catch (AtlasBaseException e) { + LOG.error("Error in processing notification for policy link updates for asset ids {} ", linkGuids, e); + }finally { + RequestContext.clear(); + } + }); RequestContext.get().endMetricRecord(metric); } @Override @GraphTransaction - public void unlinkBusinessPolicy(String guid, List unlinkGuids) { + public void unlinkBusinessPolicy(String guid, List unlinkGuids) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy"); - this.entityGraphMapper.unlinkBusinessPolicy(guid, unlinkGuids); + List vertices = this.entityGraphMapper.unlinkBusinessPolicy(guid, unlinkGuids); + EntityMutationResponse entityMutationResponse = new EntityMutationResponse(); + for (AtlasVertex v : vertices) { + AtlasEntityHeader aeh = entityRetriever.toAtlasEntityHeader(v); + entityMutationResponse.addEntity(UPDATE, aeh); + } + // Send notifications for the entity changes asynchronously + RequestContext clonedContext = RequestContext.cloneCurrentContext(); + CompletableFuture.runAsync(() -> { + try { + RequestContext.set(clonedContext); + entityChangeNotifier.onEntitiesMutated(entityMutationResponse, false); + } catch (AtlasBaseException e) { + LOG.error("Error in processing notification for policy unlink updates for asset ids {} ", unlinkGuids, e); + } finally { + RequestContext.clear(); + } + }); RequestContext.get().endMetricRecord(metric); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 70835b74cb5..fce0460ec00 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -98,6 +98,7 @@ import java.util.Set; import java.util.UUID; import java.util.Date; +import java.util.concurrent.CompletableFuture; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -4514,27 +4515,56 @@ public void addHasLineage(Set inputOutputEdges, boolean isRestoreEnti } RequestContext.get().endMetricRecord(metricRecorder); } - public void linkBusinessPolicy(String policyId, List linkGuids) { - for (String guid : linkGuids) { - AtlasVertex ev = AtlasGraphUtilsV2.findByGuid(graph, guid); - if (ev != null) { - Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); - ev.setProperty("assetPolicyGUIDs", policyId); - ev.setProperty("assetPoliciesCount", existingValues.size() + 1); - updateModificationMetadata(ev); - } - } + + + @GraphTransaction + public List linkBusinessPolicy(String policyId, List linkGuids) { + + return linkGuids.stream().map(guid -> AtlasGraphUtilsV2.findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> { + Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); + return !existingValues.contains(policyId); + }).peek(ev -> { + Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); + existingValues.add(policyId); + ev.setProperty("assetPolicyGUIDs", policyId); + ev.setProperty("assetPoliciesCount", existingValues.size() + 1); + + updateModificationMetadata(ev); + + cacheDifferentialEntity(ev, existingValues, policyId); + }).collect(Collectors.toList()); } - public void unlinkBusinessPolicy(String policyId, List unlinkGuids) { - for (String guid : unlinkGuids) { - AtlasVertex ev = AtlasGraphUtilsV2.findByGuid(graph, guid); - if (ev != null) { - Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); - ev.removePropertyValue("assetPolicyGUIDs", policyId); - ev.setProperty("assetPoliciesCount", existingValues.size() - 1); - updateModificationMetadata(ev); - } - } + + @GraphTransaction + public List unlinkBusinessPolicy(String policyId, List unlinkGuids) { + + return unlinkGuids.stream().map(guid -> AtlasGraphUtilsV2.findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> { + Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); + return !existingValues.contains(policyId); + }).peek(ev -> { + Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); + existingValues.add(policyId); + ev.removePropertyValue("assetPolicyGUIDs", policyId); + ev.setProperty("assetPoliciesCount", existingValues.size() - 1); + + updateModificationMetadata(ev); + + cacheDifferentialEntity(ev, existingValues, policyId); + }).collect(Collectors.toList()); } -} \ No newline at end of file + + + private void cacheDifferentialEntity(AtlasVertex ev, Set existingValues, String policyId) { + AtlasEntity diffEntity = new AtlasEntity(ev.getProperty("__typeName", String.class)); + diffEntity.setGuid(ev.getProperty("__guid", String.class)); + diffEntity.setAttribute("assetPolicyGUIDs", existingValues); + diffEntity.setAttribute("assetPoliciesCount", existingValues.size()); + diffEntity.setUpdatedBy(ev.getProperty(MODIFIED_BY_KEY, String.class)); + diffEntity.setUpdateTime(new Date(ev.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class))); + + RequestContext requestContext = RequestContext.get(); + requestContext.cacheDifferentialEntity(diffEntity); + } + +} diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index 565832b7bd5..f519bebd981 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -93,7 +93,7 @@ public class RequestContext { private String currentTypePatchAction = ""; private AtlasTask currentTask; private String traceId; - private final Map relationshipEndToVertexIdMap = new HashMap<>(); + private Map relationshipEndToVertexIdMap = new HashMap<>(); private boolean allowDuplicateDisplayName; private MetricsRegistry metricsRegistry; private boolean skipAuthorizationCheck = false; @@ -105,10 +105,81 @@ public class RequestContext { private Map> deletedClassificationAndVertices = new HashMap<>(); private Map> addedClassificationAndVertices = new HashMap<>(); + private boolean isCloned = false; + private RequestContext() { + + } + + private RequestContext(RequestContext other) { + this.user = other.user; + this.userGroups = other.userGroups != null ? new HashSet<>(other.userGroups) : null; + this.clientIPAddress = other.clientIPAddress; + this.forwardedAddresses = other.forwardedAddresses != null ? new ArrayList<>(other.forwardedAddresses) : null; + this.deleteType = other.deleteType; + this.isPurgeRequested = other.isPurgeRequested; + this.maxAttempts = other.maxAttempts; + this.attemptCount = other.attemptCount; + this.isImportInProgress = other.isImportInProgress; + this.isInNotificationProcessing = other.isInNotificationProcessing; + this.isInTypePatching = other.isInTypePatching; + this.createShellEntityForNonExistingReference = other.createShellEntityForNonExistingReference; + this.skipFailedEntities = other.skipFailedEntities; + this.allowDeletedRelationsIndexsearch = other.allowDeletedRelationsIndexsearch; + this.includeMeanings = other.includeMeanings; + this.includeClassifications = other.includeClassifications; + this.includeClassificationNames = other.includeClassificationNames; + this.currentTypePatchAction = other.currentTypePatchAction; + this.currentTask = other.currentTask; + this.traceId = other.traceId; + this.relationshipEndToVertexIdMap = new HashMap<>(other.relationshipEndToVertexIdMap); + this.allowDuplicateDisplayName = other.allowDuplicateDisplayName; + this.metricsRegistry = other.metricsRegistry; + this.skipAuthorizationCheck = other.skipAuthorizationCheck; + this.deletedEdgesIdsForResetHasLineage = new HashSet<>(other.deletedEdgesIdsForResetHasLineage); + this.requestUri = other.requestUri; + this.cacheEnabled = other.cacheEnabled; + this.delayTagNotifications = other.delayTagNotifications; + this.deletedClassificationAndVertices = new HashMap<>(other.deletedClassificationAndVertices); + this.addedClassificationAndVertices = new HashMap<>(other.addedClassificationAndVertices); + + this.updatedEntities.putAll(other.updatedEntities); + this.deletedEntities.putAll(other.deletedEntities); + this.restoreEntities.putAll(other.restoreEntities); + this.entityCache.putAll(other.entityCache); + this.entityHeaderCache.putAll(other.entityHeaderCache); + this.entityExtInfoCache.putAll(other.entityExtInfoCache); + this.diffEntityCache.putAll(other.diffEntityCache); + this.addedPropagations.putAll(other.addedPropagations); + this.removedPropagations.putAll(other.removedPropagations); + this.requestContextHeaders.putAll(other.requestContextHeaders); + this.deletedEdgesIds.addAll(other.deletedEdgesIds); + this.processGuidIds.addAll(other.processGuidIds); + this.entitiesToSkipUpdate.addAll(other.entitiesToSkipUpdate); + this.onlyCAUpdateEntities.addAll(other.onlyCAUpdateEntities); + this.onlyBAUpdateEntities.addAll(other.onlyBAUpdateEntities); + this.queuedTasks.addAll(other.queuedTasks); + this.relationAttrsForSearch.addAll(other.relationAttrsForSearch); + this.removedElementsMap.putAll(other.removedElementsMap); + this.newElementsCreatedMap.putAll(other.newElementsCreatedMap); + this.relationshipMutationMap.putAll(other.relationshipMutationMap); + } + + + public static RequestContext cloneCurrentContext() { + RequestContext currentContext = get(); + RequestContext requestContext = new RequestContext(currentContext); + requestContext.isCloned = true; + return requestContext; + } + + public static void set(RequestContext context) { + CURRENT_CONTEXT.set(context); } + + //To handle gets from background threads where createContext() is not called //createContext called for every request in the filter public static RequestContext get() { @@ -788,4 +859,8 @@ public void clearMutationContext(String event) { public Map> getRelationshipMutationMap() { return relationshipMutationMap; } + + public boolean isCloned() { + return isCloned; + } } \ No newline at end of file diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java index 06b8f9678d5..a2ed2f693b1 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java @@ -174,7 +174,6 @@ private void notifyEntityEvents(List entities, OperationType operat } } - sendNotifications(operationType, messages); RequestContext.get().endMetricRecord(metric); } diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java index c039cc837ac..b4419da3ade 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java @@ -18,6 +18,7 @@ package org.apache.atlas.notification; import org.apache.atlas.GraphTransactionInterceptor; +import org.apache.atlas.RequestContext; import org.apache.atlas.model.notification.EntityNotification; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; @@ -95,10 +96,12 @@ public void send(EntityNotification.EntityNotificationV2.OperationType operation notificationHook = new PostCommitNotificationHook(operationType, notifications); postCommitNotificationHooks.set(notificationHook); } else { - if (isRelationshipEvent(operationType)) - notificationHook.addRelationshipNotifications(notifications); - else - notificationHook.addNotifications(notifications); + if (isRelationshipEvent(operationType)) notificationHook.addRelationshipNotifications(notifications); + else notificationHook.addNotifications(notifications); + } + + if (RequestContext.get().isCloned()) { + notificationHook.onComplete(true); } } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java index 62c77b142d8..4a98f236c9e 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java @@ -1963,7 +1963,7 @@ public void repairAccessControlAlias(@PathParam("guid") String guid) throws Atla @Consumes(Servlets.JSON_MEDIA_TYPE) @Timed public void linkBusinessPolicy(@PathParam("policyId") final String policyId, final LinkBusinessPolicyRequest request) throws AtlasBaseException { - if (ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { + if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy linking"); } AtlasPerfTracer perf = null; @@ -1971,7 +1971,11 @@ public void linkBusinessPolicy(@PathParam("policyId") final String policyId, fin if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.linkBusinessPolicy(" + policyId + ")"); } + entitiesStore.linkBusinessPolicy(policyId, request.getLinkGuids()); + } catch (AtlasBaseException abe) { + LOG.error("Error in policy linking: ", abe); + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "During Policy linking"); } finally { AtlasPerfTracer.log(perf); } @@ -1984,7 +1988,7 @@ public void linkBusinessPolicy(@PathParam("policyId") final String policyId, fin @Consumes(Servlets.JSON_MEDIA_TYPE) @Timed public void unlinkBusinessPolicy(@PathParam("policyId") final String policyId, final LinkBusinessPolicyRequest request) throws AtlasBaseException { - if (ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { + if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy unlinking"); } AtlasPerfTracer perf = null; @@ -1992,7 +1996,11 @@ public void unlinkBusinessPolicy(@PathParam("policyId") final String policyId, f if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.unlinkBusinessPolicy(" + policyId + ")"); } + entitiesStore.unlinkBusinessPolicy(policyId, request.getUnlinkGuids()); + } catch (AtlasBaseException abe) { + LOG.error("Error in policy unlinking: ", abe); + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "During Policy unlinking"); } finally { AtlasPerfTracer.log(perf); }