From 7c11e850b960d627881ab9aeeefba37dadcb796d Mon Sep 17 00:00:00 2001 From: ektavarma10 Date: Fri, 1 Dec 2023 13:40:22 +0530 Subject: [PATCH 1/2] Add entity and classification level parallelisation --- .../authorizer/RangerAtlasAuthorizer.java | 232 +++++++++++++----- .../authorizer/RangerAtlasAuthorizerUtil.java | 2 + 2 files changed, 173 insertions(+), 61 deletions(-) diff --git a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java index 6462931f93..9d286892c8 100644 --- a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java +++ b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java @@ -53,7 +53,25 @@ import org.apache.atlas.plugin.service.RangerBasePlugin; import org.apache.atlas.plugin.util.RangerPerfTracer; -import java.util.*; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.ArrayList; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.UUID; +import java.util.Collection; +import java.util.Optional; +import java.util.Objects; + + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; import static org.apache.atlas.authorization.atlas.authorizer.RangerAtlasAuthorizerUtil.*; import static org.apache.atlas.authorize.AtlasAuthorizationUtils.getCurrentUserGroups; @@ -74,6 +92,8 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer { add(AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION); }}; + private static final ExecutorService classificationAndEntityAccessThreadpool = Executors.newFixedThreadPool(NUM_THREADS); + @Override public void init() { if (LOG.isDebugEnabled()) { @@ -582,25 +602,27 @@ public void scrubSearchResults(AtlasSearchResultScrubRequest request, boolean is if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerAtlasAuthorizer.scrubSearchResults(" + request + ")"); AtlasSearchResult result = request.getSearchResult(); + List entitiesToCheck = new ArrayList<>(); - long startTime = System.currentTimeMillis(); if (CollectionUtils.isNotEmpty(result.getEntities())) { - for (AtlasEntityHeader entity : result.getEntities()) { - checkAccessAndScrub(entity, request, isScrubAuditEnabled); - } - LOG.info("scrubSearchResults ended for entites: " + (System.currentTimeMillis()-startTime)); + entitiesToCheck.addAll(result.getEntities()); } + if (CollectionUtils.isNotEmpty(result.getFullTextResult())) { - for (AtlasSearchResult.AtlasFullTextResult fullTextResult : result.getFullTextResult()) { - if (fullTextResult != null) - checkAccessAndScrub(fullTextResult.getEntity(), request, isScrubAuditEnabled); - } + entitiesToCheck.addAll( + result.getFullTextResult() + .stream() + .filter(Objects::nonNull) + .map(res -> res.getEntity()) + .collect(Collectors.toList()) + ); } + if (MapUtils.isNotEmpty(result.getReferredEntities())) { - for (AtlasEntityHeader entity : result.getReferredEntities().values()) { - checkAccessAndScrub(entity, request, isScrubAuditEnabled); - } + entitiesToCheck.addAll(result.getReferredEntities().values()); } + + checkAccessAndScrubAsync(entitiesToCheck, request, isScrubAuditEnabled); } finally { RangerPerfTracer.log(perf); } @@ -622,6 +644,36 @@ public void filterTypesDef(AtlasTypesDefFilterRequest request) throws AtlasAutho } + private void checkAccessAndScrubAsync(List entitiesToCheck, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled) throws AtlasAuthorizationException { + LOG.info("Creating futures to check access and scrub " + entitiesToCheck.size() + " entities"); + List> completableFutures = entitiesToCheck + .stream() + .map(entity -> CompletableFuture.supplyAsync(() -> { + try { + checkAccessAndScrub(entity, request, isScrubAuditEnabled); + return null; + } catch (AtlasAuthorizationException e) { + return e; + } + }, classificationAndEntityAccessThreadpool)) + .collect(Collectors.toList()); + + // wait for all threads to complete their execution + CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join(); + + // get the first exception from any checkAccessAndScrub calls + Optional maybeAuthException = completableFutures + .stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .findFirst(); + + LOG.info("Async check access and scrub is complete"); + if (maybeAuthException.isPresent()) { + throw maybeAuthException.get(); + } + } + private void filterTypes(AtlasAccessRequest request, List typeDefs)throws AtlasAuthorizationException { if (typeDefs != null) { for (ListIterator iter = typeDefs.listIterator(); iter.hasNext();) { @@ -650,68 +702,52 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud if (LOG.isDebugEnabled()) { LOG.debug("==> isAccessAllowed(" + request + ")"); } - boolean ret = false; + + String uuid = UUID.randomUUID().toString(); long startTime = System.currentTimeMillis(); - final String uuid = UUID.randomUUID().toString(); + LOG.info("start isAccessAllowed : " + startTime + " uuid: " + uuid); + boolean ret = false; try { - final String action = request.getAction() != null ? request.getAction().getType() : null; - final Set entityTypes = request.getEntityTypeAndAllSuperTypes(); - final String entityId = request.getEntityId(); - final String classification = request.getClassification() != null ? request.getClassification().getTypeName() : null; - final RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl(); - final RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl(); - final String ownerUser = request.getEntity() != null ? (String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null; - - rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes); - rangerResource.setValue(RESOURCE_ENTITY_ID, entityId); - rangerResource.setOwnerUser(ownerUser); - rangerRequest.setAccessType(action); - rangerRequest.setAction(action); - rangerRequest.setUser(request.getUser()); - rangerRequest.setUserGroups(request.getUserGroups()); - rangerRequest.setClientIPAddress(request.getClientIPAddress()); - rangerRequest.setAccessTime(request.getAccessTime()); - rangerRequest.setResource(rangerResource); - rangerRequest.setForwardedAddresses(request.getForwardedAddresses()); - rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress()); - - if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) { - rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel()); - } else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) { - rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata()); - } else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) { - rangerResource.setValue(RESOURCE_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classification)); - } if (CollectionUtils.isNotEmpty(request.getEntityClassifications())) { Set entityClassifications = request.getEntityClassifications(); - Map contextOjb = rangerRequest.getContext(); Set rangerTagForEval = getRangerServiceTag(entityClassifications); - if (contextOjb == null) { - Map contextOjb1 = new HashMap(); - contextOjb1.put("CLASSIFICATIONS", rangerTagForEval); - rangerRequest.setContext(contextOjb1); - } else { - contextOjb.put("CLASSIFICATIONS", rangerTagForEval); - rangerRequest.setContext(contextOjb); - } + List> completableFutures = new ArrayList<>(); + LOG.info("isAccessAllowed started : " + startTime); // check authorization for each classification - LOG.info("classification level authorization started: " + (System.currentTimeMillis()-startTime) + "uuid: "+uuid); + LOG.info("start check authrization for each classification: " + (System.currentTimeMillis() - startTime)+ " uuid: " + uuid); for (AtlasClassification classificationToAuthorize : request.getEntityClassifications()) { - rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName())); + long rangerRequestCreationStartTime = System.currentTimeMillis(); + RangerAccessRequestImpl rangerRequest = createRangerAccessRequest(request, classificationToAuthorize, rangerTagForEval); + LOG.info("Time taken to create a ranger request for uuid: "+uuid+ "is "+ (System.currentTimeMillis()-rangerRequestCreationStartTime)); + completableFutures.add(CompletableFuture.supplyAsync(()->checkAccess(rangerRequest, auditHandler, uuid), classificationAndEntityAccessThreadpool)); + } - ret = checkAccess(rangerRequest, auditHandler, uuid); + // wait for all threads to complete their execution + CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join(); + LOG.info("end check authorization for each classification: " + (System.currentTimeMillis() - startTime) + " uuid: " + uuid); + + + // if all checkAccess calls return true, then ret is true, else it is false + ret = completableFutures + .stream() + .map(CompletableFuture::join) + .allMatch(result -> result == true); - if (!ret) { - break; - } - } - LOG.info("classification level authorization ended: " + (System.currentTimeMillis()-startTime) + "uuid: "+uuid); } else { + + RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl(); + RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl(); + + initRangerRequest(rangerRequest, request); + initRangerResource(rangerResource, request); + + rangerRequest.setResource(rangerResource); + rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, ENTITY_NOT_CLASSIFIED ); ret = checkAccess(rangerRequest, auditHandler, uuid); @@ -726,10 +762,81 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud if (LOG.isDebugEnabled()) { LOG.debug("<== isAccessAllowed(" + request + "): " + ret); } - LOG.info("isAccessAllowed ended: " + (System.currentTimeMillis()-startTime) + "uuid: "+uuid); + return ret; } + private RangerAccessRequestImpl createRangerAccessRequest(AtlasEntityAccessRequest request, + AtlasClassification classificationToAuthorize, + Set rangerTagForEval) { + + long startTime = System.currentTimeMillis(); + LOG.info("createRangerAccessRequest start: " + startTime); + + RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl(); + RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl(); + + initRangerRequest(rangerRequest, request); + initRangerResource(rangerResource, request); + + rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName())); + + rangerRequest.setResource(rangerResource); + + setClassificationContextForRanger(rangerTagForEval, rangerRequest); + + LOG.info("createRangerAccessRequest end: " + (System.currentTimeMillis() - startTime)); + + return rangerRequest; + + } + + private static void setClassificationContextForRanger(Set rangerTagForEval, RangerAccessRequestImpl rangerRequest) { + Map contextOjb = rangerRequest.getContext(); + + if (contextOjb == null) { + Map contextOjb1 = new HashMap(); + contextOjb1.put("CLASSIFICATIONS", rangerTagForEval); + rangerRequest.setContext(contextOjb1); + } else { + contextOjb.put("CLASSIFICATIONS", rangerTagForEval); + rangerRequest.setContext(contextOjb); + } + } + + private void initRangerRequest(RangerAccessRequestImpl rangerRequest, AtlasEntityAccessRequest request) { + final String action = request.getAction() != null ? request.getAction().getType() : null; + + rangerRequest.setAccessType(action); + rangerRequest.setAction(action); + rangerRequest.setUser(request.getUser()); + rangerRequest.setUserGroups(request.getUserGroups()); + rangerRequest.setClientIPAddress(request.getClientIPAddress()); + rangerRequest.setAccessTime(request.getAccessTime()); + rangerRequest.setForwardedAddresses(request.getForwardedAddresses()); + rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress()); + } + + private void initRangerResource(RangerAccessResourceImpl rangerResource, AtlasEntityAccessRequest request) { + final Set entityTypes = request.getEntityTypeAndAllSuperTypes(); + final String entityId = request.getEntityId(); + final String ownerUser = request.getEntity() != null ? (String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null; + final String classification = request.getClassification() != null ? request.getClassification().getTypeName() : null; + + rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes); + rangerResource.setValue(RESOURCE_ENTITY_ID, entityId); + rangerResource.setOwnerUser(ownerUser); + + if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) { + rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel()); + } else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) { + rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata()); + } else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) { + rangerResource.setValue(RESOURCE_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classification)); + } + + } + private void setClassificationsToRequestContext(Set entityClassifications, RangerAccessRequestImpl rangerRequest) { Map contextOjb = rangerRequest.getContext(); @@ -870,7 +977,10 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru boolean isEntityAccessAllowed = isScrubAuditEnabled ? isAccessAllowed(entityAccessRequest) : isAccessAllowed(entityAccessRequest, null); if (!isEntityAccessAllowed) { + long startTime = System.currentTimeMillis(); + LOG.info("scrubEntityHeader started" + startTime); scrubEntityHeader(entity, request.getTypeRegistry()); + LOG.info("scrubEntityHeader ended" + (System.currentTimeMillis() - startTime)); } } } diff --git a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizerUtil.java b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizerUtil.java index c78e986581..1a81bb180e 100644 --- a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizerUtil.java +++ b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizerUtil.java @@ -43,6 +43,8 @@ public class RangerAtlasAuthorizerUtil { + public static Integer NUM_THREADS = 5; + static void toRangerRequest(AtlasEntityAccessRequest request, RangerAccessRequestImpl rangerRequest, RangerAccessResourceImpl rangerResource){ final String action = request.getAction() != null ? request.getAction().getType() : null; From 232d11fe5a1e85d4c04c92e1067bf81c7ac80dfa Mon Sep 17 00:00:00 2001 From: ektavarma10 Date: Fri, 1 Dec 2023 14:00:17 +0530 Subject: [PATCH 2/2] Add logs for scrub search --- .../authorization/atlas/authorizer/RangerAtlasAuthorizer.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java index 9d286892c8..8bb226716d 100644 --- a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java +++ b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java @@ -598,6 +598,7 @@ public void scrubSearchResults(AtlasSearchResultScrubRequest request, boolean is if (LOG.isDebugEnabled()) LOG.debug("==> scrubSearchResults(" + request + " " + isScrubAuditEnabled); RangerPerfTracer perf = null; + long startTime = System.currentTimeMillis(); try { if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerAtlasAuthorizer.scrubSearchResults(" + request + ")"); @@ -624,6 +625,7 @@ public void scrubSearchResults(AtlasSearchResultScrubRequest request, boolean is checkAccessAndScrubAsync(entitiesToCheck, request, isScrubAuditEnabled); } finally { + LOG.info("scrubSearchResults ended in : "+ (System.currentTimeMillis()-startTime)); RangerPerfTracer.log(perf); } if (LOG.isDebugEnabled()) @@ -969,6 +971,7 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled) throws AtlasAuthorizationException { if (entity != null && request != null) { + long startTimeOfAccessAndScrub = System.currentTimeMillis(); final AtlasEntityAccessRequest entityAccessRequest = new AtlasEntityAccessRequest(request.getTypeRegistry(), AtlasPrivilege.ENTITY_READ, entity, request.getUser(), request.getUserGroups()); entityAccessRequest.setClientIPAddress(request.getClientIPAddress()); @@ -982,6 +985,7 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru scrubEntityHeader(entity, request.getTypeRegistry()); LOG.info("scrubEntityHeader ended" + (System.currentTimeMillis() - startTime)); } + LOG.info("checkAccessAndScrub ended in " + (System.currentTimeMillis()-startTimeOfAccessAndScrub)); } }