Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stag] PLT-302 Optimize Persona ES alias #2451

Merged
merged 10 commits into from
Nov 10, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public RangerRoles loadRolesIfUpdated(long lastUpdatedTime) throws AtlasBaseExce

boolean isKeycloakUpdated = isKeycloakSubjectsStoreUpdated(lastUpdatedTime);
if (!isKeycloakUpdated) {
LOG.info("loadRolesIfUpdated: Skipping as no update found");
return null;
}

Expand Down Expand Up @@ -272,7 +271,6 @@ public RangerUserStore loadUserStoreIfUpdated(long lastUpdatedTime) throws Atlas

boolean isKeycloakUpdated = isKeycloakSubjectsStoreUpdated(lastUpdatedTime);
if (!isKeycloakUpdated) {
LOG.info("loadUserStoreIfUpdated: Skipping as no update found");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las

RequestContext.get().endMetricRecord(recorderFilterPolicies);

if (LOG.isDebugEnabled()) {
LOG.debug("Found {} policies", servicePolicies.getPolicies().size());
}
LOG.info("Found {} policies", servicePolicies.getPolicies().size());
}

} catch (Exception e) {
Expand Down
4 changes: 3 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public enum AtlasConfiguration {
INDEX_CLIENT_CONNECTION_TIMEOUT("atlas.index.client.connection.timeout.ms", 900000),
INDEX_CLIENT_SOCKET_TIMEOUT("atlas.index.client.socket.timeout.ms", 900000),
ENABLE_SEARCH_LOGGER("atlas.enable.search.logger", true),
SEARCH_LOGGER_MAX_THREADS("atlas.enable.search.logger.max.threads", 20);
SEARCH_LOGGER_MAX_THREADS("atlas.enable.search.logger.max.threads", 20),

PERSONA_POLICY_ASSET_MAX_LIMIT("atlas.persona.policy.asset.maxlimit", 1000);


private static final Configuration APPLICATION_PROPERTIES;
Expand Down
4 changes: 3 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,9 @@ public enum AtlasErrorCode {
JSON_ERROR(400, "ATLAS-400-00-109", "Error occurred putting object into JSONObject: {0}"),
DISABLED_OPERATION(400, "ATLAS-400-00-110", "This operation is temporarily disabled as it is under maintenance."),
TASK_INVALID_PARAMETERS(400, "ATLAS-400-00-111", "Invalid parameters for task {0}"),
TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported");
TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported"),

PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED(400, "ATLAS-400-00-113", "Exceeded limit of maximum allowed assets across policies for a Persona: Limit: {0}, assets: {1}");


private String errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2

@Override
public EntityAuditSearchResult searchEvents(String queryString) throws AtlasBaseException {
LOG.info("Hitting ES query to fetch audits: {}, {}", System.currentTimeMillis(), queryString);
try {
String response = performSearchOnIndex(queryString);
return getResultFromResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.atlas.repository.store.aliasstore;

import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.ESAliasRequestBuilder;
import org.apache.atlas.ESAliasRequestBuilder.AliasAction;
import org.apache.atlas.exception.AtlasBaseException;
Expand All @@ -38,6 +40,7 @@
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -68,6 +71,8 @@ public class ESAliasStore implements IndexAliasStore {
private final AtlasGraph graph;
private final EntityGraphRetriever entityRetriever;

private final int assetsMaxLimit = AtlasConfiguration.PERSONA_POLICY_ASSET_MAX_LIMIT.getInt();

@Inject
public ESAliasStore(AtlasGraph graph,
EntityGraphRetriever entityRetriever) {
Expand Down Expand Up @@ -165,32 +170,47 @@ private Map<String, Object> getFilterForPurpose(AtlasEntity purpose) throws Atla

private void personaPolicyToESDslClauses(List<AtlasEntity> policies,
List<Map<String, Object>> allowClauseList) throws AtlasBaseException {
List<String> terms = new ArrayList<>();
for (AtlasEntity policy: policies) {

if (policy.getStatus() == null || AtlasEntity.Status.ACTIVE.equals(policy.getStatus())) {
List<String> assets = getPolicyAssets(policy);

if (getIsAllowPolicy(policy)) {
if (getPolicyActions(policy).contains(ACCESS_READ_PERSONA_METADATA)) {

if (terms.size() + assets.size() + 1 > assetsMaxLimit) {
nikhilbonte21 marked this conversation as resolved.
Show resolved Hide resolved
// For Metadata policies, along with assets we add 1 more clause for connection qualifiedName hence comparing with "assets.size() + 1"
throw new AtlasBaseException(AtlasErrorCode.PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED, String.valueOf(assetsMaxLimit), String.valueOf(terms.size()));
}

String connectionQName = getPolicyConnectionQN(policy);
if (StringUtils.isEmpty(connectionQName)) {
connectionQName = getConnectionQualifiedNameFromPolicyAssets(entityRetriever, assets);
}

for (String asset : assets) {
addPersonaMetadataFilterClauses(asset, allowClauseList);
terms.add(asset);
allowClauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset + "/*")));
}

addPersonaMetadataFilterConnectionClause(connectionQName, allowClauseList);
terms.add(connectionQName);

} else if (getPolicyActions(policy).contains(ACCESS_READ_PERSONA_GLOSSARY)) {
if (terms.size() + assets.size() > assetsMaxLimit) {
throw new AtlasBaseException(AtlasErrorCode.PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED, String.valueOf(assetsMaxLimit), String.valueOf(terms.size()));
}

for (String glossaryQName : assets) {
addPersonaGlossaryFilterClauses(glossaryQName, allowClauseList);
terms.add(glossaryQName);
allowClauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, "*@" + glossaryQName)));
}
}
}
}
}

allowClauseList.add(mapOf("terms", mapOf(QUALIFIED_NAME, terms)));
}

private Map<String, Object> esClausesToFilter(List<Map<String, Object>> allowClauseList) {
Expand All @@ -208,22 +228,6 @@ private String getAliasName(AtlasEntity entity) {
return getESAliasName(entity);
}

private void addPersonaMetadataFilterClauses(String asset, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, asset)));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset + "/*")));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset)));
}

private void addPersonaGlossaryFilterClauses(String asset, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, asset)));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset + "/*")));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, "*@" + asset)));
}

private void addPersonaMetadataFilterConnectionClause(String connection, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, connection)));
}

private void addPurposeMetadataFilterClauses(List<String> tags, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("terms", mapOf(TRAIT_NAMES_PROPERTY_KEY, tags)));
clauseList.add(mapOf("terms", mapOf(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, tags)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public void run() {
if (CollectionUtils.isNotEmpty(tasks)) {
final CountDownLatch latch = new CountDownLatch(tasks.size());
submitAll(tasks, latch);
LOG.info("Submitted {} tasks to the queue", tasks.size());
waitForTasksToComplete(latch);
} else {
LOG.info("No tasks to queue, sleeping for {} ms", pollInterval);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
}
Thread.sleep(pollInterval);
Expand Down Expand Up @@ -153,7 +153,6 @@ public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("TasksFetcher: Fetching tasks for queuing");
}
LOG.info("TasksFetcher: Fetching tasks for queuing");

this.tasks = registry.getTasksForReQueue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {
EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString());

if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) {
LOG.info("getPoliciesIfUpdated: Skipping as no update found");
return false;
}
} catch (AtlasBaseException e) {
Expand Down