From 3c5d0f430c04e5ee232dd1c58caf54995a87c09c Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Wed, 7 Aug 2024 14:36:43 +0530 Subject: [PATCH 1/4] Check fetched policy time before updating cache --- .../atlas/plugin/util/PolicyRefresher.java | 2 +- .../CachePolicyTransformerImpl.java | 23 ++++++++++++++----- .../org/apache/atlas/web/rest/AuthREST.java | 9 ++++---- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java index aae09a7d26..457aa02f0e 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java @@ -322,7 +322,7 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound svcPolicies = transformer.getPolicies(serviceName, restUtils.getPluginId(serviceName, plugIn.getAppId()), - lastUpdatedTiemInMillis); + lastUpdatedTiemInMillis, null); } else { svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis); } diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index 6f27c49983..5aef7719f8 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -136,7 +136,7 @@ public AtlasEntityHeader getService() { return service; } - public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime) { + public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime, Date latestEditTime) { //TODO: return only if updated AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl.getPolicies." + serviceName); @@ -150,7 +150,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las servicePolicies.setPolicyUpdateTime(new Date()); if (service != null) { - List allPolicies = getServicePolicies(service, 250); + List allPolicies = getServicePolicies(service, 250, latestEditTime); servicePolicies.setServiceName(serviceName); servicePolicies.setServiceId(service.getGuid()); @@ -164,7 +164,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las AtlasEntityHeader tagService = getServiceEntity(tagServiceName); if (tagService != null) { - allPolicies.addAll(getServicePolicies(tagService, 0)); + allPolicies.addAll(getServicePolicies(tagService, 0, latestEditTime)); TagPolicies tagPolicies = new TagPolicies(); @@ -202,13 +202,13 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las return servicePolicies; } - private List getServicePolicies(AtlasEntityHeader service, int batchSize) throws AtlasBaseException, IOException { + private List getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException { List servicePolicies = new ArrayList<>(); String serviceName = (String) service.getAttribute("name"); String serviceType = (String) service.getAttribute("authServiceType"); - List atlasPolicies = getAtlasPolicies(serviceName, batchSize); + List atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); if (CollectionUtils.isNotEmpty(atlasPolicies)) { //transform policies @@ -451,7 +451,7 @@ private List getPolicyValiditySchedule(AtlasEntityHeader return ret; } - private List getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException { + private List getAtlasPolicies(String serviceName, int batchSize, Date latestEditTime) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+service+".getAtlasPolicies"); List ret = new ArrayList<>(); @@ -516,6 +516,17 @@ private List getAtlasPolicies(String serviceName, int batchSi } while (found && ret.size() % size == 0); + boolean latestEditFound = false; + for (AtlasEntityHeader entity : ret) { + if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) { + latestEditFound = true; + break; + } + } + if (!latestEditFound) { + throw new AtlasBaseException("Latest edit not found yet"); + } + } finally { RequestContext.get().endMetricRecord(recorder); } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index c5868cbfa3..8cb2f234d8 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -51,10 +51,7 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.apache.atlas.policytransformer.CachePolicyTransformerImpl.ATTR_SERVICE_LAST_SYNC; import static org.apache.atlas.repository.Constants.PERSONA_ENTITY_TYPE; @@ -152,11 +149,13 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AuthREST.downloadPolicies(serviceName="+serviceName+", pluginId="+pluginId+", lastUpdatedTime="+lastUpdatedTime+")"); } + Date latestEditTime = null; // TODO: get latest edit time from audit logs + if (!isPolicyUpdated(serviceName, lastUpdatedTime)) { return null; } - ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime); + ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, latestEditTime); updateLastSync(serviceName); From 5e17f015246f5138a08016e442f6cfbb5dc67c32 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Wed, 7 Aug 2024 15:30:58 +0530 Subject: [PATCH 2/4] Calculate lastEditTime from audit logs --- .../atlas/model/audit/EntityAuditEventV2.java | 10 +++--- .../org/apache/atlas/web/rest/AuthREST.java | 32 +++++++++++++------ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java index 9a4b03df73..50d33f9475 100644 --- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java +++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java @@ -32,10 +32,7 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; @@ -119,6 +116,11 @@ public static EntityAuditActionV2 fromString(String strValue) { throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue); } + + public static List getDeleteActions() { + return Arrays.asList(ENTITY_DELETE, ENTITY_PURGE, ENTITY_IMPORT_DELETE, CLASSIFICATION_DELETE, + PROPAGATED_CLASSIFICATION_DELETE, TERM_DELETE, LABEL_DELETE); + } } private String entityQualifiedName; diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index 8cb2f234d8..71888898a8 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -21,6 +21,7 @@ import org.apache.atlas.annotation.Timed; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.AuditSearchParams; +import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditSearchResult; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.plugin.util.KeycloakUserStore; @@ -149,13 +150,12 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AuthREST.downloadPolicies(serviceName="+serviceName+", pluginId="+pluginId+", lastUpdatedTime="+lastUpdatedTime+")"); } - Date latestEditTime = null; // TODO: get latest edit time from audit logs - - if (!isPolicyUpdated(serviceName, lastUpdatedTime)) { + Long latestEditTime = getLastEditTime(serviceName, lastUpdatedTime); + if (latestEditTime == null) { return null; } - ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, latestEditTime); + ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, new Date(latestEditTime)); updateLastSync(serviceName); @@ -183,7 +183,7 @@ private void updateLastSync(String serviceName) { } } - private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) { + private Long getLastEditTime(String serviceName, long lastUpdatedTime) { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("AuthRest.isPolicyUpdated." + serviceName); List entityUpdateToWatch = new ArrayList<>(); @@ -202,22 +202,34 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) { dsl.put("query", getMap("bool", getMap("must", mustClauseList))); + List> sortList = new ArrayList<>(); + sortList.add(getMap("timestamp", "desc")); + dsl.put("sort", sortList); + parameters.setDsl(dsl); + Long lastEditTime = 0L; try { EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString()); - - if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) { - return false; + if (result != null) { + if (!CollectionUtils.isEmpty(result.getEntityAudits())) { + EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0); + if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction())) { + lastEditTime = lastAuditLog.getTimestamp(); + } + } else { + lastEditTime = null; // no edits found + } } } catch (AtlasBaseException e) { LOG.error("ERROR in getPoliciesIfUpdated while fetching entity audits {}: ", e.getMessage()); - return true; + return lastEditTime; } finally { RequestContext.get().endMetricRecord(recorder); + LOG.info("Last edit time for service {} is {}, dsl: {}", serviceName, lastEditTime, dsl); } - return true; + return lastEditTime; } private Map getMap(String key, Object value) { From 19ad4957eae504b35dd92c4d3c326fa4ee6cfd60 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Wed, 7 Aug 2024 15:56:44 +0530 Subject: [PATCH 3/4] Try 3 times with a sleep if policies are missing --- .../CachePolicyTransformerImpl.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index 5aef7719f8..8b45df5813 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -18,6 +18,7 @@ package org.apache.atlas.policytransformer; +import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.discovery.EntityDiscoveryService; @@ -61,6 +62,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.atlas.repository.Constants.NAME; @@ -202,13 +204,24 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las return servicePolicies; } - private List getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException { + private List getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException, InterruptedException { List servicePolicies = new ArrayList<>(); + List atlasPolicies = new ArrayList<>(); String serviceName = (String) service.getAttribute("name"); String serviceType = (String) service.getAttribute("authServiceType"); - List atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); + + int maxAttempts = 3; + for (int attempt = 0; attempt < maxAttempts; attempt++) { + try { + atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); + break; + } catch (AtlasBaseException e) { + LOG.error("ERROR in getServicePolicies {}: ", e.getMessage(), e); + TimeUnit.SECONDS.sleep(2); + } + } if (CollectionUtils.isNotEmpty(atlasPolicies)) { //transform policies From 93b9baefa9f69149d98baba0bb5d517f2a4bb6d4 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Thu, 8 Aug 2024 15:02:30 +0530 Subject: [PATCH 4/4] Add more logs --- .../CachePolicyTransformerImpl.java | 22 ++++++++++++++----- .../org/apache/atlas/web/rest/AuthREST.java | 3 ++- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index 8b45df5813..c8dfc936a9 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -196,7 +196,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las } } catch (Exception e) { - LOG.error("ERROR in getPolicies {}: ", e); + LOG.error("ERROR in getPolicies: ", e); return null; } @@ -212,17 +212,19 @@ private List getServicePolicies(AtlasEntityHeader service, int bat String serviceName = (String) service.getAttribute("name"); String serviceType = (String) service.getAttribute("authServiceType"); - int maxAttempts = 3; + int maxAttempts = 5; + int sleepFor = 500; for (int attempt = 0; attempt < maxAttempts; attempt++) { try { atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); break; } catch (AtlasBaseException e) { - LOG.error("ERROR in getServicePolicies {}: ", e.getMessage(), e); - TimeUnit.SECONDS.sleep(2); + LOG.error("ERROR in getServicePolicies {}: ", e.getMessage()); + TimeUnit.MILLISECONDS.sleep(sleepFor); + sleepFor *= 2; } } - + LOG.info("Moving to transform policies, size: {}", atlasPolicies.size()); if (CollectionUtils.isNotEmpty(atlasPolicies)) { //transform policies servicePolicies = transformAtlasPoliciesToRangerPolicies(atlasPolicies, serviceType, serviceName); @@ -521,8 +523,10 @@ private List getAtlasPolicies(String serviceName, int batchSi List headers = discoveryService.directIndexSearch(indexSearchParams).getEntities(); if (headers != null) { ret.addAll(headers); + LOG.info("======= Found result with {} policies", headers.size()); } else { found = false; + LOG.info("======= Found result with null policies"); } from += size; @@ -530,13 +534,19 @@ private List getAtlasPolicies(String serviceName, int batchSi } while (found && ret.size() % size == 0); boolean latestEditFound = false; + Date latestEditTimeAvailable = null; for (AtlasEntityHeader entity : ret) { + LOG.info("Looping on returned policies: {}, size: {}", entity.getDisplayText(), ret.size()); if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) { + LOG.info("Found latest policy: {}, latestEditTime: {}, found policy time: {}", entity.getDisplayText(), latestEditTime, entity.getUpdateTime()); latestEditFound = true; break; } + latestEditTimeAvailable = entity.getUpdateTime(); + LOG.info("Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", entity.getDisplayText(), latestEditTimeAvailable); } - if (!latestEditFound) { + if (latestEditTime != null && !latestEditFound) { + LOG.info("Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", ret.size(), latestEditTime, latestEditTimeAvailable); throw new AtlasBaseException("Latest edit not found yet"); } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index 71888898a8..eeb8fd1add 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -216,6 +216,8 @@ private Long getLastEditTime(String serviceName, long lastUpdatedTime) { EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0); if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction())) { lastEditTime = lastAuditLog.getTimestamp(); + } else { + LOG.info("found delete action, so ignoring the last edit time: {}", lastAuditLog.getTimestamp()); } } else { lastEditTime = null; // no edits found @@ -223,7 +225,6 @@ private Long getLastEditTime(String serviceName, long lastUpdatedTime) { } } catch (AtlasBaseException e) { LOG.error("ERROR in getPoliciesIfUpdated while fetching entity audits {}: ", e.getMessage()); - return lastEditTime; } finally { RequestContext.get().endMetricRecord(recorder); LOG.info("Last edit time for service {} is {}, dsl: {}", serviceName, lastEditTime, dsl);