Skip to content

Commit

Permalink
Merge pull request #2589 from atlanhq/revert-2564-optimise_scrub_search
Browse files Browse the repository at this point in the history
Revert "Add optimisation for classification access."
  • Loading branch information
ektavarma10 authored Dec 6, 2023
2 parents 2656b55 + 45e1829 commit f56832f
Showing 1 changed file with 52 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,7 @@
import org.apache.atlas.plugin.service.RangerBasePlugin;
import org.apache.atlas.plugin.util.RangerPerfTracer;


import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.ArrayList;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.UUID;
import java.util.Collection;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.*;

import static org.apache.atlas.authorization.atlas.authorizer.RangerAtlasAuthorizerUtil.*;
import static org.apache.atlas.authorize.AtlasAuthorizationUtils.getCurrentUserGroups;
Expand All @@ -89,8 +74,6 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
add(AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION);
}};

private static final ExecutorService classificationAccessThreadpool = Executors.newFixedThreadPool(20);

@Override
public void init() {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -257,7 +240,7 @@ public boolean isAccessAllowed(AtlasTypeAccessRequest request) throws AtlasAutho
boolean isAuditDisabled = ACCESS_TYPE_TYPE_READ.equalsIgnoreCase(action);

if (isAuditDisabled) {
ret = checkAccess(rangerRequest, null, "", System.currentTimeMillis());
ret = checkAccess(rangerRequest, null, "");
} else {
ret = checkAccess(rangerRequest);
}
Expand Down Expand Up @@ -667,56 +650,71 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud
if (LOG.isDebugEnabled()) {
LOG.debug("==> isAccessAllowed(" + request + ")");
}

String uuid = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
LOG.info("start isAccessAllowed : " + startTime + " uuid: " + uuid);
boolean ret = false;
long startTime = System.currentTimeMillis();
final String uuid = UUID.randomUUID().toString();

try {
final String action = request.getAction() != null ? request.getAction().getType() : null;
final Set<String> entityTypes = request.getEntityTypeAndAllSuperTypes();
final String entityId = request.getEntityId();
final String classification = request.getClassification() != null ? request.getClassification().getTypeName() : null;
final RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
final RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
final String ownerUser = request.getEntity() != null ? (String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null;

rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes);
rangerResource.setValue(RESOURCE_ENTITY_ID, entityId);
rangerResource.setOwnerUser(ownerUser);
rangerRequest.setAccessType(action);
rangerRequest.setAction(action);
rangerRequest.setUser(request.getUser());
rangerRequest.setUserGroups(request.getUserGroups());
rangerRequest.setClientIPAddress(request.getClientIPAddress());
rangerRequest.setAccessTime(request.getAccessTime());
rangerRequest.setResource(rangerResource);
rangerRequest.setForwardedAddresses(request.getForwardedAddresses());
rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress());

if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel());
} else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata());
} else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) {
rangerResource.setValue(RESOURCE_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classification));
}

if (CollectionUtils.isNotEmpty(request.getEntityClassifications())) {
Set<AtlasClassification> entityClassifications = request.getEntityClassifications();
Map<String, Object> contextOjb = rangerRequest.getContext();

Set<RangerTagForEval> rangerTagForEval = getRangerServiceTag(entityClassifications);

List<CompletableFuture<Boolean>> completableFutures = new ArrayList<>();
LOG.info("isAccessAllowed started : " + startTime);
if (contextOjb == null) {
Map<String, Object> contextOjb1 = new HashMap<String, Object>();
contextOjb1.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb1);
} else {
contextOjb.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb);
}

// check authorization for each classification
LOG.info("start check authorization for each classification: " + (System.currentTimeMillis() - startTime)+ " uuid: " + uuid);
LOG.info("classification level authorization started: " + (System.currentTimeMillis()-startTime) + "uuid: "+uuid);
for (AtlasClassification classificationToAuthorize : request.getEntityClassifications()) {
long rangerRequestCreationStartTime = System.currentTimeMillis();
RangerAccessRequestImpl rangerRequest = createRangerAccessRequest(request, classificationToAuthorize, rangerTagForEval);
LOG.info("Time taken to create a ranger request for uuid: "+uuid+ "is "+ (System.currentTimeMillis()-rangerRequestCreationStartTime));
long taskSubmissionTime = System.currentTimeMillis();
completableFutures.add(CompletableFuture.supplyAsync(()->checkAccess(rangerRequest, auditHandler, uuid, taskSubmissionTime), classificationAccessThreadpool));
}

// wait for all threads to complete their execution
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
LOG.info("end check authorization for each classification: " + (System.currentTimeMillis() - startTime) + " uuid: " + uuid);

rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName()));

// if all checkAccess calls return true, then ret is true, else it is false
ret = completableFutures
.stream()
.map(CompletableFuture::join)
.allMatch(result -> result == true);
ret = checkAccess(rangerRequest, auditHandler, uuid);

if (!ret) {
break;
}
}
LOG.info("classification level authorization ended: " + (System.currentTimeMillis()-startTime) + "uuid: "+uuid);
} else {

RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();

initRangerRequest(rangerRequest, request);
initRangerResource(rangerResource, request);

rangerRequest.setResource(rangerResource);

rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, ENTITY_NOT_CLASSIFIED );

ret = checkAccess(rangerRequest, auditHandler, uuid, System.currentTimeMillis());
ret = checkAccess(rangerRequest, auditHandler, uuid);
}

} finally {
Expand All @@ -728,81 +726,10 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud
if (LOG.isDebugEnabled()) {
LOG.debug("<== isAccessAllowed(" + request + "): " + ret);
}

LOG.info("isAccessAllowed ended: " + (System.currentTimeMillis()-startTime) + "uuid: "+uuid);
return ret;
}

private RangerAccessRequestImpl createRangerAccessRequest(AtlasEntityAccessRequest request,
AtlasClassification classificationToAuthorize,
Set<RangerTagForEval> rangerTagForEval) {

long startTime = System.currentTimeMillis();
LOG.info("createRangerAccessRequest start: " + startTime);

RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();

initRangerRequest(rangerRequest, request);
initRangerResource(rangerResource, request);

rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName()));

rangerRequest.setResource(rangerResource);

setClassificationContextForRanger(rangerTagForEval, rangerRequest);

LOG.info("createRangerAccessRequest end: " + (System.currentTimeMillis() - startTime));

return rangerRequest;

}

private static void setClassificationContextForRanger(Set<RangerTagForEval> rangerTagForEval, RangerAccessRequestImpl rangerRequest) {
Map<String, Object> contextOjb = rangerRequest.getContext();

if (contextOjb == null) {
Map<String, Object> contextOjb1 = new HashMap<String, Object>();
contextOjb1.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb1);
} else {
contextOjb.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb);
}
}

private void initRangerRequest(RangerAccessRequestImpl rangerRequest, AtlasEntityAccessRequest request) {
final String action = request.getAction() != null ? request.getAction().getType() : null;

rangerRequest.setAccessType(action);
rangerRequest.setAction(action);
rangerRequest.setUser(request.getUser());
rangerRequest.setUserGroups(request.getUserGroups());
rangerRequest.setClientIPAddress(request.getClientIPAddress());
rangerRequest.setAccessTime(request.getAccessTime());
rangerRequest.setForwardedAddresses(request.getForwardedAddresses());
rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress());
}

private void initRangerResource(RangerAccessResourceImpl rangerResource, AtlasEntityAccessRequest request) {
final Set<String> entityTypes = request.getEntityTypeAndAllSuperTypes();
final String entityId = request.getEntityId();
final String ownerUser = request.getEntity() != null ? (String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null;
final String classification = request.getClassification() != null ? request.getClassification().getTypeName() : null;

rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes);
rangerResource.setValue(RESOURCE_ENTITY_ID, entityId);
rangerResource.setOwnerUser(ownerUser);

if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel());
} else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata());
} else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) {
rangerResource.setValue(RESOURCE_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classification));
}

}


private void setClassificationsToRequestContext(Set<AtlasClassification> entityClassifications, RangerAccessRequestImpl rangerRequest) {
Map<String, Object> contextOjb = rangerRequest.getContext();
Expand Down Expand Up @@ -866,8 +793,7 @@ private boolean checkAccess(RangerAccessRequestImpl request) {
return ret;
}

private boolean checkAccess(RangerAccessRequestImpl request, RangerAtlasAuditHandler auditHandler, String uuid, long taskSubmissionTime) {
LOG.info("wait time for checkAccess is "+ (System.currentTimeMillis() - taskSubmissionTime));
private boolean checkAccess(RangerAccessRequestImpl request, RangerAtlasAuditHandler auditHandler, String uuid) {
boolean ret = false;
long startTime = System.currentTimeMillis();
LOG.info("checkAccess started at: " + startTime + " uuid: " + uuid);
Expand Down Expand Up @@ -944,10 +870,7 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru

boolean isEntityAccessAllowed = isScrubAuditEnabled ? isAccessAllowed(entityAccessRequest) : isAccessAllowed(entityAccessRequest, null);
if (!isEntityAccessAllowed) {
long startTime = System.currentTimeMillis();
LOG.info("scrubEntityHeader started" + startTime);
scrubEntityHeader(entity, request.getTypeRegistry());
LOG.info("scrubEntityHeader ended" + (System.currentTimeMillis() - startTime));
}
}
}
Expand Down

0 comments on commit f56832f

Please sign in to comment.