diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 03794180116..3b51df35b1e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -2750,12 +2750,15 @@ public void linkBusinessPolicy(String guid, List linkGuids) throws Atlas entityMutationResponse.addEntity(UPDATE, aeh); } // Send notifications for the entity changes asynchronously + RequestContext clonedContext = RequestContext.cloneCurrentContext(); CompletableFuture.runAsync(() -> { try { + RequestContext.set(clonedContext); entityChangeNotifier.onEntitiesMutated(entityMutationResponse, false); } catch (AtlasBaseException e) { - // Handle exception LOG.error("Error in processing notification for policy link updates for asset ids {} ", linkGuids, e); + }finally { + RequestContext.clear(); } }); RequestContext.get().endMetricRecord(metric); diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index 565832b7bd5..23712b0a918 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -93,7 +93,7 @@ public class RequestContext { private String currentTypePatchAction = ""; private AtlasTask currentTask; private String traceId; - private final Map relationshipEndToVertexIdMap = new HashMap<>(); + private Map relationshipEndToVertexIdMap = new HashMap<>(); private boolean allowDuplicateDisplayName; private MetricsRegistry metricsRegistry; private boolean skipAuthorizationCheck = false; @@ -107,8 +107,76 @@ public class RequestContext { private RequestContext() { + + } + + private RequestContext(RequestContext other) { + this.user = other.user; + this.userGroups = other.userGroups != null ? new HashSet<>(other.userGroups) : null; + this.clientIPAddress = other.clientIPAddress; + this.forwardedAddresses = other.forwardedAddresses != null ? new ArrayList<>(other.forwardedAddresses) : null; + this.deleteType = other.deleteType; + this.isPurgeRequested = other.isPurgeRequested; + this.maxAttempts = other.maxAttempts; + this.attemptCount = other.attemptCount; + this.isImportInProgress = other.isImportInProgress; + this.isInNotificationProcessing = other.isInNotificationProcessing; + this.isInTypePatching = other.isInTypePatching; + this.createShellEntityForNonExistingReference = other.createShellEntityForNonExistingReference; + this.skipFailedEntities = other.skipFailedEntities; + this.allowDeletedRelationsIndexsearch = other.allowDeletedRelationsIndexsearch; + this.includeMeanings = other.includeMeanings; + this.includeClassifications = other.includeClassifications; + this.includeClassificationNames = other.includeClassificationNames; + this.currentTypePatchAction = other.currentTypePatchAction; + this.currentTask = other.currentTask; + this.traceId = other.traceId; + this.relationshipEndToVertexIdMap = new HashMap<>(other.relationshipEndToVertexIdMap); + this.allowDuplicateDisplayName = other.allowDuplicateDisplayName; + this.metricsRegistry = other.metricsRegistry; + this.skipAuthorizationCheck = other.skipAuthorizationCheck; + this.deletedEdgesIdsForResetHasLineage = new HashSet<>(other.deletedEdgesIdsForResetHasLineage); + this.requestUri = other.requestUri; + this.cacheEnabled = other.cacheEnabled; + this.delayTagNotifications = other.delayTagNotifications; + this.deletedClassificationAndVertices = new HashMap<>(other.deletedClassificationAndVertices); + this.addedClassificationAndVertices = new HashMap<>(other.addedClassificationAndVertices); + + this.updatedEntities.putAll(other.updatedEntities); + this.deletedEntities.putAll(other.deletedEntities); + this.restoreEntities.putAll(other.restoreEntities); + this.entityCache.putAll(other.entityCache); + this.entityHeaderCache.putAll(other.entityHeaderCache); + this.entityExtInfoCache.putAll(other.entityExtInfoCache); + this.diffEntityCache.putAll(other.diffEntityCache); + this.addedPropagations.putAll(other.addedPropagations); + this.removedPropagations.putAll(other.removedPropagations); + this.requestContextHeaders.putAll(other.requestContextHeaders); + this.deletedEdgesIds.addAll(other.deletedEdgesIds); + this.processGuidIds.addAll(other.processGuidIds); + this.entitiesToSkipUpdate.addAll(other.entitiesToSkipUpdate); + this.onlyCAUpdateEntities.addAll(other.onlyCAUpdateEntities); + this.onlyBAUpdateEntities.addAll(other.onlyBAUpdateEntities); + this.queuedTasks.addAll(other.queuedTasks); + this.relationAttrsForSearch.addAll(other.relationAttrsForSearch); + this.removedElementsMap.putAll(other.removedElementsMap); + this.newElementsCreatedMap.putAll(other.newElementsCreatedMap); + this.relationshipMutationMap.putAll(other.relationshipMutationMap); + } + + + public static RequestContext cloneCurrentContext() { + RequestContext currentContext = get(); + RequestContext requestContext = new RequestContext(currentContext); + return requestContext; + } + + public static void set(RequestContext context) { + CURRENT_CONTEXT.set(context); } + + //To handle gets from background threads where createContext() is not called //createContext called for every request in the filter public static RequestContext get() { diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java index 55182c11124..a011a96c3a8 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java @@ -1963,7 +1963,7 @@ public void repairAccessControlAlias(@PathParam("guid") String guid) throws Atla @Consumes(Servlets.JSON_MEDIA_TYPE) @Timed public void linkBusinessPolicy(@PathParam("policyId") final String policyId, final LinkBusinessPolicyRequest request) throws AtlasBaseException { - if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { + if (ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy linking"); } AtlasPerfTracer perf = null;