From 3c5d0f430c04e5ee232dd1c58caf54995a87c09c Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Wed, 7 Aug 2024 14:36:43 +0530 Subject: [PATCH 01/10] Check fetched policy time before updating cache --- .../atlas/plugin/util/PolicyRefresher.java | 2 +- .../CachePolicyTransformerImpl.java | 23 ++++++++++++++----- .../org/apache/atlas/web/rest/AuthREST.java | 9 ++++---- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java index aae09a7d26..457aa02f0e 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java @@ -322,7 +322,7 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound svcPolicies = transformer.getPolicies(serviceName, restUtils.getPluginId(serviceName, plugIn.getAppId()), - lastUpdatedTiemInMillis); + lastUpdatedTiemInMillis, null); } else { svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis); } diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index 6f27c49983..5aef7719f8 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -136,7 +136,7 @@ public AtlasEntityHeader getService() { return service; } - public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime) { + public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime, Date latestEditTime) { //TODO: return only if updated AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl.getPolicies." + serviceName); @@ -150,7 +150,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las servicePolicies.setPolicyUpdateTime(new Date()); if (service != null) { - List allPolicies = getServicePolicies(service, 250); + List allPolicies = getServicePolicies(service, 250, latestEditTime); servicePolicies.setServiceName(serviceName); servicePolicies.setServiceId(service.getGuid()); @@ -164,7 +164,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las AtlasEntityHeader tagService = getServiceEntity(tagServiceName); if (tagService != null) { - allPolicies.addAll(getServicePolicies(tagService, 0)); + allPolicies.addAll(getServicePolicies(tagService, 0, latestEditTime)); TagPolicies tagPolicies = new TagPolicies(); @@ -202,13 +202,13 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las return servicePolicies; } - private List getServicePolicies(AtlasEntityHeader service, int batchSize) throws AtlasBaseException, IOException { + private List getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException { List servicePolicies = new ArrayList<>(); String serviceName = (String) service.getAttribute("name"); String serviceType = (String) service.getAttribute("authServiceType"); - List atlasPolicies = getAtlasPolicies(serviceName, batchSize); + List atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); if (CollectionUtils.isNotEmpty(atlasPolicies)) { //transform policies @@ -451,7 +451,7 @@ private List getPolicyValiditySchedule(AtlasEntityHeader return ret; } - private List getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException { + private List getAtlasPolicies(String serviceName, int batchSize, Date latestEditTime) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+service+".getAtlasPolicies"); List ret = new ArrayList<>(); @@ -516,6 +516,17 @@ private List getAtlasPolicies(String serviceName, int batchSi } while (found && ret.size() % size == 0); + boolean latestEditFound = false; + for (AtlasEntityHeader entity : ret) { + if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) { + latestEditFound = true; + break; + } + } + if (!latestEditFound) { + throw new AtlasBaseException("Latest edit not found yet"); + } + } finally { RequestContext.get().endMetricRecord(recorder); } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index c5868cbfa3..8cb2f234d8 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -51,10 +51,7 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.apache.atlas.policytransformer.CachePolicyTransformerImpl.ATTR_SERVICE_LAST_SYNC; import static org.apache.atlas.repository.Constants.PERSONA_ENTITY_TYPE; @@ -152,11 +149,13 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AuthREST.downloadPolicies(serviceName="+serviceName+", pluginId="+pluginId+", lastUpdatedTime="+lastUpdatedTime+")"); } + Date latestEditTime = null; // TODO: get latest edit time from audit logs + if (!isPolicyUpdated(serviceName, lastUpdatedTime)) { return null; } - ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime); + ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, latestEditTime); updateLastSync(serviceName); From 5e17f015246f5138a08016e442f6cfbb5dc67c32 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Wed, 7 Aug 2024 15:30:58 +0530 Subject: [PATCH 02/10] Calculate lastEditTime from audit logs --- .../atlas/model/audit/EntityAuditEventV2.java | 10 +++--- .../org/apache/atlas/web/rest/AuthREST.java | 32 +++++++++++++------ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java index 9a4b03df73..50d33f9475 100644 --- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java +++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java @@ -32,10 +32,7 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; @@ -119,6 +116,11 @@ public static EntityAuditActionV2 fromString(String strValue) { throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue); } + + public static List getDeleteActions() { + return Arrays.asList(ENTITY_DELETE, ENTITY_PURGE, ENTITY_IMPORT_DELETE, CLASSIFICATION_DELETE, + PROPAGATED_CLASSIFICATION_DELETE, TERM_DELETE, LABEL_DELETE); + } } private String entityQualifiedName; diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index 8cb2f234d8..71888898a8 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -21,6 +21,7 @@ import org.apache.atlas.annotation.Timed; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.AuditSearchParams; +import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditSearchResult; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.plugin.util.KeycloakUserStore; @@ -149,13 +150,12 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AuthREST.downloadPolicies(serviceName="+serviceName+", pluginId="+pluginId+", lastUpdatedTime="+lastUpdatedTime+")"); } - Date latestEditTime = null; // TODO: get latest edit time from audit logs - - if (!isPolicyUpdated(serviceName, lastUpdatedTime)) { + Long latestEditTime = getLastEditTime(serviceName, lastUpdatedTime); + if (latestEditTime == null) { return null; } - ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, latestEditTime); + ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, new Date(latestEditTime)); updateLastSync(serviceName); @@ -183,7 +183,7 @@ private void updateLastSync(String serviceName) { } } - private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) { + private Long getLastEditTime(String serviceName, long lastUpdatedTime) { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("AuthRest.isPolicyUpdated." + serviceName); List entityUpdateToWatch = new ArrayList<>(); @@ -202,22 +202,34 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) { dsl.put("query", getMap("bool", getMap("must", mustClauseList))); + List> sortList = new ArrayList<>(); + sortList.add(getMap("timestamp", "desc")); + dsl.put("sort", sortList); + parameters.setDsl(dsl); + Long lastEditTime = 0L; try { EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString()); - - if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) { - return false; + if (result != null) { + if (!CollectionUtils.isEmpty(result.getEntityAudits())) { + EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0); + if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction())) { + lastEditTime = lastAuditLog.getTimestamp(); + } + } else { + lastEditTime = null; // no edits found + } } } catch (AtlasBaseException e) { LOG.error("ERROR in getPoliciesIfUpdated while fetching entity audits {}: ", e.getMessage()); - return true; + return lastEditTime; } finally { RequestContext.get().endMetricRecord(recorder); + LOG.info("Last edit time for service {} is {}, dsl: {}", serviceName, lastEditTime, dsl); } - return true; + return lastEditTime; } private Map getMap(String key, Object value) { From 19ad4957eae504b35dd92c4d3c326fa4ee6cfd60 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Wed, 7 Aug 2024 15:56:44 +0530 Subject: [PATCH 03/10] Try 3 times with a sleep if policies are missing --- .../CachePolicyTransformerImpl.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index 5aef7719f8..8b45df5813 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -18,6 +18,7 @@ package org.apache.atlas.policytransformer; +import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.discovery.EntityDiscoveryService; @@ -61,6 +62,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.atlas.repository.Constants.NAME; @@ -202,13 +204,24 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las return servicePolicies; } - private List getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException { + private List getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException, InterruptedException { List servicePolicies = new ArrayList<>(); + List atlasPolicies = new ArrayList<>(); String serviceName = (String) service.getAttribute("name"); String serviceType = (String) service.getAttribute("authServiceType"); - List atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); + + int maxAttempts = 3; + for (int attempt = 0; attempt < maxAttempts; attempt++) { + try { + atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); + break; + } catch (AtlasBaseException e) { + LOG.error("ERROR in getServicePolicies {}: ", e.getMessage(), e); + TimeUnit.SECONDS.sleep(2); + } + } if (CollectionUtils.isNotEmpty(atlasPolicies)) { //transform policies From 93b9baefa9f69149d98baba0bb5d517f2a4bb6d4 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Thu, 8 Aug 2024 15:02:30 +0530 Subject: [PATCH 04/10] Add more logs --- .../CachePolicyTransformerImpl.java | 22 ++++++++++++++----- .../org/apache/atlas/web/rest/AuthREST.java | 3 ++- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index 8b45df5813..c8dfc936a9 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -196,7 +196,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las } } catch (Exception e) { - LOG.error("ERROR in getPolicies {}: ", e); + LOG.error("ERROR in getPolicies: ", e); return null; } @@ -212,17 +212,19 @@ private List getServicePolicies(AtlasEntityHeader service, int bat String serviceName = (String) service.getAttribute("name"); String serviceType = (String) service.getAttribute("authServiceType"); - int maxAttempts = 3; + int maxAttempts = 5; + int sleepFor = 500; for (int attempt = 0; attempt < maxAttempts; attempt++) { try { atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); break; } catch (AtlasBaseException e) { - LOG.error("ERROR in getServicePolicies {}: ", e.getMessage(), e); - TimeUnit.SECONDS.sleep(2); + LOG.error("ERROR in getServicePolicies {}: ", e.getMessage()); + TimeUnit.MILLISECONDS.sleep(sleepFor); + sleepFor *= 2; } } - + LOG.info("Moving to transform policies, size: {}", atlasPolicies.size()); if (CollectionUtils.isNotEmpty(atlasPolicies)) { //transform policies servicePolicies = transformAtlasPoliciesToRangerPolicies(atlasPolicies, serviceType, serviceName); @@ -521,8 +523,10 @@ private List getAtlasPolicies(String serviceName, int batchSi List headers = discoveryService.directIndexSearch(indexSearchParams).getEntities(); if (headers != null) { ret.addAll(headers); + LOG.info("======= Found result with {} policies", headers.size()); } else { found = false; + LOG.info("======= Found result with null policies"); } from += size; @@ -530,13 +534,19 @@ private List getAtlasPolicies(String serviceName, int batchSi } while (found && ret.size() % size == 0); boolean latestEditFound = false; + Date latestEditTimeAvailable = null; for (AtlasEntityHeader entity : ret) { + LOG.info("Looping on returned policies: {}, size: {}", entity.getDisplayText(), ret.size()); if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) { + LOG.info("Found latest policy: {}, latestEditTime: {}, found policy time: {}", entity.getDisplayText(), latestEditTime, entity.getUpdateTime()); latestEditFound = true; break; } + latestEditTimeAvailable = entity.getUpdateTime(); + LOG.info("Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", entity.getDisplayText(), latestEditTimeAvailable); } - if (!latestEditFound) { + if (latestEditTime != null && !latestEditFound) { + LOG.info("Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", ret.size(), latestEditTime, latestEditTimeAvailable); throw new AtlasBaseException("Latest edit not found yet"); } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index 71888898a8..eeb8fd1add 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -216,6 +216,8 @@ private Long getLastEditTime(String serviceName, long lastUpdatedTime) { EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0); if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction())) { lastEditTime = lastAuditLog.getTimestamp(); + } else { + LOG.info("found delete action, so ignoring the last edit time: {}", lastAuditLog.getTimestamp()); } } else { lastEditTime = null; // no edits found @@ -223,7 +225,6 @@ private Long getLastEditTime(String serviceName, long lastUpdatedTime) { } } catch (AtlasBaseException e) { LOG.error("ERROR in getPoliciesIfUpdated while fetching entity audits {}: ", e.getMessage()); - return lastEditTime; } finally { RequestContext.get().endMetricRecord(recorder); LOG.info("Last edit time for service {} is {}, dsl: {}", serviceName, lastEditTime, dsl); From f12370b7f217a67651866df0c0aac52eb93101b6 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Mon, 12 Aug 2024 18:42:35 +0530 Subject: [PATCH 05/10] Add check for atlas service on retry of policy refresh --- .../CachePolicyTransformerImpl.java | 49 ++++++++----------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index c8dfc936a9..63e8806157 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -53,15 +53,7 @@ import javax.inject.Inject; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -219,12 +211,12 @@ private List getServicePolicies(AtlasEntityHeader service, int bat atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); break; } catch (AtlasBaseException e) { - LOG.error("ERROR in getServicePolicies {}: ", e.getMessage()); + LOG.error("ES_SYNC_FIX: {}: ERROR in getServicePolicies: {}", serviceName, e.getMessage()); TimeUnit.MILLISECONDS.sleep(sleepFor); sleepFor *= 2; } } - LOG.info("Moving to transform policies, size: {}", atlasPolicies.size()); + LOG.info("ES_SYNC_FIX: {}: Moving to transform policies, size: {}", serviceName, atlasPolicies.size()); if (CollectionUtils.isNotEmpty(atlasPolicies)) { //transform policies servicePolicies = transformAtlasPoliciesToRangerPolicies(atlasPolicies, serviceType, serviceName); @@ -523,31 +515,32 @@ private List getAtlasPolicies(String serviceName, int batchSi List headers = discoveryService.directIndexSearch(indexSearchParams).getEntities(); if (headers != null) { ret.addAll(headers); - LOG.info("======= Found result with {} policies", headers.size()); + LOG.info("ES_SYNC_FIX: {}: ======= Found result with {} policies", serviceName, headers.size()); } else { found = false; - LOG.info("======= Found result with null policies"); + LOG.info("ES_SYNC_FIX: {}: ======= Found result with null policies", serviceName); } from += size; } while (found && ret.size() % size == 0); - - boolean latestEditFound = false; - Date latestEditTimeAvailable = null; - for (AtlasEntityHeader entity : ret) { - LOG.info("Looping on returned policies: {}, size: {}", entity.getDisplayText(), ret.size()); - if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) { - LOG.info("Found latest policy: {}, latestEditTime: {}, found policy time: {}", entity.getDisplayText(), latestEditTime, entity.getUpdateTime()); - latestEditFound = true; - break; + if (Objects.equals(serviceName, "atlas")) { + boolean latestEditFound = false; + Date latestEditTimeAvailable = null; + for (AtlasEntityHeader entity : ret) { + // LOG.info("ES_SYNC_FIX: {}: Looping on returned policies: {}, size: {}", serviceName, entity.getDisplayText(), ret.size()); + if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) { + LOG.info("ES_SYNC_FIX: {}: Found latest policy: {}, latestEditTime: {}, found policy time: {}", serviceName, entity.getDisplayText(), latestEditTime, entity.getUpdateTime()); + latestEditFound = true; + break; + } + latestEditTimeAvailable = entity.getUpdateTime(); + // LOG.info("ES_SYNC_FIX: {}: Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", serviceName, entity.getDisplayText(), latestEditTimeAvailable); + } + if (latestEditTime != null && !latestEditFound) { + LOG.info("ES_SYNC_FIX: {}: Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", serviceName, ret.size(), latestEditTime, latestEditTimeAvailable); + throw new AtlasBaseException("Latest edit not found yet"); } - latestEditTimeAvailable = entity.getUpdateTime(); - LOG.info("Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", entity.getDisplayText(), latestEditTimeAvailable); - } - if (latestEditTime != null && !latestEditFound) { - LOG.info("Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", ret.size(), latestEditTime, latestEditTimeAvailable); - throw new AtlasBaseException("Latest edit not found yet"); } } finally { From e6c00582bb533ae86d4dcddad5c21dfca993a1e1 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Tue, 13 Aug 2024 11:55:37 +0530 Subject: [PATCH 06/10] Skip updating lastRefresh time on failure --- .../atlas/policytransformer/CachePolicyTransformerImpl.java | 2 +- webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index 63e8806157..63d4307236 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -206,7 +206,7 @@ private List getServicePolicies(AtlasEntityHeader service, int bat int maxAttempts = 5; int sleepFor = 500; - for (int attempt = 0; attempt < maxAttempts; attempt++) { + for (int attempt = 0; attempt <= maxAttempts; attempt++) { try { atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); break; diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index eeb8fd1add..892dd30a8e 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -157,7 +157,10 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, new Date(latestEditTime)); - updateLastSync(serviceName); + // check if ret contains any policies other than tagPolicies + if (ret != null && ret.getPolicies() != null && !ret.getPolicies().isEmpty()) { + updateLastSync(serviceName); + } return ret; } finally { From bb29a2214e90671c451d8bfebb3017c9920ef0cc Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Tue, 13 Aug 2024 13:12:40 +0530 Subject: [PATCH 07/10] Break refresh if ES is not synced Instead of proceeding with refreshing the atlas_tag policies, this is will throw the exception so the last sync time is not updated and next run again sees the audit logs --- .../atlas/policytransformer/CachePolicyTransformerImpl.java | 3 +++ webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java | 5 +---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index 63d4307236..f48047d132 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -213,6 +213,9 @@ private List getServicePolicies(AtlasEntityHeader service, int bat } catch (AtlasBaseException e) { LOG.error("ES_SYNC_FIX: {}: ERROR in getServicePolicies: {}", serviceName, e.getMessage()); TimeUnit.MILLISECONDS.sleep(sleepFor); + if (attempt == maxAttempts) { + throw e; + } sleepFor *= 2; } } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index 892dd30a8e..eeb8fd1add 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -157,10 +157,7 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, new Date(latestEditTime)); - // check if ret contains any policies other than tagPolicies - if (ret != null && ret.getPolicies() != null && !ret.getPolicies().isEmpty()) { - updateLastSync(serviceName); - } + updateLastSync(serviceName); return ret; } finally { From fba51ff4ae4dd330daf4d6cdd6f3a0a0291fb082 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Tue, 13 Aug 2024 15:00:29 +0530 Subject: [PATCH 08/10] Check audit logs based on event create time Previously used timestamp would never detect the delete event as the entity update time doesn't get updated on delete --- .../src/main/java/org/apache/atlas/web/rest/AuthREST.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index eeb8fd1add..ec0e5a9a54 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -198,12 +198,12 @@ private Long getLastEditTime(String serviceName, long lastUpdatedTime) { mustClauseList.add(getMap("terms", getMap("typeName", entityUpdateToWatch))); lastUpdatedTime = lastUpdatedTime == -1 ? 0 : lastUpdatedTime; - mustClauseList.add(getMap("range", getMap("timestamp", getMap("gte", lastUpdatedTime)))); + mustClauseList.add(getMap("range", getMap("created", getMap("gte", lastUpdatedTime)))); dsl.put("query", getMap("bool", getMap("must", mustClauseList))); List> sortList = new ArrayList<>(); - sortList.add(getMap("timestamp", "desc")); + sortList.add(getMap("created", "desc")); dsl.put("sort", sortList); parameters.setDsl(dsl); @@ -217,7 +217,7 @@ private Long getLastEditTime(String serviceName, long lastUpdatedTime) { if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction())) { lastEditTime = lastAuditLog.getTimestamp(); } else { - LOG.info("found delete action, so ignoring the last edit time: {}", lastAuditLog.getTimestamp()); + LOG.info("ES_SYNC_FIX: {}: found delete action, so ignoring the last edit time: {}", serviceName, lastAuditLog.getTimestamp()); } } else { lastEditTime = null; // no edits found From 5faba351c947b2f08d2e89d9521a2d81c23dc07d Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Tue, 13 Aug 2024 17:38:43 +0530 Subject: [PATCH 09/10] Check last edit time only for authpoliy Without this check, policy download API check could take the Persona or Purpose update time to be last edit time for which it will not find a policy since no policy exist for this mistakenly considered timestamp. --- .../src/main/java/org/apache/atlas/web/rest/AuthREST.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index ec0e5a9a54..69f1b29b98 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -207,14 +207,16 @@ private Long getLastEditTime(String serviceName, long lastUpdatedTime) { dsl.put("sort", sortList); parameters.setDsl(dsl); - Long lastEditTime = 0L; + Long lastEditTime = 0L; // this timestamp is used to verify if the found policies are synced with any policy create or update op on cassandra try { EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString()); if (result != null) { if (!CollectionUtils.isEmpty(result.getEntityAudits())) { EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0); - if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction())) { + if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction()) && + lastAuditLog.getTypeName().equals(POLICY_ENTITY_TYPE) + ) { lastEditTime = lastAuditLog.getTimestamp(); } else { LOG.info("ES_SYNC_FIX: {}: found delete action, so ignoring the last edit time: {}", serviceName, lastAuditLog.getTimestamp()); From 911dae0cc1b985779b0532b7ad554c375d5d3ecb Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Thu, 5 Sep 2024 16:05:33 +0530 Subject: [PATCH 10/10] Remove unused comments --- .../atlas/policytransformer/CachePolicyTransformerImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index f48047d132..52dbe93332 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -531,14 +531,12 @@ private List getAtlasPolicies(String serviceName, int batchSi boolean latestEditFound = false; Date latestEditTimeAvailable = null; for (AtlasEntityHeader entity : ret) { - // LOG.info("ES_SYNC_FIX: {}: Looping on returned policies: {}, size: {}", serviceName, entity.getDisplayText(), ret.size()); if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) { LOG.info("ES_SYNC_FIX: {}: Found latest policy: {}, latestEditTime: {}, found policy time: {}", serviceName, entity.getDisplayText(), latestEditTime, entity.getUpdateTime()); latestEditFound = true; break; } latestEditTimeAvailable = entity.getUpdateTime(); - // LOG.info("ES_SYNC_FIX: {}: Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", serviceName, entity.getDisplayText(), latestEditTimeAvailable); } if (latestEditTime != null && !latestEditFound) { LOG.info("ES_SYNC_FIX: {}: Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", serviceName, ret.size(), latestEditTime, latestEditTimeAvailable);