Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1709 | Delta based policies refresh #3395

Merged
merged 11 commits into from
Aug 21, 2024

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public RangerPolicyDelta(final Long id, final Integer changeType, final Long pol
@JsonIgnore
public Long getPolicyId() { return policy != null ? policy.getId() : null; }

@JsonIgnore
public String getPolicyGuid() { return policy != null ? policy.getGuid() : null; }

@JsonIgnore
public String getZoneName() { return policy != null ? policy.getZoneName() : null; }

Expand All @@ -94,6 +97,7 @@ public String toString() {
+ ", serviceType:" + getServiceType()
+ ", policyType:" + getPolicyType()
+ ", policyId:[" + getPolicyId() + "]"
+ ", policyGuid:[" + getPolicyGuid() + "]"
+ ", policy:[" + policy +"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ enum AuditModeEnum {
private List<RangerPolicyEvaluator> dataMaskPolicyEvaluators;
private List<RangerPolicyEvaluator> rowFilterPolicyEvaluators;
private final List<RangerPolicyEvaluator> auditPolicyEvaluators;
private Map<Long, RangerPolicyEvaluator> policyEvaluatorsMap;
private Map<String, RangerPolicyEvaluator> policyEvaluatorsMap;
private boolean isContextEnrichersShared = false;
private boolean isPreCleaned = false;

Expand Down Expand Up @@ -654,9 +654,9 @@ public List<RangerPolicyEvaluator> getLikelyMatchPolicyEvaluators(RangerAccessRe
}


public Map<Long, RangerPolicyEvaluator> getPolicyEvaluatorsMap() { return policyEvaluatorsMap; }
public Map<String, RangerPolicyEvaluator> getPolicyEvaluatorsMap() { return policyEvaluatorsMap; }

RangerPolicyEvaluator getPolicyEvaluator(Long id) {
RangerPolicyEvaluator getPolicyEvaluator(String id) {
return policyEvaluatorsMap.get(id);
}

Expand Down Expand Up @@ -1252,17 +1252,17 @@ private void removeEvaluatorFromTrie(RangerPolicyEvaluator oldEvaluator, RangerR
}
}

private Map<Long, RangerPolicyEvaluator> createPolicyEvaluatorsMap() {
Map<Long, RangerPolicyEvaluator> tmpPolicyEvaluatorMap = new HashMap<>();
private Map<String, RangerPolicyEvaluator> createPolicyEvaluatorsMap() {
Map<String, RangerPolicyEvaluator> tmpPolicyEvaluatorMap = new HashMap<>();

for (RangerPolicyEvaluator evaluator : getPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}
for (RangerPolicyEvaluator evaluator : getDataMaskPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}
for (RangerPolicyEvaluator evaluator : getRowFilterPolicyEvaluators()) {
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getId(), evaluator);
tmpPolicyEvaluatorMap.put(evaluator.getPolicy().getGuid(), evaluator);
}

return tmpPolicyEvaluatorMap;
Expand Down Expand Up @@ -1294,7 +1294,7 @@ private RangerPolicyEvaluator addPolicy(RangerPolicy policy) {
}

if (!RangerPolicy.POLICY_TYPE_AUDIT.equals(policy.getPolicyType())) {
policyEvaluatorsMap.put(policy.getId(), ret);
policyEvaluatorsMap.put(policy.getGuid(), ret);
}
}
}
Expand All @@ -1306,22 +1306,22 @@ private RangerPolicyEvaluator addPolicy(RangerPolicy policy) {
return ret;
}

private void removePolicy(Long id) {
private void removePolicy(String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerPolicyRepository.removePolicy(" + id +")");
LOG.debug("==> RangerPolicyRepository.removePolicy(" + guid +")");
}
Iterator<RangerPolicy> iterator = policies.iterator();
while (iterator.hasNext()) {
if (id.equals(iterator.next().getId())) {
if (guid.equals(iterator.next().getGuid())) {
iterator.remove();
//break;
}
}

policyEvaluatorsMap.remove(id);
policyEvaluatorsMap.remove(guid);

if (LOG.isDebugEnabled()) {
LOG.debug("<== RangerPolicyRepository.removePolicy(" + id +")");
LOG.debug("<== RangerPolicyRepository.removePolicy(" + guid +")");
}
}

Expand Down Expand Up @@ -1355,13 +1355,13 @@ private void deletePolicyEvaluator(RangerPolicyEvaluator evaluator) {
}

private RangerPolicyEvaluator update(final RangerPolicyDelta delta, final RangerPolicyEvaluator currentEvaluator) {

LOG.info("PolicyDelta: RangerPolicyRepository.update is called, policyGuid: "+delta.getPolicyGuid());
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerPolicyRepository.update(delta=" + delta + ", currentEvaluator=" + (currentEvaluator == null ? null : currentEvaluator.getPolicy()) + ")");
}
Integer changeType = delta.getChangeType();
String policyType = delta.getPolicyType();
Long policyId = delta.getPolicyId();
String policyId = delta.getPolicyGuid();

RangerPolicy policy = delta.getPolicy();

Expand Down Expand Up @@ -1472,7 +1472,7 @@ private void updateResourceTrie(List<RangerPolicyDelta> deltas) {
for (RangerPolicyDelta delta : deltas) {
final Integer changeType = delta.getChangeType();
final String serviceType = delta.getServiceType();
final Long policyId = delta.getPolicyId();
final String policyId = delta.getPolicyGuid();
final String policyType = delta.getPolicyType();

if (!serviceType.equals(this.serviceDef.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ public void setPolicies(ServicePolicies policies) {
RangerPolicyEngineImpl oldPolicyEngineImpl = (RangerPolicyEngineImpl) oldPolicyEngine;

newPolicyEngine = RangerPolicyEngineImpl.getPolicyEngine(oldPolicyEngineImpl, policies);
//TODO: this looks like a mistake, second arg should be servicePolicies which has the applied delta
}

if (newPolicyEngine != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.authz.admin.client.AtlasAuthAdminClient;
import org.apache.atlas.policytransformer.CachePolicyTransformerImpl;
import org.apache.atlas.repository.audit.ESBasedAuditRepository;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -40,6 +44,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static org.apache.atlas.ApplicationProperties.DELTA_BASED_REFRESH;


public class PolicyRefresher extends Thread {
private static final Log LOG = LogFactory.getLog(PolicyRefresher.class);
Expand All @@ -64,6 +70,9 @@ public class PolicyRefresher extends Thread {
private long lastActivationTimeInMillis;
private boolean policiesSetInPlugin;
private boolean serviceDefSetInPlugin;
private Configuration atlasConfig;
private boolean enableDeltaBasedRefresh;
private ESBasedAuditRepository auditRepository;


public PolicyRefresher(RangerBasePlugin plugIn) {
Expand Down Expand Up @@ -104,6 +113,16 @@ public PolicyRefresher(RangerBasePlugin plugIn) {
this.userStoreProvider = new RangerUserStoreProvider(getServiceType(), appId, getServiceName(), atlasAuthAdminClient, cacheDir, pluginConfig);
this.pollingIntervalMs = pluginConfig.getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);

try {
this.atlasConfig = ApplicationProperties.get();
this.auditRepository = new ESBasedAuditRepository(atlasConfig);
this.auditRepository.start();
this.enableDeltaBasedRefresh = this.atlasConfig.getBoolean(DELTA_BASED_REFRESH, false);
} catch (AtlasException e) {
LOG.error("PolicyDelta: Error while reading atlas configuration", e);
this.enableDeltaBasedRefresh = false;
}

setName("PolicyRefresher(serviceName=" + serviceName + ")-" + getId());

if(LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -316,13 +335,20 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound

try {

if (serviceName.equals("atlas") && plugIn.getTypeRegistry() != null && lastUpdatedTiemInMillis == -1) {
if (serviceName.equals("atlas") && plugIn.getTypeRegistry() != null) {
RangerRESTUtils restUtils = new RangerRESTUtils();
CachePolicyTransformerImpl transformer = new CachePolicyTransformerImpl(plugIn.getTypeRegistry());

svcPolicies = transformer.getPolicies(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis, null);
CachePolicyTransformerImpl transformer = new CachePolicyTransformerImpl(plugIn.getTypeRegistry(), auditRepository);
if (lastUpdatedTiemInMillis == -1) {
svcPolicies = transformer.getPoliciesAll(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis, null);
} else if (this.enableDeltaBasedRefresh) {
svcPolicies = transformer.getPoliciesDelta(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis);
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Atlas, always use internal function if feature is enabled.

} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<R
int changeType = delta.getChangeType();

if (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
Long policyId = delta.getPolicyId();
String policyId = delta.getPolicyGuid(); // change to getGuid() as id is not set in policy

if (policyId == null) {
continue;
Expand All @@ -91,7 +91,7 @@ public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<R

while (iter.hasNext()) {
RangerPolicy policy = iter.next();
if (policyId.equals(policy.getId()) && (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE)) {
if (policyId.equals(policy.getGuid()) && (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE)) {
deletedPolicies.add(policy);
iter.remove();
}
Expand Down Expand Up @@ -140,10 +140,6 @@ public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<R
ret = policies;
}

if (CollectionUtils.isNotEmpty(deltas) && hasExpectedServiceType && CollectionUtils.isNotEmpty(ret)) {
ret.sort(RangerPolicy.POLICY_ID_COMPARATOR);
}

RangerPerfTracer.log(perf);

if (LOG.isDebugEnabled()) {
Expand All @@ -160,7 +156,7 @@ public static boolean isValidDeltas(List<RangerPolicyDelta> deltas, String compo

for (RangerPolicyDelta delta : deltas) {
final Integer changeType = delta.getChangeType();
final Long policyId = delta.getPolicyId();
final String policyGuid = delta.getPolicyGuid();

if (changeType == null) {
isValid = false;
Expand All @@ -171,7 +167,7 @@ public static boolean isValidDeltas(List<RangerPolicyDelta> deltas, String compo
&& changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE
&& changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
isValid = false;
} else if (policyId == null) {
} else if (policyGuid == null) {
isValid = false;
} else {
final String serviceType = delta.getServiceType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ static public TagPolicies copyHeader(TagPolicies source, String componentService
return ret;
}

public static ServicePolicies applyDelta(final ServicePolicies servicePolicies, RangerPolicyEngineImpl policyEngine) {
public static ServicePolicies applyDelta(final ServicePolicies servicePolicies, RangerPolicyEngineImpl policyEngine) {
ServicePolicies ret = copyHeader(servicePolicies);

List<RangerPolicy> oldResourcePolicies = policyEngine.getResourcePolicies();
Expand Down
Loading
Loading