Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1735 | Fix for policy updates getting missed by PolicyRefresher #3386

Merged
merged 5 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound

svcPolicies = transformer.getPolicies(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis);
lastUpdatedTiemInMillis, null);
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.atlas.policytransformer;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.NAME;
Expand Down Expand Up @@ -136,7 +138,7 @@ public AtlasEntityHeader getService() {
return service;
}

public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime) {
public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime, Date latestEditTime) {
//TODO: return only if updated
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl.getPolicies." + serviceName);

Expand All @@ -150,7 +152,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
servicePolicies.setPolicyUpdateTime(new Date());

if (service != null) {
List<RangerPolicy> allPolicies = getServicePolicies(service, 250);
List<RangerPolicy> allPolicies = getServicePolicies(service, 250, latestEditTime);
servicePolicies.setServiceName(serviceName);
servicePolicies.setServiceId(service.getGuid());

Expand All @@ -164,7 +166,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
AtlasEntityHeader tagService = getServiceEntity(tagServiceName);

if (tagService != null) {
allPolicies.addAll(getServicePolicies(tagService, 0));
allPolicies.addAll(getServicePolicies(tagService, 0, latestEditTime));

TagPolicies tagPolicies = new TagPolicies();

Expand Down Expand Up @@ -194,22 +196,35 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
}

} catch (Exception e) {
LOG.error("ERROR in getPolicies {}: ", e);
LOG.error("ERROR in getPolicies: ", e);
return null;
}

RequestContext.get().endMetricRecord(recorder);
return servicePolicies;
}

private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize) throws AtlasBaseException, IOException {
private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException, InterruptedException {

List<RangerPolicy> servicePolicies = new ArrayList<>();
List<AtlasEntityHeader> atlasPolicies = new ArrayList<>();

String serviceName = (String) service.getAttribute("name");
String serviceType = (String) service.getAttribute("authServiceType");
List<AtlasEntityHeader> atlasPolicies = getAtlasPolicies(serviceName, batchSize);

int maxAttempts = 5;
int sleepFor = 500;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
try {
atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime);
break;
} catch (AtlasBaseException e) {
LOG.error("ERROR in getServicePolicies {}: ", e.getMessage());
TimeUnit.MILLISECONDS.sleep(sleepFor);
sleepFor *= 2;
}
}
LOG.info("Moving to transform policies, size: {}", atlasPolicies.size());
if (CollectionUtils.isNotEmpty(atlasPolicies)) {
//transform policies
servicePolicies = transformAtlasPoliciesToRangerPolicies(atlasPolicies, serviceType, serviceName);
Expand Down Expand Up @@ -451,7 +466,7 @@ private List<RangerValiditySchedule> getPolicyValiditySchedule(AtlasEntityHeader
return ret;
}

private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException {
private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize, Date latestEditTime) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+service+".getAtlasPolicies");

List<AtlasEntityHeader> ret = new ArrayList<>();
Expand Down Expand Up @@ -508,14 +523,33 @@ private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSi
List<AtlasEntityHeader> headers = discoveryService.directIndexSearch(indexSearchParams).getEntities();
if (headers != null) {
ret.addAll(headers);
LOG.info("======= Found result with {} policies", headers.size());
} else {
found = false;
LOG.info("======= Found result with null policies");
}

from += size;

} while (found && ret.size() % size == 0);

boolean latestEditFound = false;
Date latestEditTimeAvailable = null;
for (AtlasEntityHeader entity : ret) {
LOG.info("Looping on returned policies: {}, size: {}", entity.getDisplayText(), ret.size());
if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) {
LOG.info("Found latest policy: {}, latestEditTime: {}, found policy time: {}", entity.getDisplayText(), latestEditTime, entity.getUpdateTime());
latestEditFound = true;
break;
}
latestEditTimeAvailable = entity.getUpdateTime();
LOG.info("Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", entity.getDisplayText(), latestEditTimeAvailable);
}
if (latestEditTime != null && !latestEditFound) {
LOG.info("Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", ret.size(), latestEditTime, latestEditTimeAvailable);
throw new AtlasBaseException("Latest edit not found yet");
}

} finally {
RequestContext.get().endMetricRecord(recorder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down Expand Up @@ -119,6 +116,11 @@ public static EntityAuditActionV2 fromString(String strValue) {

throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
}

public static List<EntityAuditActionV2> getDeleteActions() {
return Arrays.asList(ENTITY_DELETE, ENTITY_PURGE, ENTITY_IMPORT_DELETE, CLASSIFICATION_DELETE,
PROPAGATED_CLASSIFICATION_DELETE, TERM_DELETE, LABEL_DELETE);
}
}

private String entityQualifiedName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public class DataContract {
public String dataset;
public DatasetType type;
public String description;
public List<String> owners;
public Object owners;
public List<BusinessTag> tags;
public String certificate;
@Valid
public List<Column> columns;
private final Map<String, Object> unknownFields = new HashMap<>();
private final Map<String, Object> unknownFields = new HashMap<>();

public enum Status {
@JsonProperty("DRAFT") DRAFT,
Expand Down Expand Up @@ -146,7 +146,7 @@ public void setType(String type) throws AtlasBaseException {
}
}

public void setOwners(List<String> owners) {
public void setOwners(Object owners) {
this.owners = owners;
}

Expand Down
36 changes: 24 additions & 12 deletions webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.atlas.annotation.Timed;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.AuditSearchParams;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditSearchResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.plugin.util.KeycloakUserStore;
Expand Down Expand Up @@ -51,10 +52,7 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import static org.apache.atlas.policytransformer.CachePolicyTransformerImpl.ATTR_SERVICE_LAST_SYNC;
import static org.apache.atlas.repository.Constants.PERSONA_ENTITY_TYPE;
Expand Down Expand Up @@ -152,11 +150,12 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AuthREST.downloadPolicies(serviceName="+serviceName+", pluginId="+pluginId+", lastUpdatedTime="+lastUpdatedTime+")");
}

if (!isPolicyUpdated(serviceName, lastUpdatedTime)) {
Long latestEditTime = getLastEditTime(serviceName, lastUpdatedTime);
if (latestEditTime == null) {
return null;
}

ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime);
ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, new Date(latestEditTime));

updateLastSync(serviceName);

Expand Down Expand Up @@ -184,7 +183,7 @@ private void updateLastSync(String serviceName) {
}
}

private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {
private Long getLastEditTime(String serviceName, long lastUpdatedTime) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("AuthRest.isPolicyUpdated." + serviceName);

List<String> entityUpdateToWatch = new ArrayList<>();
Expand All @@ -203,22 +202,35 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {

dsl.put("query", getMap("bool", getMap("must", mustClauseList)));

List<Map<String, Object>> sortList = new ArrayList<>();
sortList.add(getMap("timestamp", "desc"));
dsl.put("sort", sortList);

parameters.setDsl(dsl);
Long lastEditTime = 0L;

try {
EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString());

if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) {
return false;
if (result != null) {
if (!CollectionUtils.isEmpty(result.getEntityAudits())) {
EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0);
if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction())) {
lastEditTime = lastAuditLog.getTimestamp();
} else {
LOG.info("found delete action, so ignoring the last edit time: {}", lastAuditLog.getTimestamp());
}
} else {
lastEditTime = null; // no edits found
}
}
} catch (AtlasBaseException e) {
LOG.error("ERROR in getPoliciesIfUpdated while fetching entity audits {}: ", e.getMessage());
return true;
} finally {
RequestContext.get().endMetricRecord(recorder);
LOG.info("Last edit time for service {} is {}, dsl: {}", serviceName, lastEditTime, dsl);
}

return true;
return lastEditTime;
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down
Loading