Skip to content

Commit

Permalink
Merge branch 'master' into atlassearch
Browse files Browse the repository at this point in the history
  • Loading branch information
n5nk committed Nov 23, 2023
2 parents aa86e1e + 76effda commit 7952594
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public RangerRoles loadRolesIfUpdated(long lastUpdatedTime) throws AtlasBaseExce

boolean isKeycloakUpdated = isKeycloakSubjectsStoreUpdated(lastUpdatedTime);
if (!isKeycloakUpdated) {
LOG.info("loadRolesIfUpdated: Skipping as no update found");
return null;
}

Expand Down Expand Up @@ -272,7 +271,6 @@ public RangerUserStore loadUserStoreIfUpdated(long lastUpdatedTime) throws Atlas

boolean isKeycloakUpdated = isKeycloakSubjectsStoreUpdated(lastUpdatedTime);
if (!isKeycloakUpdated) {
LOG.info("loadUserStoreIfUpdated: Skipping as no update found");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las

RequestContext.get().endMetricRecord(recorderFilterPolicies);

if (LOG.isDebugEnabled()) {
LOG.debug("Found {} policies", servicePolicies.getPolicies().size());
}
LOG.info("Found {} policies", servicePolicies.getPolicies().size());
}

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.keycloak.representations.idm.*;
import org.keycloak.representations.oidc.TokenMetadataRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -174,6 +175,10 @@ public List<EventRepresentation> getEvents(List<String> type, String client, Str
return KEYCLOAK.getEvents(type, client, user, dateFrom, dateTo, ipAddress, first, max).body();
}

public TokenMetadataRepresentation introspectToken(String token) throws AtlasBaseException {
return KEYCLOAK.introspectToken(token).body();
}

public static AtlasKeycloakClient getKeycloakClient() throws AtlasBaseException {
if (Objects.isNull(KEYCLOAK_CLIENT)) {
LOG.info("Initializing Keycloak client..");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.apache.atlas.keycloak.client;

import okhttp3.FormBody;
import okhttp3.RequestBody;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.keycloak.client.config.KeycloakConfig;
import org.keycloak.representations.idm.*;
import org.keycloak.representations.oidc.TokenMetadataRepresentation;
import retrofit2.Response;

import java.util.List;
Expand All @@ -13,6 +16,9 @@
*/
public final class KeycloakRestClient extends AbstractKeycloakClient {

private static final String TOKEN = "token";
private static final String CLIENT_ID = "client_id";
private static final String CLIENT_SECRET = "client_secret";
public KeycloakRestClient(final KeycloakConfig keycloakConfig) {
super(keycloakConfig);
}
Expand Down Expand Up @@ -121,4 +127,16 @@ public Response<List<EventRepresentation>> getEvents(List<String> type, String c
String dateTo, String ipAddress, Integer first, Integer max) throws AtlasBaseException {
return processResponse(this.retrofit.getEvents(this.keycloakConfig.getRealmId(), type, client, user, dateFrom, dateTo, ipAddress, first, max));
}

public Response<TokenMetadataRepresentation> introspectToken(String token) throws AtlasBaseException {
return processResponse(this.retrofit.introspectToken(this.keycloakConfig.getRealmId(), getIntrospectTokenRequest(token)));
}

private RequestBody getIntrospectTokenRequest(String token) {
return new FormBody.Builder()
.addEncoded(TOKEN, token)
.addEncoded(CLIENT_ID, this.keycloakConfig.getClientId())
.addEncoded(CLIENT_SECRET, this.keycloakConfig.getClientSecret())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import okhttp3.RequestBody;
import org.keycloak.representations.AccessTokenResponse;
import org.keycloak.representations.idm.*;
import org.keycloak.representations.oidc.TokenMetadataRepresentation;
import retrofit2.Call;
import retrofit2.http.*;

Expand Down Expand Up @@ -142,4 +143,8 @@ Call<List<EventRepresentation>> getEvents(@Path("realmId") String realmId, @Quer
@POST("realms/{realmId}/protocol/openid-connect/token")
Call<AccessTokenResponse> grantToken(@Path("realmId") String realmId, @Body RequestBody request);

@Headers({"Accept: application/json", "Content-Type: application/x-www-form-urlencoded", "Cache-Control: no-store", "Cache-Control: no-cache"})
@POST("realms/{realmId}/protocol/openid-connect/token/introspect")
Call<TokenMetadataRepresentation> introspectToken(@Path("realmId") String realmId, @Body RequestBody request);

}
5 changes: 4 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ public enum AtlasConfiguration {
INDEX_CLIENT_CONNECTION_TIMEOUT("atlas.index.client.connection.timeout.ms", 900000),
INDEX_CLIENT_SOCKET_TIMEOUT("atlas.index.client.socket.timeout.ms", 900000),
ENABLE_SEARCH_LOGGER("atlas.enable.search.logger", true),
SEARCH_LOGGER_MAX_THREADS("atlas.enable.search.logger.max.threads", 20);
SEARCH_LOGGER_MAX_THREADS("atlas.enable.search.logger.max.threads", 20),

PERSONA_POLICY_ASSET_MAX_LIMIT("atlas.persona.policy.asset.maxlimit", 1000),
ENABLE_KEYCLOAK_TOKEN_INTROSPECTION("atlas.canary.keycloak.token-introspection", false);


private static final Configuration APPLICATION_PROPERTIES;
Expand Down
4 changes: 3 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ public enum AtlasErrorCode {
JSON_ERROR(400, "ATLAS-400-00-109", "Error occurred putting object into JSONObject: {0}"),
DISABLED_OPERATION(400, "ATLAS-400-00-110", "This operation is temporarily disabled as it is under maintenance."),
TASK_INVALID_PARAMETERS(400, "ATLAS-400-00-111", "Invalid parameters for task {0}"),
TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported");
TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported"),

PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED(400, "ATLAS-400-00-113", "Exceeded limit of maximum allowed assets across policies for a Persona: Limit: {0}, assets: {1}");


private String errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2

@Override
public EntityAuditSearchResult searchEvents(String queryString) throws AtlasBaseException {
LOG.info("Hitting ES query to fetch audits: {}", queryString);
try {
String response = performSearchOnIndex(queryString);
return getResultFromResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.atlas.repository.store.aliasstore;

import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.ESAliasRequestBuilder;
import org.apache.atlas.ESAliasRequestBuilder.AliasAction;
import org.apache.atlas.exception.AtlasBaseException;
Expand All @@ -38,6 +40,7 @@
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -68,6 +71,8 @@ public class ESAliasStore implements IndexAliasStore {
private final AtlasGraph graph;
private final EntityGraphRetriever entityRetriever;

private final int assetsMaxLimit = AtlasConfiguration.PERSONA_POLICY_ASSET_MAX_LIMIT.getInt();

@Inject
public ESAliasStore(AtlasGraph graph,
EntityGraphRetriever entityRetriever) {
Expand Down Expand Up @@ -165,32 +170,46 @@ private Map<String, Object> getFilterForPurpose(AtlasEntity purpose) throws Atla

private void personaPolicyToESDslClauses(List<AtlasEntity> policies,
List<Map<String, Object>> allowClauseList) throws AtlasBaseException {
List<String> terms = new ArrayList<>();

for (AtlasEntity policy: policies) {

if (policy.getStatus() == null || AtlasEntity.Status.ACTIVE.equals(policy.getStatus())) {
List<String> assets = getPolicyAssets(policy);

if (getIsAllowPolicy(policy)) {
if (getPolicyActions(policy).contains(ACCESS_READ_PERSONA_METADATA)) {
String connectionQName = getPolicyConnectionQN(policy);
if (StringUtils.isEmpty(connectionQName)) {
connectionQName = getConnectionQualifiedNameFromPolicyAssets(entityRetriever, assets);
}
if (!getIsAllowPolicy(policy)) {
continue;
}

if (getPolicyActions(policy).contains(ACCESS_READ_PERSONA_METADATA)) {

String connectionQName = getPolicyConnectionQN(policy);
if (StringUtils.isEmpty(connectionQName)) {
connectionQName = getConnectionQualifiedNameFromPolicyAssets(entityRetriever, assets);
}

for (String asset : assets) {
terms.add(asset);
allowClauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset + "/*")));
}

for (String asset : assets) {
addPersonaMetadataFilterClauses(asset, allowClauseList);
}
terms.add(connectionQName);

addPersonaMetadataFilterConnectionClause(connectionQName, allowClauseList);
} else if (getPolicyActions(policy).contains(ACCESS_READ_PERSONA_GLOSSARY)) {

} else if (getPolicyActions(policy).contains(ACCESS_READ_PERSONA_GLOSSARY)) {
for (String glossaryQName : assets) {
addPersonaGlossaryFilterClauses(glossaryQName, allowClauseList);
}
for (String glossaryQName : assets) {
terms.add(glossaryQName);
allowClauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, "*@" + glossaryQName)));
}
}
}

if (terms.size() > assetsMaxLimit) {
throw new AtlasBaseException(AtlasErrorCode.PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED, String.valueOf(assetsMaxLimit), String.valueOf(terms.size()));
}
}

allowClauseList.add(mapOf("terms", mapOf(QUALIFIED_NAME, terms)));
}

private Map<String, Object> esClausesToFilter(List<Map<String, Object>> allowClauseList) {
Expand All @@ -208,22 +227,6 @@ private String getAliasName(AtlasEntity entity) {
return getESAliasName(entity);
}

private void addPersonaMetadataFilterClauses(String asset, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, asset)));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset + "/*")));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset)));
}

private void addPersonaGlossaryFilterClauses(String asset, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, asset)));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset + "/*")));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, "*@" + asset)));
}

private void addPersonaMetadataFilterConnectionClause(String connection, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, connection)));
}

private void addPurposeMetadataFilterClauses(List<String> tags, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("terms", mapOf(TRAIT_NAMES_PROPERTY_KEY, tags)));
clauseList.add(mapOf("terms", mapOf(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, tags)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public void run() {
if (CollectionUtils.isNotEmpty(tasks)) {
final CountDownLatch latch = new CountDownLatch(tasks.size());
submitAll(tasks, latch);
LOG.info("Submitted {} tasks to the queue", tasks.size());
waitForTasksToComplete(latch);
} else {
LOG.info("No tasks to queue, sleeping for {} ms", pollInterval);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
}
Thread.sleep(pollInterval);
Expand Down Expand Up @@ -153,7 +153,6 @@ public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("TasksFetcher: Fetching tasks for queuing");
}
LOG.info("TasksFetcher: Fetching tasks for queuing");

this.tasks = registry.getTasksForReQueue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {
EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString());

if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) {
LOG.info("getPoliciesIfUpdated: Skipping as no update found");
return false;
}
} catch (AtlasBaseException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public Authentication authenticate(Authentication authentication)
} else if (keycloakAuthenticationEnabled) {
try {
authentication = atlasKeycloakAuthenticationProvider.authenticate(authentication);
} catch (KeycloakAuthenticationException ex) {
throw new AtlasAuthenticationException("Authentication failed.");
} catch (Exception ex) {
LOG.error("Error while Keycloak authentication", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@
*/
package org.apache.atlas.web.security;

import io.micrometer.core.instrument.Counter;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.keycloak.client.AtlasKeycloakClient;
import org.apache.atlas.service.metrics.MetricUtils;
import org.apache.atlas.ApplicationProperties;
import org.apache.commons.configuration.Configuration;
import org.keycloak.adapters.springsecurity.authentication.KeycloakAuthenticationProvider;
import org.keycloak.adapters.springsecurity.token.KeycloakAuthenticationToken;
import org.keycloak.representations.oidc.TokenMetadataRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
Expand All @@ -28,18 +35,25 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Component
public class AtlasKeycloakAuthenticationProvider extends AtlasAbstractAuthenticationProvider {
private final boolean groupsFromUGI;
private final String groupsClaim;
private final boolean isTokenIntrospectionEnabled;

private final KeycloakAuthenticationProvider keycloakAuthenticationProvider;
private final AtlasKeycloakClient atlasKeycloakClient;
private static final Logger LOG = LoggerFactory.getLogger(AtlasKeycloakAuthenticationProvider.class);

public AtlasKeycloakAuthenticationProvider() throws Exception {
this.keycloakAuthenticationProvider = new KeycloakAuthenticationProvider();
this.atlasKeycloakClient = AtlasKeycloakClient.getKeycloakClient();

Configuration configuration = ApplicationProperties.get();

this.isTokenIntrospectionEnabled = AtlasConfiguration.ENABLE_KEYCLOAK_TOKEN_INTROSPECTION.getBoolean();
this.groupsFromUGI = configuration.getBoolean("atlas.authentication.method.keycloak.ugi-groups", true);
this.groupsClaim = configuration.getString("atlas.authentication.method.keycloak.groups_claim");
}
Expand All @@ -66,11 +80,40 @@ public Authentication authenticate(Authentication authentication) {
}
}

if (authentication.getName().startsWith("service-account-apikey")) {
// Increment the counter when the authentication is for a service account.
Counter.builder("service_account_apikey_request_counter").register(MetricUtils.getMeterRegistry()).increment();

// Validate the token online with keycloak server if token introspection is enabled
LOG.info("isTokenIntrospectionEnabled: {}", isTokenIntrospectionEnabled);
if (isTokenIntrospectionEnabled) {
LOG.info("Validating request for clientId: {}", authentication.getName().substring("service-account-".length()));
try {
KeycloakAuthenticationToken keycloakToken = (KeycloakAuthenticationToken) authentication;
String bearerToken = keycloakToken.getAccount().getKeycloakSecurityContext().getTokenString();
TokenMetadataRepresentation introspectToken = atlasKeycloakClient.introspectToken(bearerToken);
if (Objects.nonNull(introspectToken) && introspectToken.isActive()) {
authentication.setAuthenticated(true);
} else {
handleInvalidApiKey(authentication);
}
} catch (Exception e) {
throw new KeycloakAuthenticationException("Keycloak Authentication failed", e.getCause());
}
}
}

return authentication;
}

@Override
public boolean supports(Class<?> aClass) {
return keycloakAuthenticationProvider.supports(aClass);
}

private void handleInvalidApiKey(Authentication authentication) {
authentication.setAuthenticated(false);
LOG.error("Invalid clientId: {}", authentication.getName().substring("service-account-".length()));
throw new KeycloakAuthenticationException("Invalid ClientId");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.apache.atlas.web.security;

import org.springframework.security.core.AuthenticationException;

public class KeycloakAuthenticationException extends AuthenticationException {
public KeycloakAuthenticationException(String msg, Throwable cause) {
super(msg, cause);
}

public KeycloakAuthenticationException(String msg) {
super(msg);
}
}

0 comments on commit 7952594

Please sign in to comment.