Skip to content

Commit

Permalink
Merge pull request #2523 from atlanhq/scrub_optimisation
Browse files Browse the repository at this point in the history
Add parallelisation for classification level method to optimise scrubSearch
  • Loading branch information
ektavarma10 authored Nov 28, 2023
2 parents 729410a + 5956234 commit a8145ae
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.atlas.authorization.atlas.authorizer.RangerAtlasAuthorizerUtil.*;
import static org.apache.atlas.authorize.AtlasAuthorizationUtils.getCurrentUserGroups;
Expand All @@ -81,6 +85,8 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
add(AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION);
}};

private static final ExecutorService classificationAccessThreadpool = Executors.newFixedThreadPool(NUM_THREADS);

@Override
public void init() {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -657,61 +663,43 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud
boolean ret = false;

try {
final String action = request.getAction() != null ? request.getAction().getType() : null;
final Set<String> entityTypes = request.getEntityTypeAndAllSuperTypes();
final String entityId = request.getEntityId();
final String classification = request.getClassification() != null ? request.getClassification().getTypeName() : null;
final RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
final RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
final String ownerUser = request.getEntity() != null ? (String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null;

rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes);
rangerResource.setValue(RESOURCE_ENTITY_ID, entityId);
rangerResource.setOwnerUser(ownerUser);
rangerRequest.setAccessType(action);
rangerRequest.setAction(action);
rangerRequest.setUser(request.getUser());
rangerRequest.setUserGroups(request.getUserGroups());
rangerRequest.setClientIPAddress(request.getClientIPAddress());
rangerRequest.setAccessTime(request.getAccessTime());
rangerRequest.setResource(rangerResource);
rangerRequest.setForwardedAddresses(request.getForwardedAddresses());
rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress());

if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel());
} else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata());
} else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) {
rangerResource.setValue(RESOURCE_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classification));
}

if (CollectionUtils.isNotEmpty(request.getEntityClassifications())) {
Set<AtlasClassification> entityClassifications = request.getEntityClassifications();
Map<String, Object> contextOjb = rangerRequest.getContext();

Set<RangerTagForEval> rangerTagForEval = getRangerServiceTag(entityClassifications);

if (contextOjb == null) {
Map<String, Object> contextOjb1 = new HashMap<String, Object>();
contextOjb1.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb1);
} else {
contextOjb.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb);
}
List<CompletableFuture<Boolean>> completableFutures = new ArrayList<>();

// check authorization for each classification
for (AtlasClassification classificationToAuthorize : request.getEntityClassifications()) {
rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName()));

ret = checkAccess(rangerRequest, auditHandler);
RangerAccessResourceImpl rangerResource = createRangerResourceRequest(request);
RangerAccessRequestImpl rangerRequest = createRangerAccessRequest(request, rangerResource);

if (!ret) {
break;
}
rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION,
request.getClassificationTypeAndAllSuperTypes(classificationToAuthorize.getTypeName()));
setClassificationContextForRanger(rangerTagForEval, rangerRequest);

completableFutures.add(CompletableFuture.supplyAsync(
() -> checkAccess(rangerRequest, auditHandler), classificationAccessThreadpool));
}

// wait for all threads to complete their execution
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();

// if all checkAccess calls return true, then ret is true, else it is false
ret = completableFutures
.stream()
.map(CompletableFuture::join)
.allMatch(result -> result == true);

} else {

RangerAccessResourceImpl rangerResource = createRangerResourceRequest(request);

RangerAccessRequestImpl rangerRequest = createRangerAccessRequest(request, rangerResource);

rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, ENTITY_NOT_CLASSIFIED );

ret = checkAccess(rangerRequest, auditHandler);
Expand All @@ -730,6 +718,66 @@ private boolean isAccessAllowed(AtlasEntityAccessRequest request, RangerAtlasAud
return ret;
}

private RangerAccessResourceImpl createRangerResourceRequest(AtlasEntityAccessRequest request) {
RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();

final Set<String> entityTypes = request.getEntityTypeAndAllSuperTypes();
final String entityId = request.getEntityId();
final String ownerUser = request.getEntity() != null ?
(String) request.getEntity().getAttribute(RESOURCE_ENTITY_OWNER) : null;
final String classification = request.getClassification() != null ?
request.getClassification().getTypeName() : null;

rangerResource.setValue(RESOURCE_ENTITY_TYPE, entityTypes);
rangerResource.setValue(RESOURCE_ENTITY_ID, entityId);
rangerResource.setOwnerUser(ownerUser);

if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) ||
AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_LABEL, request.getLabel());
} else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, request.getBusinessMetadata());
} else if (StringUtils.isNotEmpty(classification) && CLASSIFICATION_PRIVILEGES.contains(request.getAction())) {
rangerResource.setValue(RESOURCE_CLASSIFICATION,
request.getClassificationTypeAndAllSuperTypes(classification));
}

return rangerResource;
}

private RangerAccessRequestImpl createRangerAccessRequest(AtlasEntityAccessRequest request,
RangerAccessResourceImpl rangerResource) {

RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();

final String action = request.getAction() != null ? request.getAction().getType() : null;

rangerRequest.setAccessType(action);
rangerRequest.setAction(action);
rangerRequest.setUser(request.getUser());
rangerRequest.setUserGroups(request.getUserGroups());
rangerRequest.setClientIPAddress(request.getClientIPAddress());
rangerRequest.setAccessTime(request.getAccessTime());
rangerRequest.setForwardedAddresses(request.getForwardedAddresses());
rangerRequest.setRemoteIPAddress(request.getRemoteIPAddress());
rangerRequest.setResource(rangerResource);

return rangerRequest;

}

private static void setClassificationContextForRanger(Set<RangerTagForEval> rangerTagForEval, RangerAccessRequestImpl rangerRequest) {
Map<String, Object> contextOjb = rangerRequest.getContext();

if (contextOjb == null) {
Map<String, Object> contextOjb1 = new HashMap<String, Object>();
contextOjb1.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb1);
} else {
contextOjb.put("CLASSIFICATIONS", rangerTagForEval);
rangerRequest.setContext(contextOjb);
}
}

private void setClassificationsToRequestContext(Set<AtlasClassification> entityClassifications, RangerAccessRequestImpl rangerRequest) {
Map<String, Object> contextOjb = rangerRequest.getContext();
Expand Down Expand Up @@ -903,10 +951,12 @@ public RangerAtlasAuditHandler(AtlasEntityAccessRequest request, RangerServiceDe
rangerResource.setValue(RESOURCE_ENTITY_CLASSIFICATION, strClassifications);
rangerResource.setValue(RESOURCE_ENTITY_ID, request.getEntityId());

if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) || AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
if (AtlasPrivilege.ENTITY_ADD_LABEL.equals(request.getAction()) ||
AtlasPrivilege.ENTITY_REMOVE_LABEL.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_LABEL, "label=" + request.getLabel());
} else if (AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction())) {
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA, "business-metadata=" + request.getBusinessMetadata());
rangerResource.setValue(RESOURCE_ENTITY_BUSINESS_METADATA,
"business-metadata=" + request.getBusinessMetadata());
}

auditEvents = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

public class RangerAtlasAuthorizerUtil {

public static Integer NUM_THREADS = 5;

static void toRangerRequest(AtlasEntityAccessRequest request, RangerAccessRequestImpl rangerRequest, RangerAccessResourceImpl rangerResource){

final String action = request.getAction() != null ? request.getAction().getType() : null;
Expand Down

0 comments on commit a8145ae

Please sign in to comment.