Skip to content

Commit

Permalink
retrieve polocies in async
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshi0301 committed Feb 1, 2024
1 parent 5c86264 commit 5a71e26
Showing 1 changed file with 114 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
AtlasEntityHeader abacService = getServiceEntity(abacServiceName);

if (abacService != null) {
allPolicies.addAll(getServicePolicies(abacService));
allPolicies.addAll(getServicePolicies(abacService,0));
ServicePolicies.AbacPolicies abacPolicies = new ServicePolicies.AbacPolicies();
abacPolicies.setServiceName(abacServiceName);
abacPolicies.setPolicyUpdateTime(new Date());
Expand Down Expand Up @@ -441,10 +441,80 @@ private List<RangerValiditySchedule> getPolicyValiditySchedule(AtlasEntityHeader
return ret;
}

private List<AtlasEntityHeader> getAtlasPolicies(String serviceName) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+serviceName+".getAtlasPolicies");

List<AtlasEntityHeader> ret = new ArrayList<>();
// private List<AtlasEntityHeader> getAtlasPolicies(String serviceName) throws AtlasBaseException {
// AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+serviceName+".getAtlasPolicies");
//
// List<AtlasEntityHeader> ret = new ArrayList<>();
// try {
// IndexSearchParams indexSearchParams = new IndexSearchParams();
// Set<String> attributes = new HashSet<>();
// attributes.add(NAME);
// attributes.add(ATTR_POLICY_CATEGORY);
// attributes.add(ATTR_POLICY_FILTER_CRITERIA);
// attributes.add(ATTR_POLICY_SUB_CATEGORY);
// attributes.add(ATTR_POLICY_TYPE);
// attributes.add(ATTR_POLICY_SERVICE_NAME);
// attributes.add(ATTR_POLICY_USERS);
// attributes.add(ATTR_POLICY_GROUPS);
// attributes.add(ATTR_POLICY_ROLES);
// attributes.add(ATTR_POLICY_ACTIONS);
// attributes.add(ATTR_POLICY_RESOURCES);
// attributes.add(ATTR_POLICY_RESOURCES_CATEGORY);
// attributes.add(ATTR_POLICY_MASK_TYPE);
// attributes.add(ATTR_POLICY_PRIORITY);
// attributes.add(ATTR_POLICY_VALIDITY);
// attributes.add(ATTR_POLICY_CONDITIONS);
// attributes.add(ATTR_POLICY_IS_ENABLED);
// attributes.add(ATTR_POLICY_CONNECTION_QN);
//
// Map<String, Object> dsl = getMap("size", 0);
//
// List<Map<String, Object>> mustClauseList = new ArrayList<>();
// mustClauseList.add(getMap("term", getMap(ATTR_POLICY_SERVICE_NAME, serviceName)));
// mustClauseList.add(getMap("term", getMap(ATTR_POLICY_IS_ENABLED, true)));
// mustClauseList.add(getMap("match", getMap("__state", Id.EntityState.ACTIVE)));
//
// dsl.put("query", getMap("bool", getMap("must", mustClauseList)));
//
// List<Map> sortList = new ArrayList<>(0);
// sortList.add(getMap("__timestamp", getMap("order", "asc")));
// sortList.add(getMap("__guid", getMap("order", "asc")));
// dsl.put("sort", sortList);
//
// indexSearchParams.setDsl(dsl);
// indexSearchParams.setAttributes(attributes);
//
// int from = 0;
// int size = 250;
// boolean found = true;
//
// do {
// dsl.put("from", from);
// dsl.put("size", size);
// indexSearchParams.setDsl(dsl);
//
// List<AtlasEntityHeader> headers = discoveryService.directIndexSearch(indexSearchParams).getEntities();
// if (headers != null) {
// ret.addAll(headers);
// } else {
// found = false;
// }
//
// from += size;
//
// } while (found && ret.size() % size == 0);
//
// } finally {
// RequestContext.get().endMetricRecord(recorder);
// }
//
// return ret;
// }

private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl." + serviceName + ".getAtlasPolicies");

AtomicReference<CompletableFuture<List<AtlasEntityHeader>>> futureResultRef = new AtomicReference<>(CompletableFuture.completedFuture(new ArrayList<>()));
try {
IndexSearchParams indexSearchParams = new IndexSearchParams();
Set<String> attributes = new HashSet<>();
Expand Down Expand Up @@ -484,33 +554,53 @@ private List<AtlasEntityHeader> getAtlasPolicies(String serviceName) throws Atla
indexSearchParams.setDsl(dsl);
indexSearchParams.setAttributes(attributes);

int from = 0;
int size = 250;
boolean found = true;

do {
dsl.put("from", from);
dsl.put("size", size);
indexSearchParams.setDsl(dsl);

List<AtlasEntityHeader> headers = discoveryService.directIndexSearch(indexSearchParams).getEntities();
if (headers != null) {
ret.addAll(headers);
} else {
found = false;
final int size = batchSize > 0 ? batchSize : 100;

CompletableFuture<Void> loopFuture = CompletableFuture.completedFuture(null);

loopFuture = loopFuture.thenCompose(v -> CompletableFuture.runAsync(() -> {
boolean found = true;
int currentFrom = 0;
while (found) {
final int finalFrom = currentFrom; // Capture the current 'from' value
Map<String, Object> currentDsl = new HashMap<>(dsl); // Clone the dsl for the current iteration
currentDsl.put("from", finalFrom);
currentDsl.put("size", size);
indexSearchParams.setDsl(currentDsl);

CompletableFuture<List<AtlasEntityHeader>> headersFuture = CompletableFuture.supplyAsync(() -> {
try {
return discoveryService.directIndexSearch(indexSearchParams).getEntities();
} catch (AtlasBaseException e) {
throw new RuntimeException(e);
}
});

futureResultRef.set(futureResultRef.get().thenCombine(headersFuture, (ret, headers) -> {
if (headers != null) {
ret.addAll(headers);
}
return ret;
}));

currentFrom += size;
found = !headersFuture.join().isEmpty() && futureResultRef.get().join().size() % size == 0;
}
}));

from += size;

} while (found && ret.size() % size == 0);

loopFuture.join(); // Wait for the loop to complete
} finally {
RequestContext.get().endMetricRecord(recorder);
}

return ret;
try {
return futureResultRef.get().get(); // Wait for the result to be ready and return it
} catch (InterruptedException | ExecutionException e) {
throw new AtlasBaseException(e);
}
}


private AtlasEntityHeader getServiceEntity(String serviceName) throws AtlasBaseException {
IndexSearchParams indexSearchParams = new IndexSearchParams();
Set<String> attributes = new HashSet<>();
Expand Down

0 comments on commit 5a71e26

Please sign in to comment.