Skip to content

Commit

Permalink
Merge pull request #2734 from atlanhq/plt-524-auth-api
Browse files Browse the repository at this point in the history
feat: refactor few functions and remove some unused func, deps
  • Loading branch information
sumandas0 authored Jan 8, 2024
2 parents 6459f25 + e838fcf commit 83ce0e2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.atlas.plugin.util;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.RequestContext;
import org.apache.atlas.auth.client.heracles.models.HeraclesRoleViewRepresentation;
import org.apache.atlas.exception.AtlasBaseException;
Expand All @@ -33,10 +33,6 @@
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.atlas.auth.client.keycloak.AtlasKeycloakClient.getKeycloakClient;
Expand Down Expand Up @@ -73,13 +69,6 @@ public KeycloakUserStore(String serviceName) {
}
}

public static ExecutorService getExecutorService(String namePattern) {
ExecutorService service = Executors.newFixedThreadPool(NUM_THREADS,
new ThreadFactoryBuilder().setNameFormat(namePattern + Thread.currentThread().getName())
.build());
return service;
}

public boolean isKeycloakSubjectsStoreUpdated(long cacheLastUpdatedTime) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getKeycloakSubjectsStoreUpdatedTime");
cacheLastUpdatedTime = -1;
Expand Down Expand Up @@ -162,59 +151,55 @@ public RangerRoles loadRolesIfUpdated(long lastUpdatedTime) throws AtlasBaseExce
Map<String, List<RangerRole.RoleMember>> roleUserMapping = new HashMap<>();
Set<RangerRole> roleSet = new HashSet<>();

int userSize = 100;
int userSize = AtlasConfiguration.HERACLES_CLIENT_PAGINATION_SIZE.getInt();
int userFrom = 0;
boolean userFound = true;
List<UserRepresentation> userRetrievalResult;

List<UserRepresentation> ret = new ArrayList<>();
do {
List<UserRepresentation> users = getHeraclesClient().getUsersMappings(userFrom, userSize);
userRetrievalResult = getHeraclesClient().getUsersMappings(userFrom, userSize);

if (CollectionUtils.isEmpty(users)) {
userFound = false;
} else {
ret.addAll(users);
userFrom += userSize;
}
if (!CollectionUtils.isEmpty(userRetrievalResult)) {
userRetrievalResult.forEach(user -> {
Set<String> userRoles = new HashSet<>(user.getRealmRoles());

for (UserRepresentation user : users) {
Set<String> userRoles = new HashSet<>(user.getRealmRoles());
userRoles.forEach(role -> roleUserMapping
.computeIfAbsent(role, k -> new ArrayList<>())
.add(new RangerRole.RoleMember(user.getUsername(), false))
);
});

for (String role : userRoles) {
roleUserMapping.computeIfAbsent(role, k -> new ArrayList<>())
.add(new RangerRole.RoleMember(user.getUsername(), false));
}
userFrom += userSize;
}

} while (userFound && ret.size() % userSize == 0);
} while (!CollectionUtils.isEmpty(userRetrievalResult) && userRetrievalResult.size() % userSize == 0);

int roleSize = 100;
int roleSize = AtlasConfiguration.HERACLES_CLIENT_PAGINATION_SIZE.getInt();
int roleFrom = 0;
boolean roleFound = true;
List<HeraclesRoleViewRepresentation> roleRetrievalResult;

List<HeraclesRoleViewRepresentation> retRole = new ArrayList<>();
do {
roleRetrievalResult = getHeraclesClient().getRolesMappings(roleFrom, roleSize);

if (!CollectionUtils.isEmpty(roleRetrievalResult)) {
roleRetrievalResult.forEach(role -> {
RangerRole rangerRole = new RangerRole();
rangerRole.setName(role.getName());
rangerRole.setGroups(role.getGroups().stream()
.map(x -> new RangerRole.RoleMember(x, false))
.collect(Collectors.toList()));
rangerRole.setUsers(roleUserMapping.get(role.getName()));
rangerRole.setRoles(role.getRoles().stream()
.map(x -> new RangerRole.RoleMember(x, false))
.collect(Collectors.toList()));

roleSet.add(rangerRole);
});

List<HeraclesRoleViewRepresentation> roles = getHeraclesClient().getRolesMappings(roleFrom, roleSize);

if (CollectionUtils.isEmpty(roles)) {
roleFound = false;
} else {
retRole.addAll(roles);
roleFrom += roleSize;
}

for (HeraclesRoleViewRepresentation role : roles) {
RangerRole rangerRole = new RangerRole();
rangerRole.setName(role.getName());
rangerRole.setGroups(role.getGroups().stream().map(x -> new RangerRole.RoleMember(x, true)).collect(Collectors.toList()));
rangerRole.setUsers(roleUserMapping.get(role.getName()));
rangerRole.setRoles(role.getRoles().stream().map(x -> new RangerRole.RoleMember(x, false)).collect(Collectors.toList()));

roleSet.add(rangerRole);
}
} while (!CollectionUtils.isEmpty(roleRetrievalResult) && roleRetrievalResult.size() % roleSize == 0);

} while (roleFound && retRole.size() % roleSize == 0);

processDefaultRole(roleSet);
LOG.info("Inverting roles");
Expand Down Expand Up @@ -321,6 +306,11 @@ private void processDefaultRole(Set<RangerRole> roleSet) {

public RangerUserStore loadUserStoreIfUpdated(long lastUpdatedTime) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("loadUserStoreIfUpdated");
boolean isKeycloakUpdated = isKeycloakSubjectsStoreUpdated(lastUpdatedTime);
if (!isKeycloakUpdated) {
return null;
}

int userSize = 100;
int userFrom = 0;
boolean userFound = true;
Expand Down Expand Up @@ -350,212 +340,4 @@ public RangerUserStore loadUserStoreIfUpdated(long lastUpdatedTime) throws Atlas

return userStore;
}

private static RangerRole keycloakRoleToRangerRole(RoleRepresentation kRole) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("keycloakRolesToRangerRoles");

RangerRole rangerRole = new RangerRole();
rangerRole.setName(kRole.getName());
rangerRole.setDescription(kRole.getDescription() + " " + kRole.getId());

RequestContext.get().endMetricRecord(recorder);
return rangerRole;
}

private static List<RangerRole.RoleMember> keycloakGroupsToRangerRoleMember(Set<GroupRepresentation> kGroups) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("keycloakGroupsToRangerRoleMember");
List<RangerRole.RoleMember> rangerGroups = new ArrayList<>();

for (GroupRepresentation kGroup : kGroups) {
//TODO: Revisit isAdmin flag
rangerGroups.add(new RangerRole.RoleMember(kGroup.getName(), false));
}

RequestContext.get().endMetricRecord(recorder);
return rangerGroups;
}

private static List<RangerRole.RoleMember> keycloakUsersToRangerRoleMember(Set<UserRepresentation> kUsers) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("keycloakUsersToRangerRoleMember");
List<RangerRole.RoleMember> rangerUsers = new ArrayList<>();

for (UserRepresentation kUser : kUsers) {
//TODO: Revisit isAdmin flag
rangerUsers.add(new RangerRole.RoleMember(kUser.getUsername(), false));
}

RequestContext.get().endMetricRecord(recorder);
return rangerUsers;
}

private static List<RangerRole.RoleMember> keycloakRolesToRangerRoleMember(Set<RoleRepresentation> kRoles) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("keycloakRolesToRangerRoleMember");
List<RangerRole.RoleMember> rangerRoles = new ArrayList<>();

for (RoleRepresentation kRole : kRoles) {
//TODO: Revisit isAdmin flag
rangerRoles.add(new RangerRole.RoleMember(kRole.getName(), false));
}

RequestContext.get().endMetricRecord(recorder);
return rangerRoles;
}

protected static <T> void submitCallablesAndWaitToFinish(String threadName, List<Callable<T>> callables) throws AtlasBaseException {
ExecutorService service = getExecutorService(threadName + "-%d-");
try {

LOG.info("Submitting callables: {}", threadName);
callables.forEach(service::submit);

service.shutdown();

boolean terminated = service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
LOG.info("awaitTermination done: {}", threadName);

if (!terminated) {
LOG.warn("Time out occurred while waiting to complete {}", threadName);
}
} catch (InterruptedException e) {
throw new AtlasBaseException();
}
}

static class RoleSubjectsFetcher implements Callable<RangerRole> {
private Set<RangerRole> roleSet;
private RoleRepresentation kRole;
List<UserRepresentation> userNamesList;

public RoleSubjectsFetcher(RoleRepresentation kRole,
Set<RangerRole> roleSet,
List<UserRepresentation> userNamesList) {
this.kRole = kRole;
this.roleSet = roleSet;
this.userNamesList = userNamesList;
}

@Override
public RangerRole call() throws Exception {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("roleSubjectsFetcher");
final RangerRole rangerRole = keycloakRoleToRangerRole(kRole);

try {
//get all groups for Roles
Thread groupsFetcher = new Thread(() -> {
int start = 0;
int size = 500;
boolean found = true;
Set<GroupRepresentation> ret = new HashSet<>();

do {
try {
Set<GroupRepresentation> kGroups = getKeycloakClient().getRoleGroupMembers(kRole.getName(), start, size);
if (CollectionUtils.isNotEmpty(kGroups)) {
ret.addAll(kGroups);
start += size;
} else {
found = false;
}
} catch (Exception e) {
LOG.error("Failed to get group members with role", e);
throw new RuntimeException(e);
}

} while (found && ret.size() % size == 0);

rangerRole.setGroups(keycloakGroupsToRangerRoleMember(ret));
});
groupsFetcher.start();

//get all users for Roles
Thread usersFetcher = new Thread(() -> {
int start = 0;
int size = 100;
boolean found = true;
Set<UserRepresentation> ret = new HashSet<>();

do {
try {
Set<UserRepresentation> userRepresentations = getKeycloakClient().getRoleUserMembers(kRole.getName(), start, size);
if (CollectionUtils.isNotEmpty(userRepresentations)) {
ret.addAll(userRepresentations);
start += size;
} else {
found = false;
}
} catch (Exception e) {
LOG.error("Failed to get users for role {}", kRole.getName(), e);
throw new RuntimeException(e);
}

} while (found && ret.size() % size == 0);

rangerRole.setUsers(keycloakUsersToRangerRoleMember(ret));
userNamesList.addAll(ret);
});
usersFetcher.start();

//get all roles for Roles
Thread subRolesFetcher = new Thread(() -> {
Set<RoleRepresentation> kSubRoles = null;
try {
kSubRoles = getKeycloakClient().getRoleComposites(kRole.getName());
rangerRole.setRoles(keycloakRolesToRangerRoleMember(kSubRoles));
} catch (AtlasBaseException e) {
LOG.error("Failed to get composite for role {}", kRole.getName(), e);
throw new RuntimeException(e);
}
});
subRolesFetcher.start();

try {
groupsFetcher.join();
usersFetcher.join();
subRolesFetcher.join();
} catch (InterruptedException e) {
LOG.error("Failed to wait for threads to complete: {}", kRole.getName());
e.printStackTrace();
}

RequestContext.get().endMetricRecord(recorder);
roleSet.add(rangerRole);
} catch (Exception e) {
LOG.error("RoleSubjectsFetcher: Failed to process role {}: {}", kRole.getName(), e.getMessage());
} finally {
RequestContext.get().endMetricRecord(recorder);
}

return rangerRole;
}
}

static class UserGroupsFetcher implements Callable {
private Map<String, Set<String>> userGroupMapping;
private UserRepresentation kUser;

public UserGroupsFetcher(UserRepresentation kUser, Map<String, Set<String>> userGroupMapping) {
this.kUser = kUser;
this.userGroupMapping = userGroupMapping;
}

@Override
public Object call() throws Exception {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("userGroupsFetcher");

try {
List<GroupRepresentation> kGroups = getKeycloakClient().getGroupsForUserById(kUser.getId());
userGroupMapping.put(kUser.getUsername(),
kGroups.stream()
.map(GroupRepresentation::getName)
.collect(Collectors.toSet()));

} catch (Exception e) {
LOG.error("UserGroupsFetcher: Failed to process user {}: {}", kUser.getUsername(), e.getMessage());
} finally {
RequestContext.get().endMetricRecord(recorder);
}

return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.atlas.admin.client.RangerAdminClient;
import org.apache.atlas.plugin.service.RangerBasePlugin;

import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.Reader;
import java.io.Writer;
import java.util.Date;
import java.util.HashMap;
import java.util.Set;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public enum AtlasConfiguration {

PERSONA_POLICY_ASSET_MAX_LIMIT("atlas.persona.policy.asset.maxlimit", 1000),
ENABLE_KEYCLOAK_TOKEN_INTROSPECTION("atlas.canary.keycloak.token-introspection", false),
KEYCLOAK_ADMIN_CLIENT_PAGINATION_SIZE("atlas.keycloak.admin.resource-pagination-size", 1500),
HERACLES_CLIENT_PAGINATION_SIZE("atlas.heracles.admin.resource-pagination-size", 100),
HERACLES_API_SERVER_URL("atlas.heracles.api.server.url", "http://heracles-service.heracles.svc.cluster.local");


Expand Down

0 comments on commit 83ce0e2

Please sign in to comment.