diff --git a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java index dd645b473b..bf4fd1ca03 100644 --- a/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java +++ b/auth-plugin-atlas/src/main/java/org/apache/atlas/authorization/atlas/authorizer/RangerAtlasAuthorizer.java @@ -61,6 +61,14 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.Objects; +import java.util.ArrayList; +import java.util.Optional; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; import static org.apache.atlas.authorization.atlas.authorizer.RangerAtlasAuthorizerUtil.*; import static org.apache.atlas.authorize.AtlasAuthorizationUtils.getCurrentUserGroups; @@ -81,6 +89,8 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer { add(AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION); }}; + static final ExecutorService entityAccessThreadpool = Executors.newFixedThreadPool(5); + @Override public void init() { if (LOG.isDebugEnabled()) { @@ -589,22 +599,27 @@ public void scrubSearchResults(AtlasSearchResultScrubRequest request, boolean is if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerAtlasAuthorizer.scrubSearchResults(" + request + ")"); AtlasSearchResult result = request.getSearchResult(); + List entitiesToCheck = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(result.getEntities())) { - for (AtlasEntityHeader entity : result.getEntities()) { - checkAccessAndScrub(entity, request, isScrubAuditEnabled); - } + entitiesToCheck.addAll(result.getEntities()); } + if (CollectionUtils.isNotEmpty(result.getFullTextResult())) { - for (AtlasSearchResult.AtlasFullTextResult fullTextResult : result.getFullTextResult()) { - if (fullTextResult != null) - checkAccessAndScrub(fullTextResult.getEntity(), request, isScrubAuditEnabled); - } + entitiesToCheck.addAll( + result.getFullTextResult() + .stream() + .filter(Objects::nonNull) + .map(res -> res.getEntity()) + .collect(Collectors.toList()) + ); } + if (MapUtils.isNotEmpty(result.getReferredEntities())) { - for (AtlasEntityHeader entity : result.getReferredEntities().values()) { - checkAccessAndScrub(entity, request, isScrubAuditEnabled); - } + entitiesToCheck.addAll(result.getReferredEntities().values()); } + + checkAccessAndScrubAsync(entitiesToCheck, request, isScrubAuditEnabled); } finally { RangerPerfTracer.log(perf); } @@ -626,6 +641,36 @@ public void filterTypesDef(AtlasTypesDefFilterRequest request) throws AtlasAutho } + private void checkAccessAndScrubAsync(List entitiesToCheck, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled) throws AtlasAuthorizationException { + LOG.info("Creating futures to check access and scrub " + entitiesToCheck.size() + " entities"); + List> completableFutures = entitiesToCheck + .stream() + .map(entity -> CompletableFuture.supplyAsync(() -> { + try { + checkAccessAndScrub(entity, request, isScrubAuditEnabled); + return null; + } catch (AtlasAuthorizationException e) { + return e; + } + }, entityAccessThreadpool)) + .collect(Collectors.toList()); + + // wait for all threads to complete their execution + CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join(); + + // get the first exception from any checkAccessAndScrub calls + Optional maybeAuthException = completableFutures + .stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .findFirst(); + + LOG.info("Async check access and scrub is complete"); + if (maybeAuthException.isPresent()) { + throw maybeAuthException.get(); + } + } + private void filterTypes(AtlasAccessRequest request, List typeDefs)throws AtlasAuthorizationException { if (typeDefs != null) { for (ListIterator iter = typeDefs.listIterator(); iter.hasNext();) {