diff --git a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java index 6462931f93c..3b04ade0dec 100644 --- a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java +++ b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java @@ -53,7 +53,25 @@ import org.apache.atlas.plugin.service.RangerBasePlugin; import org.apache.atlas.plugin.util.RangerPerfTracer; -import java.util.*; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.ArrayList; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.UUID; +import java.util.Collection; +import java.util.Optional; +import java.util.Objects; + + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; import static org.apache.atlas.authorization.atlas.authorizer.RangerAtlasAuthorizerUtil.*; import static org.apache.atlas.authorize.AtlasAuthorizationUtils.getCurrentUserGroups; @@ -74,6 +92,8 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer { add(AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION); }}; + private static final ExecutorService classificationAccessThreadpool = Executors.newFixedThreadPool(20); + @Override public void init() { if (LOG.isDebugEnabled()) { @@ -582,25 +602,27 @@ public void scrubSearchResults(AtlasSearchResultScrubRequest request, boolean is if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerAtlasAuthorizer.scrubSearchResults(" + request + ")"); AtlasSearchResult result = request.getSearchResult(); + List entitiesToCheck = new ArrayList<>(); - long startTime = System.currentTimeMillis(); if (CollectionUtils.isNotEmpty(result.getEntities())) { - for (AtlasEntityHeader entity : result.getEntities()) { - checkAccessAndScrub(entity, request, isScrubAuditEnabled); - } - LOG.info("scrubSearchResults ended for entites: " + (System.currentTimeMillis()-startTime)); + entitiesToCheck.addAll(result.getEntities()); } + if (CollectionUtils.isNotEmpty(result.getFullTextResult())) { - for (AtlasSearchResult.AtlasFullTextResult fullTextResult : result.getFullTextResult()) { - if (fullTextResult != null) - checkAccessAndScrub(fullTextResult.getEntity(), request, isScrubAuditEnabled); - } + entitiesToCheck.addAll( + result.getFullTextResult() + .stream() + .filter(Objects::nonNull) + .map(res -> res.getEntity()) + .collect(Collectors.toList()) + ); } + if (MapUtils.isNotEmpty(result.getReferredEntities())) { - for (AtlasEntityHeader entity : result.getReferredEntities().values()) { - checkAccessAndScrub(entity, request, isScrubAuditEnabled); - } + entitiesToCheck.addAll(result.getReferredEntities().values()); } + + checkAccessAndScrubAsync(entitiesToCheck, request, isScrubAuditEnabled); } finally { RangerPerfTracer.log(perf); } @@ -622,6 +644,37 @@ public void filterTypesDef(AtlasTypesDefFilterRequest request) throws AtlasAutho } + private void checkAccessAndScrubAsync(List entitiesToCheck, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled) throws AtlasAuthorizationException { + LOG.info("Creating futures to check access and scrub " + entitiesToCheck.size() + " entities"); + long startTime = System.currentTimeMillis(); + List> completableFutures = entitiesToCheck + .stream() + .map(entity -> CompletableFuture.supplyAsync(() -> { + try { + checkAccessAndScrub(entity, request, isScrubAuditEnabled, startTime); + return null; + } catch (AtlasAuthorizationException e) { + return e; + } + }, classificationAccessThreadpool)) + .collect(Collectors.toList()); + + // wait for all threads to complete their execution + CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join(); + + // get the first exception from any checkAccessAndScrub calls + Optional maybeAuthException = completableFutures + .stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .findFirst(); + + LOG.info("Async check access and scrub is complete"); + if (maybeAuthException.isPresent()) { + throw maybeAuthException.get(); + } + } + private void filterTypes(AtlasAccessRequest request, List typeDefs)throws AtlasAuthorizationException { if (typeDefs != null) { for (ListIterator iter = typeDefs.listIterator(); iter.hasNext();) { @@ -650,9 +703,11 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud if (LOG.isDebugEnabled()) { LOG.debug("==> isAccessAllowed(" + request + ")"); } - boolean ret = false; + + String uuid = UUID.randomUUID().toString(); long startTime = System.currentTimeMillis(); - final String uuid = UUID.randomUUID().toString(); + LOG.info("start isAccessAllowed : " + startTime + " uuid: " + uuid); + boolean ret = false; try { final String action = request.getAction() != null ? request.getAction().getType() : null; @@ -860,7 +915,8 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru } } - private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled) throws AtlasAuthorizationException { + private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled, long startTime) throws AtlasAuthorizationException { + LOG.info("checkAccessAndScrub is scheduled after: " + (System.currentTimeMillis() - startTime)); if (entity != null && request != null) { final AtlasEntityAccessRequest entityAccessRequest = new AtlasEntityAccessRequest(request.getTypeRegistry(), AtlasPrivilege.ENTITY_READ, entity, request.getUser(), request.getUserGroups()); @@ -870,7 +926,10 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru boolean isEntityAccessAllowed = isScrubAuditEnabled ? isAccessAllowed(entityAccessRequest) : isAccessAllowed(entityAccessRequest, null); if (!isEntityAccessAllowed) { + long scrubStartTime = System.currentTimeMillis(); + LOG.info("scrubEntityHeader started" + scrubStartTime); scrubEntityHeader(entity, request.getTypeRegistry()); + LOG.info("scrubEntityHeader ended" + (System.currentTimeMillis() - scrubStartTime)); } } }