Skip to content

Commit

Permalink
Merge pull request #3386 from atlanhq/ns/DG-1735-es-sync-delay
Browse files Browse the repository at this point in the history
DG-1735 | Fix for policy updates getting missed by PolicyRefresher
  • Loading branch information
krsoninikhil authored Aug 8, 2024
2 parents 75be7f9 + 93b9bae commit cfd9514
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound

svcPolicies = transformer.getPolicies(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis);
lastUpdatedTiemInMillis, null);
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.atlas.policytransformer;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.NAME;
Expand Down Expand Up @@ -136,7 +138,7 @@ public AtlasEntityHeader getService() {
return service;
}

public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime) {
public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime, Date latestEditTime) {
//TODO: return only if updated
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl.getPolicies." + serviceName);

Expand All @@ -150,7 +152,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
servicePolicies.setPolicyUpdateTime(new Date());

if (service != null) {
List<RangerPolicy> allPolicies = getServicePolicies(service, 250);
List<RangerPolicy> allPolicies = getServicePolicies(service, 250, latestEditTime);
servicePolicies.setServiceName(serviceName);
servicePolicies.setServiceId(service.getGuid());

Expand All @@ -164,7 +166,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
AtlasEntityHeader tagService = getServiceEntity(tagServiceName);

if (tagService != null) {
allPolicies.addAll(getServicePolicies(tagService, 0));
allPolicies.addAll(getServicePolicies(tagService, 0, latestEditTime));

TagPolicies tagPolicies = new TagPolicies();

Expand Down Expand Up @@ -194,22 +196,35 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
}

} catch (Exception e) {
LOG.error("ERROR in getPolicies {}: ", e);
LOG.error("ERROR in getPolicies: ", e);
return null;
}

RequestContext.get().endMetricRecord(recorder);
return servicePolicies;
}

private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize) throws AtlasBaseException, IOException {
private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException, InterruptedException {

List<RangerPolicy> servicePolicies = new ArrayList<>();
List<AtlasEntityHeader> atlasPolicies = new ArrayList<>();

String serviceName = (String) service.getAttribute("name");
String serviceType = (String) service.getAttribute("authServiceType");
List<AtlasEntityHeader> atlasPolicies = getAtlasPolicies(serviceName, batchSize);

int maxAttempts = 5;
int sleepFor = 500;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
try {
atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime);
break;
} catch (AtlasBaseException e) {
LOG.error("ERROR in getServicePolicies {}: ", e.getMessage());
TimeUnit.MILLISECONDS.sleep(sleepFor);
sleepFor *= 2;
}
}
LOG.info("Moving to transform policies, size: {}", atlasPolicies.size());
if (CollectionUtils.isNotEmpty(atlasPolicies)) {
//transform policies
servicePolicies = transformAtlasPoliciesToRangerPolicies(atlasPolicies, serviceType, serviceName);
Expand Down Expand Up @@ -451,7 +466,7 @@ private List<RangerValiditySchedule> getPolicyValiditySchedule(AtlasEntityHeader
return ret;
}

private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException {
private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize, Date latestEditTime) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+service+".getAtlasPolicies");

List<AtlasEntityHeader> ret = new ArrayList<>();
Expand Down Expand Up @@ -509,14 +524,33 @@ private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSi
List<AtlasEntityHeader> headers = discoveryService.directIndexSearch(indexSearchParams).getEntities();
if (headers != null) {
ret.addAll(headers);
LOG.info("======= Found result with {} policies", headers.size());
} else {
found = false;
LOG.info("======= Found result with null policies");
}

from += size;

} while (found && ret.size() % size == 0);

boolean latestEditFound = false;
Date latestEditTimeAvailable = null;
for (AtlasEntityHeader entity : ret) {
LOG.info("Looping on returned policies: {}, size: {}", entity.getDisplayText(), ret.size());
if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) {
LOG.info("Found latest policy: {}, latestEditTime: {}, found policy time: {}", entity.getDisplayText(), latestEditTime, entity.getUpdateTime());
latestEditFound = true;
break;
}
latestEditTimeAvailable = entity.getUpdateTime();
LOG.info("Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", entity.getDisplayText(), latestEditTimeAvailable);
}
if (latestEditTime != null && !latestEditFound) {
LOG.info("Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", ret.size(), latestEditTime, latestEditTimeAvailable);
throw new AtlasBaseException("Latest edit not found yet");
}

} finally {
RequestContext.get().endMetricRecord(recorder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down Expand Up @@ -119,6 +116,11 @@ public static EntityAuditActionV2 fromString(String strValue) {

throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
}

public static List<EntityAuditActionV2> getDeleteActions() {
return Arrays.asList(ENTITY_DELETE, ENTITY_PURGE, ENTITY_IMPORT_DELETE, CLASSIFICATION_DELETE,
PROPAGATED_CLASSIFICATION_DELETE, TERM_DELETE, LABEL_DELETE);
}
}

private String entityQualifiedName;
Expand Down
36 changes: 24 additions & 12 deletions webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.atlas.annotation.Timed;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.AuditSearchParams;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditSearchResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.plugin.util.KeycloakUserStore;
Expand Down Expand Up @@ -51,10 +52,7 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import static org.apache.atlas.policytransformer.CachePolicyTransformerImpl.ATTR_SERVICE_LAST_SYNC;
import static org.apache.atlas.repository.Constants.PERSONA_ENTITY_TYPE;
Expand Down Expand Up @@ -152,11 +150,12 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AuthREST.downloadPolicies(serviceName="+serviceName+", pluginId="+pluginId+", lastUpdatedTime="+lastUpdatedTime+")");
}

if (!isPolicyUpdated(serviceName, lastUpdatedTime)) {
Long latestEditTime = getLastEditTime(serviceName, lastUpdatedTime);
if (latestEditTime == null) {
return null;
}

ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime);
ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, new Date(latestEditTime));

updateLastSync(serviceName);

Expand Down Expand Up @@ -184,7 +183,7 @@ private void updateLastSync(String serviceName) {
}
}

private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {
private Long getLastEditTime(String serviceName, long lastUpdatedTime) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("AuthRest.isPolicyUpdated." + serviceName);

List<String> entityUpdateToWatch = new ArrayList<>();
Expand All @@ -203,22 +202,35 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {

dsl.put("query", getMap("bool", getMap("must", mustClauseList)));

List<Map<String, Object>> sortList = new ArrayList<>();
sortList.add(getMap("timestamp", "desc"));
dsl.put("sort", sortList);

parameters.setDsl(dsl);
Long lastEditTime = 0L;

try {
EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString());

if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) {
return false;
if (result != null) {
if (!CollectionUtils.isEmpty(result.getEntityAudits())) {
EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0);
if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction())) {
lastEditTime = lastAuditLog.getTimestamp();
} else {
LOG.info("found delete action, so ignoring the last edit time: {}", lastAuditLog.getTimestamp());
}
} else {
lastEditTime = null; // no edits found
}
}
} catch (AtlasBaseException e) {
LOG.error("ERROR in getPoliciesIfUpdated while fetching entity audits {}: ", e.getMessage());
return true;
} finally {
RequestContext.get().endMetricRecord(recorder);
LOG.info("Last edit time for service {} is {}, dsl: {}", serviceName, lastEditTime, dsl);
}

return true;
return lastEditTime;
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down

0 comments on commit cfd9514

Please sign in to comment.