Skip to content

Commit

Permalink
Merge pull request #2563 from atlanhq/optimise_scrub_search
Browse files Browse the repository at this point in the history
Add optimisation for entity access.
  • Loading branch information
ektavarma10 authored Dec 4, 2023
2 parents a9ed6fc + 16df39b commit 965509f
Showing 1 changed file with 75 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,25 @@
import org.apache.atlas.plugin.service.RangerBasePlugin;
import org.apache.atlas.plugin.util.RangerPerfTracer;

import java.util.*;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.ArrayList;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.UUID;
import java.util.Collection;
import java.util.Optional;
import java.util.Objects;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.apache.atlas.authorization.atlas.authorizer.RangerAtlasAuthorizerUtil.*;
import static org.apache.atlas.authorize.AtlasAuthorizationUtils.getCurrentUserGroups;
Expand All @@ -74,6 +92,8 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
add(AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION);
}};

private static final ExecutorService classificationAccessThreadpool = Executors.newFixedThreadPool(20);

@Override
public void init() {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -582,25 +602,27 @@ public void scrubSearchResults(AtlasSearchResultScrubRequest request, boolean is
if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG))
perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerAtlasAuthorizer.scrubSearchResults(" + request + ")");
AtlasSearchResult result = request.getSearchResult();
List<AtlasEntityHeader> entitiesToCheck = new ArrayList<>();

long startTime = System.currentTimeMillis();
if (CollectionUtils.isNotEmpty(result.getEntities())) {
for (AtlasEntityHeader entity : result.getEntities()) {
checkAccessAndScrub(entity, request, isScrubAuditEnabled);
}
LOG.info("scrubSearchResults ended for entites: " + (System.currentTimeMillis()-startTime));
entitiesToCheck.addAll(result.getEntities());
}

if (CollectionUtils.isNotEmpty(result.getFullTextResult())) {
for (AtlasSearchResult.AtlasFullTextResult fullTextResult : result.getFullTextResult()) {
if (fullTextResult != null)
checkAccessAndScrub(fullTextResult.getEntity(), request, isScrubAuditEnabled);
}
entitiesToCheck.addAll(
result.getFullTextResult()
.stream()
.filter(Objects::nonNull)
.map(res -> res.getEntity())
.collect(Collectors.toList())
);
}

if (MapUtils.isNotEmpty(result.getReferredEntities())) {
for (AtlasEntityHeader entity : result.getReferredEntities().values()) {
checkAccessAndScrub(entity, request, isScrubAuditEnabled);
}
entitiesToCheck.addAll(result.getReferredEntities().values());
}

checkAccessAndScrubAsync(entitiesToCheck, request, isScrubAuditEnabled);
} finally {
RangerPerfTracer.log(perf);
}
Expand All @@ -622,6 +644,37 @@ public void filterTypesDef(AtlasTypesDefFilterRequest request) throws AtlasAutho

}

private void checkAccessAndScrubAsync(List<AtlasEntityHeader> entitiesToCheck, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled) throws AtlasAuthorizationException {
LOG.info("Creating futures to check access and scrub " + entitiesToCheck.size() + " entities");
long startTime = System.currentTimeMillis();
List<CompletableFuture<AtlasAuthorizationException>> completableFutures = entitiesToCheck
.stream()
.map(entity -> CompletableFuture.supplyAsync(() -> {
try {
checkAccessAndScrub(entity, request, isScrubAuditEnabled, startTime);
return null;
} catch (AtlasAuthorizationException e) {
return e;
}
}, classificationAccessThreadpool))
.collect(Collectors.toList());

// wait for all threads to complete their execution
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();

// get the first exception from any checkAccessAndScrub calls
Optional<AtlasAuthorizationException> maybeAuthException = completableFutures
.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.findFirst();

LOG.info("Async check access and scrub is complete");
if (maybeAuthException.isPresent()) {
throw maybeAuthException.get();
}
}

private void filterTypes(AtlasAccessRequest request, List<? extends AtlasBaseTypeDef> typeDefs)throws AtlasAuthorizationException {
if (typeDefs != null) {
for (ListIterator<? extends AtlasBaseTypeDef> iter = typeDefs.listIterator(); iter.hasNext();) {
Expand Down Expand Up @@ -650,9 +703,11 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud
if (LOG.isDebugEnabled()) {
LOG.debug("==> isAccessAllowed(" + request + ")");
}
boolean ret = false;

String uuid = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
final String uuid = UUID.randomUUID().toString();
LOG.info("start isAccessAllowed : " + startTime + " uuid: " + uuid);
boolean ret = false;

try {
final String action = request.getAction() != null ? request.getAction().getType() : null;
Expand Down Expand Up @@ -860,7 +915,8 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru
}
}

private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled) throws AtlasAuthorizationException {
private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled, long startTime) throws AtlasAuthorizationException {
LOG.info("checkAccessAndScrub is scheduled after: " + (System.currentTimeMillis() - startTime));
if (entity != null && request != null) {
final AtlasEntityAccessRequest entityAccessRequest = new AtlasEntityAccessRequest(request.getTypeRegistry(), AtlasPrivilege.ENTITY_READ, entity, request.getUser(), request.getUserGroups());

Expand All @@ -870,7 +926,10 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru

boolean isEntityAccessAllowed = isScrubAuditEnabled ? isAccessAllowed(entityAccessRequest) : isAccessAllowed(entityAccessRequest, null);
if (!isEntityAccessAllowed) {
long scrubStartTime = System.currentTimeMillis();
LOG.info("scrubEntityHeader started" + scrubStartTime);
scrubEntityHeader(entity, request.getTypeRegistry());
LOG.info("scrubEntityHeader ended" + (System.currentTimeMillis() - scrubStartTime));
}
}
}
Expand Down

0 comments on commit 965509f

Please sign in to comment.