Skip to content

Commit

Permalink
Merge pull request #2537 from atlanhq/scrub_optimisation
Browse files Browse the repository at this point in the history
Scrub optimisation
  • Loading branch information
ektavarma10 authored Nov 30, 2023
2 parents f533128 + e855ebf commit 5333f80
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.atlas.plugin.service.RangerBasePlugin;
import org.apache.atlas.plugin.util.RangerPerfTracer;

import java.awt.*;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -61,14 +62,10 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.Objects;
import java.util.ArrayList;
import java.util.Optional;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.apache.atlas.authorization.atlas.authorizer.RangerAtlasAuthorizerUtil.*;
import static org.apache.atlas.authorize.AtlasAuthorizationUtils.getCurrentUserGroups;
Expand All @@ -89,7 +86,7 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
add(AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION);
}};

static final ExecutorService entityAccessThreadpool = Executors.newFixedThreadPool(10);
private static final ExecutorService classificationAccessThreadpool = Executors.newFixedThreadPool(NUM_THREADS);

@Override
public void init() {
Expand Down Expand Up @@ -599,27 +596,22 @@ public void scrubSearchResults(AtlasSearchResultScrubRequest request, boolean is
if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG))
perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerAtlasAuthorizer.scrubSearchResults(" + request + ")");
AtlasSearchResult result = request.getSearchResult();
List<AtlasEntityHeader> entitiesToCheck = new ArrayList<>();

if (CollectionUtils.isNotEmpty(result.getEntities())) {
entitiesToCheck.addAll(result.getEntities());
for (AtlasEntityHeader entity : result.getEntities()) {
checkAccessAndScrub(entity, request, isScrubAuditEnabled);
}
}

if (CollectionUtils.isNotEmpty(result.getFullTextResult())) {
entitiesToCheck.addAll(
result.getFullTextResult()
.stream()
.filter(Objects::nonNull)
.map(res -> res.getEntity())
.collect(Collectors.toList())
);
for (AtlasSearchResult.AtlasFullTextResult fullTextResult : result.getFullTextResult()) {
if (fullTextResult != null)
checkAccessAndScrub(fullTextResult.getEntity(), request, isScrubAuditEnabled);
}
}

if (MapUtils.isNotEmpty(result.getReferredEntities())) {
entitiesToCheck.addAll(result.getReferredEntities().values());
for (AtlasEntityHeader entity : result.getReferredEntities().values()) {
checkAccessAndScrub(entity, request, isScrubAuditEnabled);
}
}

checkAccessAndScrubAsync(entitiesToCheck, request, isScrubAuditEnabled);
} finally {
RangerPerfTracer.log(perf);
}
Expand All @@ -641,38 +633,6 @@ public void filterTypesDef(AtlasTypesDefFilterRequest request) throws AtlasAutho

}

private void checkAccessAndScrubAsync(List<AtlasEntityHeader> entitiesToCheck, AtlasSearchResultScrubRequest request, boolean isScrubAuditEnabled) throws AtlasAuthorizationException {
LOG.info("Creating futures to check access and scrub " + entitiesToCheck.size() + " entities");
long startTime = System.currentTimeMillis();
LOG.info("Started checkAccessAndScrubAsync: " + System.currentTimeMillis());
List<CompletableFuture<AtlasAuthorizationException>> completableFutures = entitiesToCheck
.stream()
.map(entity -> CompletableFuture.supplyAsync(() -> {
try {
checkAccessAndScrub(entity, request, isScrubAuditEnabled);
return null;
} catch (AtlasAuthorizationException e) {
return e;
}
}, entityAccessThreadpool))
.collect(Collectors.toList());
LOG.info("Completed async submission " + (System.currentTimeMillis()-startTime));
// wait for all threads to complete their execution
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
LOG.info("Completed waiting for completion of threads: " + (System.currentTimeMillis()-startTime));
// get the first exception from any checkAccessAndScrub calls
Optional<AtlasAuthorizationException> maybeAuthException = completableFutures
.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.findFirst();

LOG.info("Async check access and scrub is complete: "+ (System.currentTimeMillis()-startTime));
if (maybeAuthException.isPresent()) {
throw maybeAuthException.get();
}
}

private void filterTypes(AtlasAccessRequest request, List<? extends AtlasBaseTypeDef> typeDefs)throws AtlasAuthorizationException {
if (typeDefs != null) {
for (ListIterator<? extends AtlasBaseTypeDef> iter = typeDefs.listIterator(); iter.hasNext();) {
Expand Down Expand Up @@ -701,64 +661,49 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud
if (LOG.isDebugEnabled()) {
LOG.debug("==> isAccessAllowed(" + request + ")");
}
long startTime = System.currentTimeMillis();
LOG.info("start isAccessAllowed : " + startTime);
boolean ret = false;

try {
final String action = request.getAction() != null ? request.getAction().getType() : null;
final Set<String> entityTypes = request.getEntityTypeAndAllSuperTypes();
final String entityId = request.getEntityId();
final String classification = request.getClassification() != null ? request.getClassification().getTypeName() : null;
final RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
final RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
final String ownerUser = request.getEntity() != null ? (String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null;

rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes);
rangerResource.setValue(RESOURCE_ENTITY_ID, entityId);
rangerResource.setOwnerUser(ownerUser);
rangerRequest.setAccessType(action);
rangerRequest.setAction(action);
rangerRequest.setUser(request.getUser());
rangerRequest.setUserGroups(request.getUserGroups());
rangerRequest.setClientIPAddress(request.getClientIPAddress());
rangerRequest.setAccessTime(request.getAccessTime());
rangerRequest.setResource(rangerResource);
rangerRequest.setForwardedAddresses(request.getForwardedAddresses());
rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress());

if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel());
} else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata());
} else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) {
rangerResource.setValue(RESOURCE_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classification));
}

if (CollectionUtils.isNotEmpty(request.getEntityClassifications())) {
Set<AtlasClassification> entityClassifications = request.getEntityClassifications();
Map<String, Object> contextOjb = rangerRequest.getContext();

Set<RangerTagForEval> rangerTagForEval = getRangerServiceTag(entityClassifications);

if (contextOjb == null) {
Map<String, Object> contextOjb1 = new HashMap<String, Object>();
contextOjb1.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb1);
} else {
contextOjb.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb);
}
List<CompletableFuture<Boolean>> completableFutures = new ArrayList<>();

// check authorization for each classification
LOG.info("start check authrization for each classification: " + (System.currentTimeMillis() - startTime));
for (AtlasClassification classificationToAuthorize : request.getEntityClassifications()) {
rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName()));

ret = checkAccess(rangerRequest, auditHandler);
RangerAccessRequestImpl rangerRequest = createRangerAccessRequest(request, classificationToAuthorize, rangerTagForEval);

if (!ret) {
break;
}
completableFutures.add(CompletableFuture.supplyAsync(()->checkAccess(rangerRequest, auditHandler), classificationAccessThreadpool));
}

// wait for all threads to complete their execution
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
LOG.info("end check authorization for each classification: " + (System.currentTimeMillis() - startTime));


// if all checkAccess calls return true, then ret is true, else it is false
ret = completableFutures
.stream()
.map(CompletableFuture::join)
.allMatch(result -> result == true);

} else {

RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();

initRangerRequest(rangerRequest, request);
initRangerResource(rangerResource, request);

rangerRequest.setResource(rangerResource);

rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, ENTITY_NOT_CLASSIFIED );

ret = checkAccess(rangerRequest, auditHandler);
Expand All @@ -777,6 +722,72 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud
return ret;
}

private RangerAccessRequestImpl createRangerAccessRequest(AtlasEntityAccessRequest request,
AtlasClassification classificationToAuthorize,
Set<RangerTagForEval> rangerTagForEval) {

RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();

initRangerRequest(rangerRequest, request);
initRangerResource(rangerResource, request);

rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName()));

rangerRequest.setResource(rangerResource);

setClassificationContextForRanger(rangerTagForEval, rangerRequest);

return rangerRequest;

}

private static void setClassificationContextForRanger(Set<RangerTagForEval> rangerTagForEval, RangerAccessRequestImpl rangerRequest) {
Map<String, Object> contextOjb = rangerRequest.getContext();

if (contextOjb == null) {
Map<String, Object> contextOjb1 = new HashMap<String, Object>();
contextOjb1.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb1);
} else {
contextOjb.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb);
}
}

private void initRangerRequest(RangerAccessRequestImpl rangerRequest, AtlasEntityAccessRequest request) {
final String action = request.getAction() != null ? request.getAction().getType() : null;

rangerRequest.setAccessType(action);
rangerRequest.setAction(action);
rangerRequest.setUser(request.getUser());
rangerRequest.setUserGroups(request.getUserGroups());
rangerRequest.setClientIPAddress(request.getClientIPAddress());
rangerRequest.setAccessTime(request.getAccessTime());
rangerRequest.setForwardedAddresses(request.getForwardedAddresses());
rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress());
}

private void initRangerResource(RangerAccessResourceImpl rangerResource, AtlasEntityAccessRequest request) {
final Set<String> entityTypes = request.getEntityTypeAndAllSuperTypes();
final String entityId = request.getEntityId();
final String ownerUser = request.getEntity() != null ? (String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null;
final String classification = request.getClassification() != null ? request.getClassification().getTypeName() : null;

rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes);
rangerResource.setValue(RESOURCE_ENTITY_ID, entityId);
rangerResource.setOwnerUser(ownerUser);

if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel());
} else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata());
} else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) {
rangerResource.setValue(RESOURCE_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classification));
}

}


private void setClassificationsToRequestContext(Set<AtlasClassification> entityClassifications, RangerAccessRequestImpl rangerRequest) {
Map<String, Object> contextOjb = rangerRequest.getContext();
Expand Down Expand Up @@ -912,11 +923,13 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru
entityAccessRequest.setClientIPAddress(request.getClientIPAddress());
entityAccessRequest.setForwardedAddresses(request.getForwardedAddresses());
entityAccessRequest.setRemoteIPAddress(request.getRemoteIPAddress());

long startTime = System.currentTimeMillis();
boolean isEntityAccessAllowed = isScrubAuditEnabled ? isAccessAllowed(entityAccessRequest) : isAccessAllowed(entityAccessRequest, null);
LOG.info("isEntityAccessAllowed: " + (System.currentTimeMillis() - startTime));
if (!isEntityAccessAllowed) {
scrubEntityHeader(entity, request.getTypeRegistry());
}
LOG.info("scrubEntityHeader completed" + (System.currentTimeMillis() - startTime));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

public class RangerAtlasAuthorizerUtil {

public static Integer NUM_THREADS = 5;

static void toRangerRequest(AtlasEntityAccessRequest request, RangerAccessRequestImpl rangerRequest, RangerAccessResourceImpl rangerResource){

final String action = request.getAction() != null ? request.getAction().getType() : null;
Expand Down

0 comments on commit 5333f80

Please sign in to comment.