Skip to content

Commit

Permalink
Merge pull request #3395 from atlanhq/ns/DG-1709-refresh-delta-policies
Browse files Browse the repository at this point in the history
DG-1709 |  Delta based policies refresh
  • Loading branch information
krsoninikhil committed Dec 4, 2024
1 parent 500e6c0 commit eca1121
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public RangerPolicyDelta(final Long id, final Integer changeType, final Long pol
@JsonIgnore
public Long getPolicyId() { return policy != null ? policy.getId() : null; }

@JsonIgnore
public String getPolicyGuid() { return policy != null ? policy.getGuid() : null; }

@JsonIgnore
public String getZoneName() { return policy != null ? policy.getZoneName() : null; }

Expand All @@ -94,6 +97,7 @@ public String toString() {
+ ", serviceType:" + getServiceType()
+ ", policyType:" + getPolicyType()
+ ", policyId:[" + getPolicyId() + "]"
+ ", policyGuid:[" + getPolicyGuid() + "]"
+ ", policy:[" + policy +"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ enum AuditModeEnum {
private List<RangerPolicyEvaluator> dataMaskPolicyEvaluators;
private List<RangerPolicyEvaluator> rowFilterPolicyEvaluators;
private final List<RangerPolicyEvaluator> auditPolicyEvaluators;
private Map<Long, RangerPolicyEvaluator> policyEvaluatorsMap;
private Map<String, RangerPolicyEvaluator> policyEvaluatorsMap;
private boolean isContextEnrichersShared = false;
private boolean isPreCleaned = false;

Expand Down Expand Up @@ -654,9 +654,9 @@ public List<RangerPolicyEvaluator> getLikelyMatchPolicyEvaluators(RangerAccessRe
}


public Map<Long, RangerPolicyEvaluator> getPolicyEvaluatorsMap() { return policyEvaluatorsMap; }
public Map<String, RangerPolicyEvaluator> getPolicyEvaluatorsMap() { return policyEvaluatorsMap; }

RangerPolicyEvaluator getPolicyEvaluator(Long id) {
RangerPolicyEvaluator getPolicyEvaluator(String id) {
return policyEvaluatorsMap.get(id);
}

Expand Down Expand Up @@ -1252,17 +1252,17 @@ private void removeEvaluatorFromTrie(RangerPolicyEvaluator oldEvaluator, RangerR
}
}

private Map<Long, RangerPolicyEvaluator> createPolicyEvaluatorsMap() {
Map<Long, RangerPolicyEvaluator> tmpPolicyEvaluatorMap = new HashMap<>();
private Map<String, RangerPolicyEvaluator> createPolicyEvaluatorsMap() {
Map<String, RangerPolicyEvaluator> tmpPolicyEvaluatorMap = new HashMap<>();

for (RangerPolicyEvaluator evaluator : getPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}
for (RangerPolicyEvaluator evaluator : getDataMaskPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}
for (RangerPolicyEvaluator evaluator : getRowFilterPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}

return tmpPolicyEvaluatorMap;
Expand Down Expand Up @@ -1294,7 +1294,7 @@ private RangerPolicyEvaluator addPolicy(RangerPolicy policy) {
}

if (!RangerPolicy.POLICY_TYPE_AUDIT.equals(policy.getPolicyType())) {
policyEvaluatorsMap.put(policy.getId(), ret);
policyEvaluatorsMap.put(policy.getGuid(), ret);
}
}
}
Expand All @@ -1306,22 +1306,22 @@ private RangerPolicyEvaluator addPolicy(RangerPolicy policy) {
return ret;
}

private void removePolicy(Long id) {
private void removePolicy(String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerPolicyRepository.removePolicy(" + id +")");
LOG.debug("==> RangerPolicyRepository.removePolicy(" + guid +")");
}
Iterator<RangerPolicy> iterator = policies.iterator();
while (iterator.hasNext()) {
if (id.equals(iterator.next().getId())) {
if (guid.equals(iterator.next().getGuid())) {
iterator.remove();
//break;
}
}

policyEvaluatorsMap.remove(id);
policyEvaluatorsMap.remove(guid);

if (LOG.isDebugEnabled()) {
LOG.debug("<== RangerPolicyRepository.removePolicy(" + id +")");
LOG.debug("<== RangerPolicyRepository.removePolicy(" + guid +")");
}
}

Expand Down Expand Up @@ -1355,13 +1355,13 @@ private void deletePolicyEvaluator(RangerPolicyEvaluator evaluator) {
}

private RangerPolicyEvaluator update(final RangerPolicyDelta delta, final RangerPolicyEvaluator currentEvaluator) {

LOG.info("PolicyDelta: RangerPolicyRepository.update is called, policyGuid: "+delta.getPolicyGuid());
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerPolicyRepository.update(delta=" + delta + ", currentEvaluator=" + (currentEvaluator == null ? null : currentEvaluator.getPolicy()) + ")");
}
Integer changeType = delta.getChangeType();
String policyType = delta.getPolicyType();
Long policyId = delta.getPolicyId();
String policyId = delta.getPolicyGuid();

RangerPolicy policy = delta.getPolicy();

Expand Down Expand Up @@ -1472,7 +1472,7 @@ private void updateResourceTrie(List<RangerPolicyDelta> deltas) {
for (RangerPolicyDelta delta : deltas) {
final Integer changeType = delta.getChangeType();
final String serviceType = delta.getServiceType();
final Long policyId = delta.getPolicyId();
final String policyId = delta.getPolicyGuid();
final String policyType = delta.getPolicyType();

if (!serviceType.equals(this.serviceDef.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ public void setPolicies(ServicePolicies policies) {
RangerPolicyEngineImpl oldPolicyEngineImpl = (RangerPolicyEngineImpl) oldPolicyEngine;

newPolicyEngine = RangerPolicyEngineImpl.getPolicyEngine(oldPolicyEngineImpl, policies);
//TODO: this looks like a mistake, second arg should be servicePolicies which has the applied delta
}

if (newPolicyEngine != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.authz.admin.client.AtlasAuthAdminClient;
import org.apache.atlas.policytransformer.CachePolicyTransformerImpl;
import org.apache.atlas.repository.audit.ESBasedAuditRepository;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -40,6 +44,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static org.apache.atlas.ApplicationProperties.DELTA_BASED_REFRESH;


public class PolicyRefresher extends Thread {
private static final Log LOG = LogFactory.getLog(PolicyRefresher.class);
Expand All @@ -64,6 +70,9 @@ public class PolicyRefresher extends Thread {
private long lastActivationTimeInMillis;
private boolean policiesSetInPlugin;
private boolean serviceDefSetInPlugin;
private Configuration atlasConfig;
private boolean enableDeltaBasedRefresh;
private ESBasedAuditRepository auditRepository;


public PolicyRefresher(RangerBasePlugin plugIn) {
Expand Down Expand Up @@ -104,6 +113,16 @@ public PolicyRefresher(RangerBasePlugin plugIn) {
this.userStoreProvider = new RangerUserStoreProvider(getServiceType(), appId, getServiceName(), atlasAuthAdminClient, cacheDir, pluginConfig);
this.pollingIntervalMs = pluginConfig.getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);

try {
this.atlasConfig = ApplicationProperties.get();
this.auditRepository = new ESBasedAuditRepository(atlasConfig);
this.auditRepository.start();
this.enableDeltaBasedRefresh = this.atlasConfig.getBoolean(DELTA_BASED_REFRESH, false);
} catch (AtlasException e) {
LOG.error("PolicyDelta: Error while reading atlas configuration", e);
this.enableDeltaBasedRefresh = false;
}

setName("PolicyRefresher(serviceName=" + serviceName + ")-" + getId());

if(LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -316,13 +335,21 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound

try {

if (serviceName.equals("atlas") && plugIn.getTypeRegistry() != null && lastUpdatedTiemInMillis == -1) {
if (serviceName.equals("atlas") && plugIn.getTypeRegistry() != null) {
RangerRESTUtils restUtils = new RangerRESTUtils();
CachePolicyTransformerImpl transformer = new CachePolicyTransformerImpl(plugIn.getTypeRegistry());

svcPolicies = transformer.getPolicies(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis, null);

CachePolicyTransformerImpl transformer = new CachePolicyTransformerImpl(plugIn.getTypeRegistry(), auditRepository);
if (lastUpdatedTiemInMillis == -1) {
svcPolicies = transformer.getPoliciesAll(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis, null);
} else if (this.enableDeltaBasedRefresh) {
svcPolicies = transformer.getPoliciesDelta(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis);
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<R
int changeType = delta.getChangeType();

if (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
Long policyId = delta.getPolicyId();
String policyId = delta.getPolicyGuid(); // change to getGuid() as id is not set in policy

if (policyId == null) {
continue;
Expand All @@ -91,7 +91,7 @@ public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<R

while (iter.hasNext()) {
RangerPolicy policy = iter.next();
if (policyId.equals(policy.getId()) && (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE)) {
if (policyId.equals(policy.getGuid()) && (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE)) {
deletedPolicies.add(policy);
iter.remove();
}
Expand Down Expand Up @@ -140,10 +140,6 @@ public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<R
ret = policies;
}

if (CollectionUtils.isNotEmpty(deltas) && hasExpectedServiceType && CollectionUtils.isNotEmpty(ret)) {
ret.sort(RangerPolicy.POLICY_ID_COMPARATOR);
}

RangerPerfTracer.log(perf);

if (LOG.isDebugEnabled()) {
Expand All @@ -160,7 +156,7 @@ public static boolean isValidDeltas(List<RangerPolicyDelta> deltas, String compo

for (RangerPolicyDelta delta : deltas) {
final Integer changeType = delta.getChangeType();
final Long policyId = delta.getPolicyId();
final String policyGuid = delta.getPolicyGuid();

if (changeType == null) {
isValid = false;
Expand All @@ -171,7 +167,7 @@ public static boolean isValidDeltas(List<RangerPolicyDelta> deltas, String compo
&& changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE
&& changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
isValid = false;
} else if (policyId == null) {
} else if (policyGuid == null) {
isValid = false;
} else {
final String serviceType = delta.getServiceType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ static public TagPolicies copyHeader(TagPolicies source, String componentService
return ret;
}

public static ServicePolicies applyDelta(final ServicePolicies servicePolicies, RangerPolicyEngineImpl policyEngine) {
public static ServicePolicies applyDelta(final ServicePolicies servicePolicies, RangerPolicyEngineImpl policyEngine) {
ServicePolicies ret = copyHeader(servicePolicies);

List<RangerPolicy> oldResourcePolicies = policyEngine.getResourcePolicies();
Expand Down
Loading

0 comments on commit eca1121

Please sign in to comment.