Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optimisation for classification access. #2564

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,22 @@
import org.apache.atlas.plugin.service.RangerBasePlugin;
import org.apache.atlas.plugin.util.RangerPerfTracer;

import java.util.*;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.ArrayList;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.UUID;
import java.util.Collection;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.atlas.authorization.atlas.authorizer.RangerAtlasAuthorizerUtil.*;
import static org.apache.atlas.authorize.AtlasAuthorizationUtils.getCurrentUserGroups;
Expand All @@ -74,6 +89,8 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
add(AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION);
}};

private static final ExecutorService classificationAccessThreadpool = Executors.newFixedThreadPool(20);

@Override
public void init() {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -240,7 +257,7 @@ public boolean isAccessAllowed(AtlasTypeAccessRequest request) throws AtlasAutho
boolean isAuditDisabled = ACCESS_TYPE_TYPE_READ.equalsIgnoreCase(action);

if (isAuditDisabled) {
ret = checkAccess(rangerRequest, null, "");
ret = checkAccess(rangerRequest, null, "", System.currentTimeMillis());
} else {
ret = checkAccess(rangerRequest);
}
Expand Down Expand Up @@ -650,71 +667,56 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud
if (LOG.isDebugEnabled()) {
LOG.debug("==> isAccessAllowed(" + request + ")");
}
boolean ret = false;

String uuid = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
final String uuid = UUID.randomUUID().toString();
LOG.info("start isAccessAllowed : " + startTime + " uuid: " + uuid);
boolean ret = false;

try {
final String action = request.getAction() != null ? request.getAction().getType() : null;
final Set<String> entityTypes = request.getEntityTypeAndAllSuperTypes();
final String entityId = request.getEntityId();
final String classification = request.getClassification() != null ? request.getClassification().getTypeName() : null;
final RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
final RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
final String ownerUser = request.getEntity() != null ? (String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null;

rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes);
rangerResource.setValue(RESOURCE_ENTITY_ID, entityId);
rangerResource.setOwnerUser(ownerUser);
rangerRequest.setAccessType(action);
rangerRequest.setAction(action);
rangerRequest.setUser(request.getUser());
rangerRequest.setUserGroups(request.getUserGroups());
rangerRequest.setClientIPAddress(request.getClientIPAddress());
rangerRequest.setAccessTime(request.getAccessTime());
rangerRequest.setResource(rangerResource);
rangerRequest.setForwardedAddresses(request.getForwardedAddresses());
rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress());

if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel());
} else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata());
} else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) {
rangerResource.setValue(RESOURCE_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classification));
}

if (CollectionUtils.isNotEmpty(request.getEntityClassifications())) {
Set<AtlasClassification> entityClassifications = request.getEntityClassifications();
Map<String, Object> contextOjb = rangerRequest.getContext();

Set<RangerTagForEval> rangerTagForEval = getRangerServiceTag(entityClassifications);

if (contextOjb == null) {
Map<String, Object> contextOjb1 = new HashMap<String, Object>();
contextOjb1.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb1);
} else {
contextOjb.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb);
}
List<CompletableFuture<Boolean>> completableFutures = new ArrayList<>();
LOG.info("isAccessAllowed started : " + startTime);

// check authorization for each classification
LOG.info("classification level authorization started: " + (System.currentTimeMillis()-startTime) + "uuid: "+uuid);
LOG.info("start check authorization for each classification: " + (System.currentTimeMillis() - startTime)+ " uuid: " + uuid);
for (AtlasClassification classificationToAuthorize : request.getEntityClassifications()) {
rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName()));
long rangerRequestCreationStartTime = System.currentTimeMillis();
RangerAccessRequestImpl rangerRequest = createRangerAccessRequest(request, classificationToAuthorize, rangerTagForEval);
LOG.info("Time taken to create a ranger request for uuid: "+uuid+ "is "+ (System.currentTimeMillis()-rangerRequestCreationStartTime));
long taskSubmissionTime = System.currentTimeMillis();
completableFutures.add(CompletableFuture.supplyAsync(()->checkAccess(rangerRequest, auditHandler, uuid, taskSubmissionTime), classificationAccessThreadpool));
}

ret = checkAccess(rangerRequest, auditHandler, uuid);
// wait for all threads to complete their execution
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
LOG.info("end check authorization for each classification: " + (System.currentTimeMillis() - startTime) + " uuid: " + uuid);


// if all checkAccess calls return true, then ret is true, else it is false
ret = completableFutures
.stream()
.map(CompletableFuture::join)
.allMatch(result -> result == true);

if (!ret) {
break;
}
}
LOG.info("classification level authorization ended: " + (System.currentTimeMillis()-startTime) + "uuid: "+uuid);
} else {

RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();

initRangerRequest(rangerRequest, request);
initRangerResource(rangerResource, request);

rangerRequest.setResource(rangerResource);

rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, ENTITY_NOT_CLASSIFIED );

ret = checkAccess(rangerRequest, auditHandler, uuid);
ret = checkAccess(rangerRequest, auditHandler, uuid, System.currentTimeMillis());
}

} finally {
Expand All @@ -726,10 +728,81 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud
if (LOG.isDebugEnabled()) {
LOG.debug("<== isAccessAllowed(" + request + "): " + ret);
}
LOG.info("isAccessAllowed ended: " + (System.currentTimeMillis()-startTime) + "uuid: "+uuid);

return ret;
}

private RangerAccessRequestImpl createRangerAccessRequest(AtlasEntityAccessRequest request,
AtlasClassification classificationToAuthorize,
Set<RangerTagForEval> rangerTagForEval) {

long startTime = System.currentTimeMillis();
LOG.info("createRangerAccessRequest start: " + startTime);

RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();

initRangerRequest(rangerRequest, request);
initRangerResource(rangerResource, request);

rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName()));

rangerRequest.setResource(rangerResource);

setClassificationContextForRanger(rangerTagForEval, rangerRequest);

LOG.info("createRangerAccessRequest end: " + (System.currentTimeMillis() - startTime));

return rangerRequest;

}

private static void setClassificationContextForRanger(Set<RangerTagForEval> rangerTagForEval, RangerAccessRequestImpl rangerRequest) {
Map<String, Object> contextOjb = rangerRequest.getContext();

if (contextOjb == null) {
Map<String, Object> contextOjb1 = new HashMap<String, Object>();
contextOjb1.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb1);
} else {
contextOjb.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb);
}
}

private void initRangerRequest(RangerAccessRequestImpl rangerRequest, AtlasEntityAccessRequest request) {
final String action = request.getAction() != null ? request.getAction().getType() : null;

rangerRequest.setAccessType(action);
rangerRequest.setAction(action);
rangerRequest.setUser(request.getUser());
rangerRequest.setUserGroups(request.getUserGroups());
rangerRequest.setClientIPAddress(request.getClientIPAddress());
rangerRequest.setAccessTime(request.getAccessTime());
rangerRequest.setForwardedAddresses(request.getForwardedAddresses());
rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress());
}

private void initRangerResource(RangerAccessResourceImpl rangerResource, AtlasEntityAccessRequest request) {
final Set<String> entityTypes = request.getEntityTypeAndAllSuperTypes();
final String entityId = request.getEntityId();
final String ownerUser = request.getEntity() != null ? (String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null;
final String classification = request.getClassification() != null ? request.getClassification().getTypeName() : null;

rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes);
rangerResource.setValue(RESOURCE_ENTITY_ID, entityId);
rangerResource.setOwnerUser(ownerUser);

if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel());
} else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata());
} else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) {
rangerResource.setValue(RESOURCE_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classification));
}

}


private void setClassificationsToRequestContext(Set<AtlasClassification> entityClassifications, RangerAccessRequestImpl rangerRequest) {
Map<String, Object> contextOjb = rangerRequest.getContext();
Expand Down Expand Up @@ -793,7 +866,8 @@ private boolean checkAccess(RangerAccessRequestImpl request) {
return ret;
}

private boolean checkAccess(RangerAccessRequestImpl request, RangerAtlasAuditHandler auditHandler, String uuid) {
private boolean checkAccess(RangerAccessRequestImpl request, RangerAtlasAuditHandler auditHandler, String uuid, long taskSubmissionTime) {
LOG.info("wait time for checkAccess is "+ (System.currentTimeMillis() - taskSubmissionTime));
boolean ret = false;
long startTime = System.currentTimeMillis();
LOG.info("checkAccess started at: " + startTime + " uuid: " + uuid);
Expand Down Expand Up @@ -870,7 +944,10 @@ private void checkAccessAndScrub(AtlasEntityHeader entity, AtlasSearchResultScru

boolean isEntityAccessAllowed = isScrubAuditEnabled ? isAccessAllowed(entityAccessRequest) : isAccessAllowed(entityAccessRequest, null);
if (!isEntityAccessAllowed) {
long startTime = System.currentTimeMillis();
LOG.info("scrubEntityHeader started" + startTime);
scrubEntityHeader(entity, request.getTypeRegistry());
LOG.info("scrubEntityHeader ended" + (System.currentTimeMillis() - startTime));
}
}
}
Expand Down
Loading