Skip to content

Commit

Permalink
Merge pull request #2451 from atlanhq/PLT-302-staging
Browse files Browse the repository at this point in the history
[stag] PLT-302 Optimize Persona ES alias
  • Loading branch information
nikhilbonte21 authored Nov 10, 2023
2 parents 0aa152a + a22e731 commit e97e0a5
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public RangerRoles loadRolesIfUpdated(long lastUpdatedTime) throws AtlasBaseExce

boolean isKeycloakUpdated = isKeycloakSubjectsStoreUpdated(lastUpdatedTime);
if (!isKeycloakUpdated) {
LOG.info("loadRolesIfUpdated: Skipping as no update found");
return null;
}

Expand Down Expand Up @@ -272,7 +271,6 @@ public RangerUserStore loadUserStoreIfUpdated(long lastUpdatedTime) throws Atlas

boolean isKeycloakUpdated = isKeycloakSubjectsStoreUpdated(lastUpdatedTime);
if (!isKeycloakUpdated) {
LOG.info("loadUserStoreIfUpdated: Skipping as no update found");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las

RequestContext.get().endMetricRecord(recorderFilterPolicies);

if (LOG.isDebugEnabled()) {
LOG.debug("Found {} policies", servicePolicies.getPolicies().size());
}
LOG.info("Found {} policies", servicePolicies.getPolicies().size());
}

} catch (Exception e) {
Expand Down
4 changes: 3 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public enum AtlasConfiguration {
INDEX_CLIENT_CONNECTION_TIMEOUT("atlas.index.client.connection.timeout.ms", 900000),
INDEX_CLIENT_SOCKET_TIMEOUT("atlas.index.client.socket.timeout.ms", 900000),
ENABLE_SEARCH_LOGGER("atlas.enable.search.logger", true),
SEARCH_LOGGER_MAX_THREADS("atlas.enable.search.logger.max.threads", 20);
SEARCH_LOGGER_MAX_THREADS("atlas.enable.search.logger.max.threads", 20),

PERSONA_POLICY_ASSET_MAX_LIMIT("atlas.persona.policy.asset.maxlimit", 1000);


private static final Configuration APPLICATION_PROPERTIES;
Expand Down
4 changes: 3 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,9 @@ public enum AtlasErrorCode {
JSON_ERROR(400, "ATLAS-400-00-109", "Error occurred putting object into JSONObject: {0}"),
DISABLED_OPERATION(400, "ATLAS-400-00-110", "This operation is temporarily disabled as it is under maintenance."),
TASK_INVALID_PARAMETERS(400, "ATLAS-400-00-111", "Invalid parameters for task {0}"),
TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported");
TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported"),

PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED(400, "ATLAS-400-00-113", "Exceeded limit of maximum allowed assets across policies for a Persona: Limit: {0}, assets: {1}");


private String errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2

@Override
public EntityAuditSearchResult searchEvents(String queryString) throws AtlasBaseException {
LOG.info("Hitting ES query to fetch audits: {}, {}", System.currentTimeMillis(), queryString);
try {
String response = performSearchOnIndex(queryString);
return getResultFromResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.atlas.repository.store.aliasstore;

import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.ESAliasRequestBuilder;
import org.apache.atlas.ESAliasRequestBuilder.AliasAction;
import org.apache.atlas.exception.AtlasBaseException;
Expand All @@ -38,6 +40,7 @@
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -68,6 +71,8 @@ public class ESAliasStore implements IndexAliasStore {
private final AtlasGraph graph;
private final EntityGraphRetriever entityRetriever;

private final int assetsMaxLimit = AtlasConfiguration.PERSONA_POLICY_ASSET_MAX_LIMIT.getInt();

@Inject
public ESAliasStore(AtlasGraph graph,
EntityGraphRetriever entityRetriever) {
Expand Down Expand Up @@ -165,32 +170,47 @@ private Map<String, Object> getFilterForPurpose(AtlasEntity purpose) throws Atla

private void personaPolicyToESDslClauses(List<AtlasEntity> policies,
List<Map<String, Object>> allowClauseList) throws AtlasBaseException {
List<String> terms = new ArrayList<>();
for (AtlasEntity policy: policies) {

if (policy.getStatus() == null || AtlasEntity.Status.ACTIVE.equals(policy.getStatus())) {
List<String> assets = getPolicyAssets(policy);

if (getIsAllowPolicy(policy)) {
if (getPolicyActions(policy).contains(ACCESS_READ_PERSONA_METADATA)) {

if (terms.size() + assets.size() + 1 > assetsMaxLimit) {
// For Metadata policies, along with assets we add 1 more clause for connection qualifiedName hence comparing with "assets.size() + 1"
throw new AtlasBaseException(AtlasErrorCode.PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED, String.valueOf(assetsMaxLimit), String.valueOf(terms.size()));
}

String connectionQName = getPolicyConnectionQN(policy);
if (StringUtils.isEmpty(connectionQName)) {
connectionQName = getConnectionQualifiedNameFromPolicyAssets(entityRetriever, assets);
}

for (String asset : assets) {
addPersonaMetadataFilterClauses(asset, allowClauseList);
terms.add(asset);
allowClauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset + "/*")));
}

addPersonaMetadataFilterConnectionClause(connectionQName, allowClauseList);
terms.add(connectionQName);

} else if (getPolicyActions(policy).contains(ACCESS_READ_PERSONA_GLOSSARY)) {
if (terms.size() + assets.size() > assetsMaxLimit) {
throw new AtlasBaseException(AtlasErrorCode.PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED, String.valueOf(assetsMaxLimit), String.valueOf(terms.size()));
}

for (String glossaryQName : assets) {
addPersonaGlossaryFilterClauses(glossaryQName, allowClauseList);
terms.add(glossaryQName);
allowClauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, "*@" + glossaryQName)));
}
}
}
}
}

allowClauseList.add(mapOf("terms", mapOf(QUALIFIED_NAME, terms)));
}

private Map<String, Object> esClausesToFilter(List<Map<String, Object>> allowClauseList) {
Expand All @@ -208,22 +228,6 @@ private String getAliasName(AtlasEntity entity) {
return getESAliasName(entity);
}

private void addPersonaMetadataFilterClauses(String asset, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, asset)));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset + "/*")));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset)));
}

private void addPersonaGlossaryFilterClauses(String asset, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, asset)));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, asset + "/*")));
clauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, "*@" + asset)));
}

private void addPersonaMetadataFilterConnectionClause(String connection, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, connection)));
}

private void addPurposeMetadataFilterClauses(List<String> tags, List<Map<String, Object>> clauseList) {
clauseList.add(mapOf("terms", mapOf(TRAIT_NAMES_PROPERTY_KEY, tags)));
clauseList.add(mapOf("terms", mapOf(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, tags)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public void run() {
if (CollectionUtils.isNotEmpty(tasks)) {
final CountDownLatch latch = new CountDownLatch(tasks.size());
submitAll(tasks, latch);
LOG.info("Submitted {} tasks to the queue", tasks.size());
waitForTasksToComplete(latch);
} else {
LOG.info("No tasks to queue, sleeping for {} ms", pollInterval);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
}
Thread.sleep(pollInterval);
Expand Down Expand Up @@ -153,7 +153,6 @@ public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("TasksFetcher: Fetching tasks for queuing");
}
LOG.info("TasksFetcher: Fetching tasks for queuing");

this.tasks = registry.getTasksForReQueue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {
EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString());

if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) {
LOG.info("getPoliciesIfUpdated: Skipping as no update found");
return false;
}
} catch (AtlasBaseException e) {
Expand Down

0 comments on commit e97e0a5

Please sign in to comment.