From e838fcf9fe3883be2934c1db72322594e3898016 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Mon, 8 Jan 2024 20:13:27 +0530 Subject: [PATCH] feat: refactor few functions and remove some unused func. deps --- .../atlas/plugin/util/KeycloakUserStore.java | 294 +++--------------- .../plugin/util/RangerUserStoreProvider.java | 2 - .../org/apache/atlas/AtlasConfiguration.java | 2 +- 3 files changed, 39 insertions(+), 259 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/KeycloakUserStore.java b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/KeycloakUserStore.java index 51f1ddf849..2f9470c3fc 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/KeycloakUserStore.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/KeycloakUserStore.java @@ -19,7 +19,7 @@ package org.apache.atlas.plugin.util; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.RequestContext; import org.apache.atlas.auth.client.heracles.models.HeraclesRoleViewRepresentation; import org.apache.atlas.exception.AtlasBaseException; @@ -33,10 +33,6 @@ import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.atlas.auth.client.keycloak.AtlasKeycloakClient.getKeycloakClient; @@ -73,13 +69,6 @@ public KeycloakUserStore(String serviceName) { } } - public static ExecutorService getExecutorService(String namePattern) { - ExecutorService service = Executors.newFixedThreadPool(NUM_THREADS, - new ThreadFactoryBuilder().setNameFormat(namePattern + Thread.currentThread().getName()) - .build()); - return service; - } - public boolean isKeycloakSubjectsStoreUpdated(long cacheLastUpdatedTime) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getKeycloakSubjectsStoreUpdatedTime"); cacheLastUpdatedTime = -1; @@ -162,59 +151,55 @@ public RangerRoles loadRolesIfUpdated(long lastUpdatedTime) throws AtlasBaseExce Map> roleUserMapping = new HashMap<>(); Set roleSet = new HashSet<>(); - int userSize = 100; + int userSize = AtlasConfiguration.HERACLES_CLIENT_PAGINATION_SIZE.getInt(); int userFrom = 0; - boolean userFound = true; + List userRetrievalResult; - List ret = new ArrayList<>(); do { - List users = getHeraclesClient().getUsersMappings(userFrom, userSize); + userRetrievalResult = getHeraclesClient().getUsersMappings(userFrom, userSize); - if (CollectionUtils.isEmpty(users)) { - userFound = false; - } else { - ret.addAll(users); - userFrom += userSize; - } + if (!CollectionUtils.isEmpty(userRetrievalResult)) { + userRetrievalResult.forEach(user -> { + Set userRoles = new HashSet<>(user.getRealmRoles()); - for (UserRepresentation user : users) { - Set userRoles = new HashSet<>(user.getRealmRoles()); + userRoles.forEach(role -> roleUserMapping + .computeIfAbsent(role, k -> new ArrayList<>()) + .add(new RangerRole.RoleMember(user.getUsername(), false)) + ); + }); - for (String role : userRoles) { - roleUserMapping.computeIfAbsent(role, k -> new ArrayList<>()) - .add(new RangerRole.RoleMember(user.getUsername(), false)); - } + userFrom += userSize; } - } while (userFound && ret.size() % userSize == 0); + } while (!CollectionUtils.isEmpty(userRetrievalResult) && userRetrievalResult.size() % userSize == 0); - int roleSize = 100; + int roleSize = AtlasConfiguration.HERACLES_CLIENT_PAGINATION_SIZE.getInt(); int roleFrom = 0; - boolean roleFound = true; + List roleRetrievalResult; - List retRole = new ArrayList<>(); do { + roleRetrievalResult = getHeraclesClient().getRolesMappings(roleFrom, roleSize); + + if (!CollectionUtils.isEmpty(roleRetrievalResult)) { + roleRetrievalResult.forEach(role -> { + RangerRole rangerRole = new RangerRole(); + rangerRole.setName(role.getName()); + rangerRole.setGroups(role.getGroups().stream() + .map(x -> new RangerRole.RoleMember(x, false)) + .collect(Collectors.toList())); + rangerRole.setUsers(roleUserMapping.get(role.getName())); + rangerRole.setRoles(role.getRoles().stream() + .map(x -> new RangerRole.RoleMember(x, false)) + .collect(Collectors.toList())); + + roleSet.add(rangerRole); + }); - List roles = getHeraclesClient().getRolesMappings(roleFrom, roleSize); - - if (CollectionUtils.isEmpty(roles)) { - roleFound = false; - } else { - retRole.addAll(roles); roleFrom += roleSize; } - for (HeraclesRoleViewRepresentation role : roles) { - RangerRole rangerRole = new RangerRole(); - rangerRole.setName(role.getName()); - rangerRole.setGroups(role.getGroups().stream().map(x -> new RangerRole.RoleMember(x, true)).collect(Collectors.toList())); - rangerRole.setUsers(roleUserMapping.get(role.getName())); - rangerRole.setRoles(role.getRoles().stream().map(x -> new RangerRole.RoleMember(x, false)).collect(Collectors.toList())); - - roleSet.add(rangerRole); - } + } while (!CollectionUtils.isEmpty(roleRetrievalResult) && roleRetrievalResult.size() % roleSize == 0); - } while (roleFound && retRole.size() % roleSize == 0); processDefaultRole(roleSet); LOG.info("Inverting roles"); @@ -321,6 +306,11 @@ private void processDefaultRole(Set roleSet) { public RangerUserStore loadUserStoreIfUpdated(long lastUpdatedTime) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("loadUserStoreIfUpdated"); + boolean isKeycloakUpdated = isKeycloakSubjectsStoreUpdated(lastUpdatedTime); + if (!isKeycloakUpdated) { + return null; + } + int userSize = 100; int userFrom = 0; boolean userFound = true; @@ -350,212 +340,4 @@ public RangerUserStore loadUserStoreIfUpdated(long lastUpdatedTime) throws Atlas return userStore; } - - private static RangerRole keycloakRoleToRangerRole(RoleRepresentation kRole) { - AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("keycloakRolesToRangerRoles"); - - RangerRole rangerRole = new RangerRole(); - rangerRole.setName(kRole.getName()); - rangerRole.setDescription(kRole.getDescription() + " " + kRole.getId()); - - RequestContext.get().endMetricRecord(recorder); - return rangerRole; - } - - private static List keycloakGroupsToRangerRoleMember(Set kGroups) { - AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("keycloakGroupsToRangerRoleMember"); - List rangerGroups = new ArrayList<>(); - - for (GroupRepresentation kGroup : kGroups) { - //TODO: Revisit isAdmin flag - rangerGroups.add(new RangerRole.RoleMember(kGroup.getName(), false)); - } - - RequestContext.get().endMetricRecord(recorder); - return rangerGroups; - } - - private static List keycloakUsersToRangerRoleMember(Set kUsers) { - AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("keycloakUsersToRangerRoleMember"); - List rangerUsers = new ArrayList<>(); - - for (UserRepresentation kUser : kUsers) { - //TODO: Revisit isAdmin flag - rangerUsers.add(new RangerRole.RoleMember(kUser.getUsername(), false)); - } - - RequestContext.get().endMetricRecord(recorder); - return rangerUsers; - } - - private static List keycloakRolesToRangerRoleMember(Set kRoles) { - AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("keycloakRolesToRangerRoleMember"); - List rangerRoles = new ArrayList<>(); - - for (RoleRepresentation kRole : kRoles) { - //TODO: Revisit isAdmin flag - rangerRoles.add(new RangerRole.RoleMember(kRole.getName(), false)); - } - - RequestContext.get().endMetricRecord(recorder); - return rangerRoles; - } - - protected static void submitCallablesAndWaitToFinish(String threadName, List> callables) throws AtlasBaseException { - ExecutorService service = getExecutorService(threadName + "-%d-"); - try { - - LOG.info("Submitting callables: {}", threadName); - callables.forEach(service::submit); - - service.shutdown(); - - boolean terminated = service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - LOG.info("awaitTermination done: {}", threadName); - - if (!terminated) { - LOG.warn("Time out occurred while waiting to complete {}", threadName); - } - } catch (InterruptedException e) { - throw new AtlasBaseException(); - } - } - - static class RoleSubjectsFetcher implements Callable { - private Set roleSet; - private RoleRepresentation kRole; - List userNamesList; - - public RoleSubjectsFetcher(RoleRepresentation kRole, - Set roleSet, - List userNamesList) { - this.kRole = kRole; - this.roleSet = roleSet; - this.userNamesList = userNamesList; - } - - @Override - public RangerRole call() throws Exception { - AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("roleSubjectsFetcher"); - final RangerRole rangerRole = keycloakRoleToRangerRole(kRole); - - try { - //get all groups for Roles - Thread groupsFetcher = new Thread(() -> { - int start = 0; - int size = 500; - boolean found = true; - Set ret = new HashSet<>(); - - do { - try { - Set kGroups = getKeycloakClient().getRoleGroupMembers(kRole.getName(), start, size); - if (CollectionUtils.isNotEmpty(kGroups)) { - ret.addAll(kGroups); - start += size; - } else { - found = false; - } - } catch (Exception e) { - LOG.error("Failed to get group members with role", e); - throw new RuntimeException(e); - } - - } while (found && ret.size() % size == 0); - - rangerRole.setGroups(keycloakGroupsToRangerRoleMember(ret)); - }); - groupsFetcher.start(); - - //get all users for Roles - Thread usersFetcher = new Thread(() -> { - int start = 0; - int size = 100; - boolean found = true; - Set ret = new HashSet<>(); - - do { - try { - Set userRepresentations = getKeycloakClient().getRoleUserMembers(kRole.getName(), start, size); - if (CollectionUtils.isNotEmpty(userRepresentations)) { - ret.addAll(userRepresentations); - start += size; - } else { - found = false; - } - } catch (Exception e) { - LOG.error("Failed to get users for role {}", kRole.getName(), e); - throw new RuntimeException(e); - } - - } while (found && ret.size() % size == 0); - - rangerRole.setUsers(keycloakUsersToRangerRoleMember(ret)); - userNamesList.addAll(ret); - }); - usersFetcher.start(); - - //get all roles for Roles - Thread subRolesFetcher = new Thread(() -> { - Set kSubRoles = null; - try { - kSubRoles = getKeycloakClient().getRoleComposites(kRole.getName()); - rangerRole.setRoles(keycloakRolesToRangerRoleMember(kSubRoles)); - } catch (AtlasBaseException e) { - LOG.error("Failed to get composite for role {}", kRole.getName(), e); - throw new RuntimeException(e); - } - }); - subRolesFetcher.start(); - - try { - groupsFetcher.join(); - usersFetcher.join(); - subRolesFetcher.join(); - } catch (InterruptedException e) { - LOG.error("Failed to wait for threads to complete: {}", kRole.getName()); - e.printStackTrace(); - } - - RequestContext.get().endMetricRecord(recorder); - roleSet.add(rangerRole); - } catch (Exception e) { - LOG.error("RoleSubjectsFetcher: Failed to process role {}: {}", kRole.getName(), e.getMessage()); - } finally { - RequestContext.get().endMetricRecord(recorder); - } - - return rangerRole; - } - } - - static class UserGroupsFetcher implements Callable { - private Map> userGroupMapping; - private UserRepresentation kUser; - - public UserGroupsFetcher(UserRepresentation kUser, Map> userGroupMapping) { - this.kUser = kUser; - this.userGroupMapping = userGroupMapping; - } - - @Override - public Object call() throws Exception { - AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("userGroupsFetcher"); - - try { - List kGroups = getKeycloakClient().getGroupsForUserById(kUser.getId()); - userGroupMapping.put(kUser.getUsername(), - kGroups.stream() - .map(GroupRepresentation::getName) - .collect(Collectors.toSet())); - - } catch (Exception e) { - LOG.error("UserGroupsFetcher: Failed to process user {}: {}", kUser.getUsername(), e.getMessage()); - } finally { - RequestContext.get().endMetricRecord(recorder); - } - - return null; - } - } } diff --git a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/RangerUserStoreProvider.java b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/RangerUserStoreProvider.java index 89980504b0..3dd97bcee8 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/RangerUserStoreProvider.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/RangerUserStoreProvider.java @@ -26,7 +26,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.atlas.admin.client.RangerAdminClient; import org.apache.atlas.plugin.service.RangerBasePlugin; import java.io.File; @@ -34,7 +33,6 @@ import java.io.FileWriter; import java.io.Reader; import java.io.Writer; -import java.util.Date; import java.util.HashMap; import java.util.Set; diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index d10e4b13fc..7ae882b245 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -108,7 +108,7 @@ public enum AtlasConfiguration { PERSONA_POLICY_ASSET_MAX_LIMIT("atlas.persona.policy.asset.maxlimit", 1000), ENABLE_KEYCLOAK_TOKEN_INTROSPECTION("atlas.canary.keycloak.token-introspection", false), - KEYCLOAK_ADMIN_CLIENT_PAGINATION_SIZE("atlas.keycloak.admin.resource-pagination-size", 1500), + HERACLES_CLIENT_PAGINATION_SIZE("atlas.heracles.admin.resource-pagination-size", 100), HERACLES_API_SERVER_URL("atlas.heracles.api.server.url", "http://heracles-service.heracles.svc.cluster.local");