Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1735 | Add retry while fetching auth policies #3394

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound

svcPolicies = transformer.getPolicies(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis);
lastUpdatedTiemInMillis, null);
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.atlas.policytransformer;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
Expand Down Expand Up @@ -52,15 +53,8 @@
import javax.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.NAME;
Expand Down Expand Up @@ -136,7 +130,7 @@ public AtlasEntityHeader getService() {
return service;
}

public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime) {
public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime, Date latestEditTime) {
//TODO: return only if updated
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl.getPolicies." + serviceName);

Expand All @@ -150,7 +144,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
servicePolicies.setPolicyUpdateTime(new Date());

if (service != null) {
List<RangerPolicy> allPolicies = getServicePolicies(service, 250);
List<RangerPolicy> allPolicies = getServicePolicies(service, 250, latestEditTime);
servicePolicies.setServiceName(serviceName);
servicePolicies.setServiceId(service.getGuid());

Expand All @@ -164,7 +158,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
AtlasEntityHeader tagService = getServiceEntity(tagServiceName);

if (tagService != null) {
allPolicies.addAll(getServicePolicies(tagService, 0));
allPolicies.addAll(getServicePolicies(tagService, 0, latestEditTime));

TagPolicies tagPolicies = new TagPolicies();

Expand Down Expand Up @@ -194,22 +188,38 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
}

} catch (Exception e) {
LOG.error("ERROR in getPolicies {}: ", e);
LOG.error("ERROR in getPolicies: ", e);
return null;
}

RequestContext.get().endMetricRecord(recorder);
return servicePolicies;
}

private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize) throws AtlasBaseException, IOException {
private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException, InterruptedException {

List<RangerPolicy> servicePolicies = new ArrayList<>();
List<AtlasEntityHeader> atlasPolicies = new ArrayList<>();

String serviceName = (String) service.getAttribute("name");
String serviceType = (String) service.getAttribute("authServiceType");
List<AtlasEntityHeader> atlasPolicies = getAtlasPolicies(serviceName, batchSize);

int maxAttempts = 5;
int sleepFor = 500;
for (int attempt = 0; attempt <= maxAttempts; attempt++) {
try {
atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime);
break;
} catch (AtlasBaseException e) {
LOG.error("ES_SYNC_FIX: {}: ERROR in getServicePolicies: {}", serviceName, e.getMessage());
TimeUnit.MILLISECONDS.sleep(sleepFor);
if (attempt == maxAttempts) {
throw e;
}
sleepFor *= 2;
}
}
LOG.info("ES_SYNC_FIX: {}: Moving to transform policies, size: {}", serviceName, atlasPolicies.size());
if (CollectionUtils.isNotEmpty(atlasPolicies)) {
//transform policies
servicePolicies = transformAtlasPoliciesToRangerPolicies(atlasPolicies, serviceType, serviceName);
Expand Down Expand Up @@ -451,7 +461,7 @@ private List<RangerValiditySchedule> getPolicyValiditySchedule(AtlasEntityHeader
return ret;
}

private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException {
private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize, Date latestEditTime) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+service+".getAtlasPolicies");

List<AtlasEntityHeader> ret = new ArrayList<>();
Expand Down Expand Up @@ -508,13 +518,31 @@ private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSi
List<AtlasEntityHeader> headers = discoveryService.directIndexSearch(indexSearchParams).getEntities();
if (headers != null) {
ret.addAll(headers);
LOG.info("ES_SYNC_FIX: {}: ======= Found result with {} policies", serviceName, headers.size());
} else {
found = false;
LOG.info("ES_SYNC_FIX: {}: ======= Found result with null policies", serviceName);
}

from += size;

} while (found && ret.size() % size == 0);
if (Objects.equals(serviceName, "atlas")) {
boolean latestEditFound = false;
Date latestEditTimeAvailable = null;
for (AtlasEntityHeader entity : ret) {
if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) {
LOG.info("ES_SYNC_FIX: {}: Found latest policy: {}, latestEditTime: {}, found policy time: {}", serviceName, entity.getDisplayText(), latestEditTime, entity.getUpdateTime());
latestEditFound = true;
break;
}
latestEditTimeAvailable = entity.getUpdateTime();
}
if (latestEditTime != null && !latestEditFound) {
LOG.info("ES_SYNC_FIX: {}: Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", serviceName, ret.size(), latestEditTime, latestEditTimeAvailable);
throw new AtlasBaseException("Latest edit not found yet");
}
}

} finally {
RequestContext.get().endMetricRecord(recorder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down Expand Up @@ -119,6 +116,11 @@ public static EntityAuditActionV2 fromString(String strValue) {

throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
}

public static List<EntityAuditActionV2> getDeleteActions() {
return Arrays.asList(ENTITY_DELETE, ENTITY_PURGE, ENTITY_IMPORT_DELETE, CLASSIFICATION_DELETE,
PROPAGATED_CLASSIFICATION_DELETE, TERM_DELETE, LABEL_DELETE);
}
}

private String entityQualifiedName;
Expand Down
40 changes: 27 additions & 13 deletions webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.atlas.annotation.Timed;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.AuditSearchParams;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditSearchResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.plugin.util.KeycloakUserStore;
Expand Down Expand Up @@ -51,10 +52,7 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import static org.apache.atlas.policytransformer.CachePolicyTransformerImpl.ATTR_SERVICE_LAST_SYNC;
import static org.apache.atlas.repository.Constants.PERSONA_ENTITY_TYPE;
Expand Down Expand Up @@ -152,11 +150,12 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AuthREST.downloadPolicies(serviceName="+serviceName+", pluginId="+pluginId+", lastUpdatedTime="+lastUpdatedTime+")");
}

if (!isPolicyUpdated(serviceName, lastUpdatedTime)) {
Long latestEditTime = getLastEditTime(serviceName, lastUpdatedTime);
if (latestEditTime == null) {
return null;
}

ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime);
ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, new Date(latestEditTime));

updateLastSync(serviceName);

Expand Down Expand Up @@ -184,7 +183,7 @@ private void updateLastSync(String serviceName) {
}
}

private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {
private Long getLastEditTime(String serviceName, long lastUpdatedTime) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("AuthRest.isPolicyUpdated." + serviceName);

List<String> entityUpdateToWatch = new ArrayList<>();
Expand All @@ -199,26 +198,41 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {
mustClauseList.add(getMap("terms", getMap("typeName", entityUpdateToWatch)));

lastUpdatedTime = lastUpdatedTime == -1 ? 0 : lastUpdatedTime;
mustClauseList.add(getMap("range", getMap("timestamp", getMap("gte", lastUpdatedTime))));
mustClauseList.add(getMap("range", getMap("created", getMap("gte", lastUpdatedTime))));

dsl.put("query", getMap("bool", getMap("must", mustClauseList)));

List<Map<String, Object>> sortList = new ArrayList<>();
sortList.add(getMap("created", "desc"));
dsl.put("sort", sortList);

parameters.setDsl(dsl);
Long lastEditTime = 0L; // this timestamp is used to verify if the found policies are synced with any policy create or update op on cassandra

try {
EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString());

if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) {
return false;
if (result != null) {
if (!CollectionUtils.isEmpty(result.getEntityAudits())) {
EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0);
if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction()) &&
hr2904 marked this conversation as resolved.
Show resolved Hide resolved
lastAuditLog.getTypeName().equals(POLICY_ENTITY_TYPE)
) {
lastEditTime = lastAuditLog.getTimestamp();
} else {
LOG.info("ES_SYNC_FIX: {}: found delete action, so ignoring the last edit time: {}", serviceName, lastAuditLog.getTimestamp());
}
} else {
lastEditTime = null; // no edits found
}
}
} catch (AtlasBaseException e) {
LOG.error("ERROR in getPoliciesIfUpdated while fetching entity audits {}: ", e.getMessage());
return true;
} finally {
RequestContext.get().endMetricRecord(recorder);
LOG.info("Last edit time for service {} is {}, dsl: {}", serviceName, lastEditTime, dsl);
}

return true;
return lastEditTime;
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down
Loading