Skip to content

Commit

Permalink
Merge pull request #3828 from atlanhq/ns/feat/delta-policy-master
Browse files Browse the repository at this point in the history
DG-1709 | Change policy refresh mechanism to only pull delta instead of all policies
  • Loading branch information
sumandas0 authored Dec 11, 2024
2 parents 60d95fe + 0577ea1 commit 89b9e66
Show file tree
Hide file tree
Showing 15 changed files with 365 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public interface AtlasAuthAdminClient {
void init(RangerPluginConfig config);

ServicePolicies getServicePoliciesIfUpdated(long lastUpdatedTimeInMillis) throws Exception;
ServicePolicies getServicePoliciesIfUpdated(long lastUpdatedTimeInMillis, boolean usePolicyDelta) throws Exception;

RangerRoles getRolesIfUpdated(long lastUpdatedTimeInMillis) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class AtlasAuthRESTClient implements AtlasAuthAdminClient {

private static final String PARAM_LAST_UPDATED_TIME = "lastUpdatedTime";
private static final String PARAM_PLUGIN_ID = "pluginId";
private static final String PARAM_USE_POLICY_DELTA = "usePolicyDelta";

@Override
public void init(RangerPluginConfig config) {
Expand Down Expand Up @@ -66,20 +67,20 @@ public String getPluginId(String serviceName, String appId) {
}

@Override
public ServicePolicies getServicePoliciesIfUpdated(long lastUpdatedTimeInMillis) throws Exception {
URI uri = buildURI("/download/policies/" + serviceName, lastUpdatedTimeInMillis);
public ServicePolicies getServicePoliciesIfUpdated(long lastUpdatedTimeInMillis, boolean usePolicyDelta) throws Exception {
URI uri = buildURI("/download/policies/" + serviceName, lastUpdatedTimeInMillis, usePolicyDelta);
return sendRequestAndGetResponse(uri, ServicePolicies.class);
}

@Override
public RangerRoles getRolesIfUpdated(long lastUpdatedTimeInMillis) throws Exception {
URI uri = buildURI("/download/roles/" + serviceName, lastUpdatedTimeInMillis);
URI uri = buildURI("/download/roles/" + serviceName, lastUpdatedTimeInMillis, false);
return sendRequestAndGetResponse(uri, RangerRoles.class);
}

@Override
public RangerUserStore getUserStoreIfUpdated(long lastUpdatedTimeInMillis) throws Exception {
URI uri = buildURI("/download/users/" + serviceName, lastUpdatedTimeInMillis);
URI uri = buildURI("/download/users/" + serviceName, lastUpdatedTimeInMillis, false);
return sendRequestAndGetResponse(uri, RangerUserStore.class);
}

Expand Down Expand Up @@ -114,13 +115,14 @@ private <T> T sendRequestAndGetResponse(URI uri, Class<T> responseClass) throws
return null;
}

private URI buildURI(String path, long lastUpdatedTimeInMillis) throws URISyntaxException {
private URI buildURI(String path, long lastUpdatedTimeInMillis, boolean usePolicyDelta) throws URISyntaxException {
return new URIBuilder()
.setScheme(SCHEME)
.setHost(adminUrl)
.setPath(path)
.setParameter(PARAM_LAST_UPDATED_TIME, String.valueOf(lastUpdatedTimeInMillis))
.setParameter(PARAM_PLUGIN_ID, pluginId)
.setParameter(PARAM_USE_POLICY_DELTA, String.valueOf(usePolicyDelta))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

package org.apache.atlas.plugin.model;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.collections.CollectionUtils;
import org.apache.htrace.shaded.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
Expand Down Expand Up @@ -529,6 +530,11 @@ public void setIsDenyAllElse(Boolean isDenyAllElse) {
this.isDenyAllElse = isDenyAllElse == null ? Boolean.FALSE : isDenyAllElse;
}

@JsonIgnore
public String getAtlasGuid() {
return getGuid().length() > 36 ? getGuid().substring(0, 36) : getGuid();
}

@Override
public String toString( ) {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.apache.atlas.plugin.model;

import org.apache.htrace.shaded.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.htrace.shaded.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
Expand Down Expand Up @@ -73,6 +73,12 @@ public RangerPolicyDelta(final Long id, final Integer changeType, final Long pol
@JsonIgnore
public Long getPolicyId() { return policy != null ? policy.getId() : null; }

@JsonIgnore
public String getPolicyGuid() { return policy != null ? policy.getGuid() : null; }

@JsonIgnore
public String getPolicyAtlasGuid() { return policy != null ? policy.getAtlasGuid() : null; }

@JsonIgnore
public String getZoneName() { return policy != null ? policy.getZoneName() : null; }

Expand All @@ -94,6 +100,7 @@ public String toString() {
+ ", serviceType:" + getServiceType()
+ ", policyType:" + getPolicyType()
+ ", policyId:[" + getPolicyId() + "]"
+ ", policyGuid:[" + getPolicyGuid() + "]"
+ ", policy:[" + policy +"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ enum AuditModeEnum {
private List<RangerPolicyEvaluator> dataMaskPolicyEvaluators;
private List<RangerPolicyEvaluator> rowFilterPolicyEvaluators;
private final List<RangerPolicyEvaluator> auditPolicyEvaluators;
private Map<Long, RangerPolicyEvaluator> policyEvaluatorsMap;
private Map<String, RangerPolicyEvaluator> policyEvaluatorsMap;
private boolean isContextEnrichersShared = false;
private boolean isPreCleaned = false;

Expand Down Expand Up @@ -654,9 +654,9 @@ public List<RangerPolicyEvaluator> getLikelyMatchPolicyEvaluators(RangerAccessRe
}


public Map<Long, RangerPolicyEvaluator> getPolicyEvaluatorsMap() { return policyEvaluatorsMap; }
public Map<String, RangerPolicyEvaluator> getPolicyEvaluatorsMap() { return policyEvaluatorsMap; }

RangerPolicyEvaluator getPolicyEvaluator(Long id) {
RangerPolicyEvaluator getPolicyEvaluator(String id) {
return policyEvaluatorsMap.get(id);
}

Expand Down Expand Up @@ -1252,17 +1252,17 @@ private void removeEvaluatorFromTrie(RangerPolicyEvaluator oldEvaluator, RangerR
}
}

private Map<Long, RangerPolicyEvaluator> createPolicyEvaluatorsMap() {
Map<Long, RangerPolicyEvaluator> tmpPolicyEvaluatorMap = new HashMap<>();
private Map<String, RangerPolicyEvaluator> createPolicyEvaluatorsMap() {
Map<String, RangerPolicyEvaluator> tmpPolicyEvaluatorMap = new HashMap<>();

for (RangerPolicyEvaluator evaluator : getPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}
for (RangerPolicyEvaluator evaluator : getDataMaskPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}
for (RangerPolicyEvaluator evaluator : getRowFilterPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}

return tmpPolicyEvaluatorMap;
Expand Down Expand Up @@ -1294,7 +1294,7 @@ private RangerPolicyEvaluator addPolicy(RangerPolicy policy) {
}

if (!RangerPolicy.POLICY_TYPE_AUDIT.equals(policy.getPolicyType())) {
policyEvaluatorsMap.put(policy.getId(), ret);
policyEvaluatorsMap.put(policy.getGuid(), ret);
}
}
}
Expand All @@ -1306,22 +1306,22 @@ private RangerPolicyEvaluator addPolicy(RangerPolicy policy) {
return ret;
}

private void removePolicy(Long id) {
private void removePolicy(String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerPolicyRepository.removePolicy(" + id +")");
LOG.debug("==> RangerPolicyRepository.removePolicy(" + guid +")");
}
Iterator<RangerPolicy> iterator = policies.iterator();
while (iterator.hasNext()) {
if (id.equals(iterator.next().getId())) {
if (guid.equals(iterator.next().getGuid())) {
iterator.remove();
//break;
}
}

policyEvaluatorsMap.remove(id);
policyEvaluatorsMap.remove(guid);

if (LOG.isDebugEnabled()) {
LOG.debug("<== RangerPolicyRepository.removePolicy(" + id +")");
LOG.debug("<== RangerPolicyRepository.removePolicy(" + guid +")");
}
}

Expand Down Expand Up @@ -1355,13 +1355,12 @@ private void deletePolicyEvaluator(RangerPolicyEvaluator evaluator) {
}

private RangerPolicyEvaluator update(final RangerPolicyDelta delta, final RangerPolicyEvaluator currentEvaluator) {

if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerPolicyRepository.update(delta=" + delta + ", currentEvaluator=" + (currentEvaluator == null ? null : currentEvaluator.getPolicy()) + ")");
}
Integer changeType = delta.getChangeType();
String policyType = delta.getPolicyType();
Long policyId = delta.getPolicyId();
String policyId = delta.getPolicyGuid();

RangerPolicy policy = delta.getPolicy();

Expand Down Expand Up @@ -1472,7 +1471,7 @@ private void updateResourceTrie(List<RangerPolicyDelta> deltas) {
for (RangerPolicyDelta delta : deltas) {
final Integer changeType = delta.getChangeType();
final String serviceType = delta.getServiceType();
final Long policyId = delta.getPolicyId();
final String policyId = delta.getPolicyGuid();
final String policyType = delta.getPolicyType();

if (!serviceType.equals(this.serviceDef.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public void setPolicies(ServicePolicies policies) {
Boolean hasPolicyDeltas = RangerPolicyDeltaUtil.hasPolicyDeltas(policies);

if (hasPolicyDeltas == null) {
LOG.info("Downloaded policies do not require policy change !! [" + policies + "]");
LOG.info("Downloaded policies do not require policy change !! [" + (policies.getPolicies() != null ? policies.getPolicies().size() : 0) + "]");

if (this.policyEngine == null) {

Expand Down Expand Up @@ -376,9 +376,8 @@ public void setPolicies(ServicePolicies policies) {
}

if (oldPolicyEngine != null) {
RangerPolicyEngineImpl oldPolicyEngineImpl = (RangerPolicyEngineImpl) oldPolicyEngine;

newPolicyEngine = RangerPolicyEngineImpl.getPolicyEngine(oldPolicyEngineImpl, policies);
// Create new evaluator for the updated policies
newPolicyEngine = new RangerPolicyEngineImpl(servicePolicies, pluginContext, roles);
}

if (newPolicyEngine != null) {
Expand Down Expand Up @@ -429,7 +428,8 @@ public void setPolicies(ServicePolicies policies) {
}

} catch (Exception e) {
LOG.error("setPolicies: policy engine initialization failed! Leaving current policy engine as-is. Exception : ", e);
LOG.error("setPolicies: Failed to set policies, didn't set policies", e);
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== setPolicies(" + policies + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.authz.admin.client.AtlasAuthAdminClient;
import org.apache.atlas.policytransformer.CachePolicyTransformerImpl;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -60,10 +61,11 @@ public class PolicyRefresher extends Thread {
private final BlockingQueue<DownloadTrigger> policyDownloadQueue = new LinkedBlockingQueue<>();
private Timer policyDownloadTimer;
private long lastKnownVersion = -1L;
private long lastUpdatedTiemInMillis = -1L;
private long lastUpdatedTimeInMillis = -1L;
private long lastActivationTimeInMillis;
private boolean policiesSetInPlugin;
private boolean serviceDefSetInPlugin;
private boolean enableDeltaBasedRefresh;


public PolicyRefresher(RangerBasePlugin plugIn) {
Expand Down Expand Up @@ -104,6 +106,9 @@ public PolicyRefresher(RangerBasePlugin plugIn) {
this.userStoreProvider = new RangerUserStoreProvider(getServiceType(), appId, getServiceName(), atlasAuthAdminClient, cacheDir, pluginConfig);
this.pollingIntervalMs = pluginConfig.getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);

this.enableDeltaBasedRefresh = AtlasConfiguration.DELTA_BASED_REFRESH_ENABLED.getBoolean();
LOG.info("PolicyRefresher(serviceName=" + serviceName + ") - delta based policy refresh enabled="+this.enableDeltaBasedRefresh);

setName("PolicyRefresher(serviceName=" + serviceName + ")-" + getId());

if(LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -216,7 +221,7 @@ public void run() {
loadPolicy();
loadUserStore();
} catch(InterruptedException excp) {
LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
LOG.info("PolicyRefreshxer(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
break;
} finally {
if (trigger != null) {
Expand Down Expand Up @@ -273,7 +278,7 @@ private void loadPolicy() {
serviceDefSetInPlugin = false;
setLastActivationTimeInMillis(System.currentTimeMillis());
lastKnownVersion = svcPolicies.getPolicyVersion() != null ? svcPolicies.getPolicyVersion() : -1L;
lastUpdatedTiemInMillis = svcPolicies.getPolicyUpdateTime() != null ? svcPolicies.getPolicyUpdateTime().getTime() : -1L;
lastUpdatedTimeInMillis = svcPolicies.getPolicyUpdateTime() != null ? svcPolicies.getPolicyUpdateTime().getTime() : -1L;
} else {
if (!policiesSetInPlugin && !serviceDefSetInPlugin) {
plugIn.setPolicies(null);
Expand All @@ -287,10 +292,10 @@ private void loadPolicy() {
serviceDefSetInPlugin = true;
setLastActivationTimeInMillis(System.currentTimeMillis());
lastKnownVersion = -1;
lastUpdatedTiemInMillis = -1;
lastUpdatedTimeInMillis = -1;
}
} catch (Exception excp) {
LOG.error("Encountered unexpected exception, ignoring..", excp);
LOG.error("Encountered unexpected exception!!!!!!!!!!!", excp);
}

RangerPerfTracer.log(perf);
Expand All @@ -316,15 +321,18 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound

try {

if (serviceName.equals("atlas") && plugIn.getTypeRegistry() != null && lastUpdatedTiemInMillis == -1) {

if (serviceName.equals("atlas") && plugIn.getTypeRegistry() != null && lastUpdatedTimeInMillis == -1) {
LOG.info("PolicyRefresher(serviceName=" + serviceName + "): loading all policies for first time");
RangerRESTUtils restUtils = new RangerRESTUtils();
CachePolicyTransformerImpl transformer = new CachePolicyTransformerImpl(plugIn.getTypeRegistry());

svcPolicies = transformer.getPolicies(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis);
svcPolicies = transformer.getPoliciesAll(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTimeInMillis);
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
LOG.info("PolicyRefresher(serviceName=" + serviceName + "): loading delta policies from last known version=" + lastKnownVersion + ", lastUpdatedTime=" + lastUpdatedTimeInMillis);
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTimeInMillis, this.enableDeltaBasedRefresh);
}

boolean isUpdated = svcPolicies != null;
Expand Down Expand Up @@ -391,7 +399,7 @@ private ServicePolicies loadFromCache() {
}

lastKnownVersion = policies.getPolicyVersion() == null ? -1 : policies.getPolicyVersion().longValue();
lastUpdatedTiemInMillis = policies.getPolicyUpdateTime() == null ? -1 : policies.getPolicyUpdateTime().getTime();
lastUpdatedTimeInMillis = policies.getPolicyUpdateTime() == null ? -1 : policies.getPolicyUpdateTime().getTime();
}
} catch (Exception excp) {
LOG.error("failed to load policies from cache file " + cacheFile.getAbsolutePath(), excp);
Expand Down
Loading

0 comments on commit 89b9e66

Please sign in to comment.