Skip to content

Commit

Permalink
Merge pull request #3395 from atlanhq/ns/DG-1709-refresh-delta-policies
Browse files Browse the repository at this point in the history
DG-1709 |  Delta based policies refresh
  • Loading branch information
krsoninikhil authored Aug 21, 2024
2 parents de2d9c9 + 05f0b8f commit e448510
Show file tree
Hide file tree
Showing 10 changed files with 1,929 additions and 40 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public RangerPolicyDelta(final Long id, final Integer changeType, final Long pol
@JsonIgnore
public Long getPolicyId() { return policy != null ? policy.getId() : null; }

@JsonIgnore
public String getPolicyGuid() { return policy != null ? policy.getGuid() : null; }

@JsonIgnore
public String getZoneName() { return policy != null ? policy.getZoneName() : null; }

Expand All @@ -94,6 +97,7 @@ public String toString() {
+ ", serviceType:" + getServiceType()
+ ", policyType:" + getPolicyType()
+ ", policyId:[" + getPolicyId() + "]"
+ ", policyGuid:[" + getPolicyGuid() + "]"
+ ", policy:[" + policy +"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ enum AuditModeEnum {
private List<RangerPolicyEvaluator> dataMaskPolicyEvaluators;
private List<RangerPolicyEvaluator> rowFilterPolicyEvaluators;
private final List<RangerPolicyEvaluator> auditPolicyEvaluators;
private Map<Long, RangerPolicyEvaluator> policyEvaluatorsMap;
private Map<String, RangerPolicyEvaluator> policyEvaluatorsMap;
private boolean isContextEnrichersShared = false;
private boolean isPreCleaned = false;

Expand Down Expand Up @@ -654,9 +654,9 @@ public List<RangerPolicyEvaluator> getLikelyMatchPolicyEvaluators(RangerAccessRe
}


public Map<Long, RangerPolicyEvaluator> getPolicyEvaluatorsMap() { return policyEvaluatorsMap; }
public Map<String, RangerPolicyEvaluator> getPolicyEvaluatorsMap() { return policyEvaluatorsMap; }

RangerPolicyEvaluator getPolicyEvaluator(Long id) {
RangerPolicyEvaluator getPolicyEvaluator(String id) {
return policyEvaluatorsMap.get(id);
}

Expand Down Expand Up @@ -1252,17 +1252,17 @@ private void removeEvaluatorFromTrie(RangerPolicyEvaluator oldEvaluator, RangerR
}
}

private Map<Long, RangerPolicyEvaluator> createPolicyEvaluatorsMap() {
Map<Long, RangerPolicyEvaluator> tmpPolicyEvaluatorMap = new HashMap<>();
private Map<String, RangerPolicyEvaluator> createPolicyEvaluatorsMap() {
Map<String, RangerPolicyEvaluator> tmpPolicyEvaluatorMap = new HashMap<>();

for (RangerPolicyEvaluator evaluator : getPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}
for (RangerPolicyEvaluator evaluator : getDataMaskPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}
for (RangerPolicyEvaluator evaluator : getRowFilterPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}

return tmpPolicyEvaluatorMap;
Expand Down Expand Up @@ -1294,7 +1294,7 @@ private RangerPolicyEvaluator addPolicy(RangerPolicy policy) {
}

if (!RangerPolicy.POLICY_TYPE_AUDIT.equals(policy.getPolicyType())) {
policyEvaluatorsMap.put(policy.getId(), ret);
policyEvaluatorsMap.put(policy.getGuid(), ret);
}
}
}
Expand All @@ -1306,22 +1306,22 @@ private RangerPolicyEvaluator addPolicy(RangerPolicy policy) {
return ret;
}

private void removePolicy(Long id) {
private void removePolicy(String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerPolicyRepository.removePolicy(" + id +")");
LOG.debug("==> RangerPolicyRepository.removePolicy(" + guid +")");
}
Iterator<RangerPolicy> iterator = policies.iterator();
while (iterator.hasNext()) {
if (id.equals(iterator.next().getId())) {
if (guid.equals(iterator.next().getGuid())) {
iterator.remove();
//break;
}
}

policyEvaluatorsMap.remove(id);
policyEvaluatorsMap.remove(guid);

if (LOG.isDebugEnabled()) {
LOG.debug("<== RangerPolicyRepository.removePolicy(" + id +")");
LOG.debug("<== RangerPolicyRepository.removePolicy(" + guid +")");
}
}

Expand Down Expand Up @@ -1355,13 +1355,13 @@ private void deletePolicyEvaluator(RangerPolicyEvaluator evaluator) {
}

private RangerPolicyEvaluator update(final RangerPolicyDelta delta, final RangerPolicyEvaluator currentEvaluator) {

LOG.info("PolicyDelta: RangerPolicyRepository.update is called, policyGuid: "+delta.getPolicyGuid());
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerPolicyRepository.update(delta=" + delta + ", currentEvaluator=" + (currentEvaluator == null ? null : currentEvaluator.getPolicy()) + ")");
}
Integer changeType = delta.getChangeType();
String policyType = delta.getPolicyType();
Long policyId = delta.getPolicyId();
String policyId = delta.getPolicyGuid();

RangerPolicy policy = delta.getPolicy();

Expand Down Expand Up @@ -1472,7 +1472,7 @@ private void updateResourceTrie(List<RangerPolicyDelta> deltas) {
for (RangerPolicyDelta delta : deltas) {
final Integer changeType = delta.getChangeType();
final String serviceType = delta.getServiceType();
final Long policyId = delta.getPolicyId();
final String policyId = delta.getPolicyGuid();
final String policyType = delta.getPolicyType();

if (!serviceType.equals(this.serviceDef.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ public void setPolicies(ServicePolicies policies) {
RangerPolicyEngineImpl oldPolicyEngineImpl = (RangerPolicyEngineImpl) oldPolicyEngine;

newPolicyEngine = RangerPolicyEngineImpl.getPolicyEngine(oldPolicyEngineImpl, policies);
//TODO: this looks like a mistake, second arg should be servicePolicies which has the applied delta
}

if (newPolicyEngine != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.authz.admin.client.AtlasAuthAdminClient;
import org.apache.atlas.policytransformer.CachePolicyTransformerImpl;
import org.apache.atlas.repository.audit.ESBasedAuditRepository;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -40,6 +44,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static org.apache.atlas.ApplicationProperties.DELTA_BASED_REFRESH;


public class PolicyRefresher extends Thread {
private static final Log LOG = LogFactory.getLog(PolicyRefresher.class);
Expand All @@ -64,6 +70,9 @@ public class PolicyRefresher extends Thread {
private long lastActivationTimeInMillis;
private boolean policiesSetInPlugin;
private boolean serviceDefSetInPlugin;
private Configuration atlasConfig;
private boolean enableDeltaBasedRefresh;
private ESBasedAuditRepository auditRepository;


public PolicyRefresher(RangerBasePlugin plugIn) {
Expand Down Expand Up @@ -104,6 +113,16 @@ public PolicyRefresher(RangerBasePlugin plugIn) {
this.userStoreProvider = new RangerUserStoreProvider(getServiceType(), appId, getServiceName(), atlasAuthAdminClient, cacheDir, pluginConfig);
this.pollingIntervalMs = pluginConfig.getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);

try {
this.atlasConfig = ApplicationProperties.get();
this.auditRepository = new ESBasedAuditRepository(atlasConfig);
this.auditRepository.start();
this.enableDeltaBasedRefresh = this.atlasConfig.getBoolean(DELTA_BASED_REFRESH, false);
} catch (AtlasException e) {
LOG.error("PolicyDelta: Error while reading atlas configuration", e);
this.enableDeltaBasedRefresh = false;
}

setName("PolicyRefresher(serviceName=" + serviceName + ")-" + getId());

if(LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -316,13 +335,20 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound

try {

if (serviceName.equals("atlas") && plugIn.getTypeRegistry() != null && lastUpdatedTiemInMillis == -1) {
if (serviceName.equals("atlas") && plugIn.getTypeRegistry() != null) {
RangerRESTUtils restUtils = new RangerRESTUtils();
CachePolicyTransformerImpl transformer = new CachePolicyTransformerImpl(plugIn.getTypeRegistry());

svcPolicies = transformer.getPolicies(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis, null);
CachePolicyTransformerImpl transformer = new CachePolicyTransformerImpl(plugIn.getTypeRegistry(), auditRepository);
if (lastUpdatedTiemInMillis == -1) {
svcPolicies = transformer.getPoliciesAll(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis, null);
} else if (this.enableDeltaBasedRefresh) {
svcPolicies = transformer.getPoliciesDelta(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis);
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<R
int changeType = delta.getChangeType();

if (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
Long policyId = delta.getPolicyId();
String policyId = delta.getPolicyGuid(); // change to getGuid() as id is not set in policy

if (policyId == null) {
continue;
Expand All @@ -91,7 +91,7 @@ public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<R

while (iter.hasNext()) {
RangerPolicy policy = iter.next();
if (policyId.equals(policy.getId()) && (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE)) {
if (policyId.equals(policy.getGuid()) && (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE)) {
deletedPolicies.add(policy);
iter.remove();
}
Expand Down Expand Up @@ -140,10 +140,6 @@ public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<R
ret = policies;
}

if (CollectionUtils.isNotEmpty(deltas) && hasExpectedServiceType && CollectionUtils.isNotEmpty(ret)) {
ret.sort(RangerPolicy.POLICY_ID_COMPARATOR);
}

RangerPerfTracer.log(perf);

if (LOG.isDebugEnabled()) {
Expand All @@ -160,7 +156,7 @@ public static boolean isValidDeltas(List<RangerPolicyDelta> deltas, String compo

for (RangerPolicyDelta delta : deltas) {
final Integer changeType = delta.getChangeType();
final Long policyId = delta.getPolicyId();
final String policyGuid = delta.getPolicyGuid();

if (changeType == null) {
isValid = false;
Expand All @@ -171,7 +167,7 @@ public static boolean isValidDeltas(List<RangerPolicyDelta> deltas, String compo
&& changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE
&& changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
isValid = false;
} else if (policyId == null) {
} else if (policyGuid == null) {
isValid = false;
} else {
final String serviceType = delta.getServiceType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ static public TagPolicies copyHeader(TagPolicies source, String componentService
return ret;
}

public static ServicePolicies applyDelta(final ServicePolicies servicePolicies, RangerPolicyEngineImpl policyEngine) {
public static ServicePolicies applyDelta(final ServicePolicies servicePolicies, RangerPolicyEngineImpl policyEngine) {
ServicePolicies ret = copyHeader(servicePolicies);

List<RangerPolicy> oldResourcePolicies = policyEngine.getResourcePolicies();
Expand Down
Loading

0 comments on commit e448510

Please sign in to comment.