diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f8a09b5589..562a6d9322 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -21,11 +21,10 @@ name: Java CI with Maven on: push: branches: - - alpha + - staging - beta - development - master - - lineageondemand jobs: build: diff --git a/addons/elasticsearch/es-settings.json b/addons/elasticsearch/es-settings.json index e67d2db8d9..40f560b817 100644 --- a/addons/elasticsearch/es-settings.json +++ b/addons/elasticsearch/es-settings.json @@ -63,6 +63,9 @@ "truncate_filter" ], "tokenizer": "standard" + }, + "atlan_hierarchy_analyzer": { + "tokenizer": "path_hierarchy" } }, "normalizer": { diff --git a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java index aae09a7d26..457aa02f0e 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/plugin/util/PolicyRefresher.java @@ -322,7 +322,7 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound svcPolicies = transformer.getPolicies(serviceName, restUtils.getPluginId(serviceName, plugIn.getAppId()), - lastUpdatedTiemInMillis); + lastUpdatedTiemInMillis, null); } else { svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis); } diff --git a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java index 6f27c49983..f48047d132 100644 --- a/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java +++ b/auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java @@ -18,6 +18,7 @@ package org.apache.atlas.policytransformer; +import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.discovery.EntityDiscoveryService; @@ -52,15 +53,8 @@ import javax.inject.Inject; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.atlas.repository.Constants.NAME; @@ -136,7 +130,7 @@ public AtlasEntityHeader getService() { return service; } - public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime) { + public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime, Date latestEditTime) { //TODO: return only if updated AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl.getPolicies." + serviceName); @@ -150,7 +144,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las servicePolicies.setPolicyUpdateTime(new Date()); if (service != null) { - List allPolicies = getServicePolicies(service, 250); + List allPolicies = getServicePolicies(service, 250, latestEditTime); servicePolicies.setServiceName(serviceName); servicePolicies.setServiceId(service.getGuid()); @@ -164,7 +158,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las AtlasEntityHeader tagService = getServiceEntity(tagServiceName); if (tagService != null) { - allPolicies.addAll(getServicePolicies(tagService, 0)); + allPolicies.addAll(getServicePolicies(tagService, 0, latestEditTime)); TagPolicies tagPolicies = new TagPolicies(); @@ -194,7 +188,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las } } catch (Exception e) { - LOG.error("ERROR in getPolicies {}: ", e); + LOG.error("ERROR in getPolicies: ", e); return null; } @@ -202,14 +196,30 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las return servicePolicies; } - private List getServicePolicies(AtlasEntityHeader service, int batchSize) throws AtlasBaseException, IOException { + private List getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException, InterruptedException { List servicePolicies = new ArrayList<>(); + List atlasPolicies = new ArrayList<>(); String serviceName = (String) service.getAttribute("name"); String serviceType = (String) service.getAttribute("authServiceType"); - List atlasPolicies = getAtlasPolicies(serviceName, batchSize); + int maxAttempts = 5; + int sleepFor = 500; + for (int attempt = 0; attempt <= maxAttempts; attempt++) { + try { + atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime); + break; + } catch (AtlasBaseException e) { + LOG.error("ES_SYNC_FIX: {}: ERROR in getServicePolicies: {}", serviceName, e.getMessage()); + TimeUnit.MILLISECONDS.sleep(sleepFor); + if (attempt == maxAttempts) { + throw e; + } + sleepFor *= 2; + } + } + LOG.info("ES_SYNC_FIX: {}: Moving to transform policies, size: {}", serviceName, atlasPolicies.size()); if (CollectionUtils.isNotEmpty(atlasPolicies)) { //transform policies servicePolicies = transformAtlasPoliciesToRangerPolicies(atlasPolicies, serviceType, serviceName); @@ -451,7 +461,7 @@ private List getPolicyValiditySchedule(AtlasEntityHeader return ret; } - private List getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException { + private List getAtlasPolicies(String serviceName, int batchSize, Date latestEditTime) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+service+".getAtlasPolicies"); List ret = new ArrayList<>(); @@ -508,13 +518,33 @@ private List getAtlasPolicies(String serviceName, int batchSi List headers = discoveryService.directIndexSearch(indexSearchParams).getEntities(); if (headers != null) { ret.addAll(headers); + LOG.info("ES_SYNC_FIX: {}: ======= Found result with {} policies", serviceName, headers.size()); } else { found = false; + LOG.info("ES_SYNC_FIX: {}: ======= Found result with null policies", serviceName); } from += size; } while (found && ret.size() % size == 0); + if (Objects.equals(serviceName, "atlas")) { + boolean latestEditFound = false; + Date latestEditTimeAvailable = null; + for (AtlasEntityHeader entity : ret) { + // LOG.info("ES_SYNC_FIX: {}: Looping on returned policies: {}, size: {}", serviceName, entity.getDisplayText(), ret.size()); + if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) { + LOG.info("ES_SYNC_FIX: {}: Found latest policy: {}, latestEditTime: {}, found policy time: {}", serviceName, entity.getDisplayText(), latestEditTime, entity.getUpdateTime()); + latestEditFound = true; + break; + } + latestEditTimeAvailable = entity.getUpdateTime(); + // LOG.info("ES_SYNC_FIX: {}: Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", serviceName, entity.getDisplayText(), latestEditTimeAvailable); + } + if (latestEditTime != null && !latestEditFound) { + LOG.info("ES_SYNC_FIX: {}: Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", serviceName, ret.size(), latestEditTime, latestEditTimeAvailable); + throw new AtlasBaseException("Latest edit not found yet"); + } + } } finally { RequestContext.get().endMetricRecord(recorder); diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java index 9a4b03df73..50d33f9475 100644 --- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java +++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java @@ -32,10 +32,7 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; @@ -119,6 +116,11 @@ public static EntityAuditActionV2 fromString(String strValue) { throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue); } + + public static List getDeleteActions() { + return Arrays.asList(ENTITY_DELETE, ENTITY_PURGE, ENTITY_IMPORT_DELETE, CLASSIFICATION_DELETE, + PROPAGATED_CLASSIFICATION_DELETE, TERM_DELETE, LABEL_DELETE); + } } private String entityQualifiedName; diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java index c5868cbfa3..69f1b29b98 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java @@ -21,6 +21,7 @@ import org.apache.atlas.annotation.Timed; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.AuditSearchParams; +import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditSearchResult; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.plugin.util.KeycloakUserStore; @@ -51,10 +52,7 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.apache.atlas.policytransformer.CachePolicyTransformerImpl.ATTR_SERVICE_LAST_SYNC; import static org.apache.atlas.repository.Constants.PERSONA_ENTITY_TYPE; @@ -152,11 +150,12 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AuthREST.downloadPolicies(serviceName="+serviceName+", pluginId="+pluginId+", lastUpdatedTime="+lastUpdatedTime+")"); } - if (!isPolicyUpdated(serviceName, lastUpdatedTime)) { + Long latestEditTime = getLastEditTime(serviceName, lastUpdatedTime); + if (latestEditTime == null) { return null; } - ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime); + ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, new Date(latestEditTime)); updateLastSync(serviceName); @@ -184,7 +183,7 @@ private void updateLastSync(String serviceName) { } } - private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) { + private Long getLastEditTime(String serviceName, long lastUpdatedTime) { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("AuthRest.isPolicyUpdated." + serviceName); List entityUpdateToWatch = new ArrayList<>(); @@ -199,26 +198,41 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) { mustClauseList.add(getMap("terms", getMap("typeName", entityUpdateToWatch))); lastUpdatedTime = lastUpdatedTime == -1 ? 0 : lastUpdatedTime; - mustClauseList.add(getMap("range", getMap("timestamp", getMap("gte", lastUpdatedTime)))); + mustClauseList.add(getMap("range", getMap("created", getMap("gte", lastUpdatedTime)))); dsl.put("query", getMap("bool", getMap("must", mustClauseList))); + List> sortList = new ArrayList<>(); + sortList.add(getMap("created", "desc")); + dsl.put("sort", sortList); + parameters.setDsl(dsl); + Long lastEditTime = 0L; // this timestamp is used to verify if the found policies are synced with any policy create or update op on cassandra try { EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString()); - - if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) { - return false; + if (result != null) { + if (!CollectionUtils.isEmpty(result.getEntityAudits())) { + EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0); + if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction()) && + lastAuditLog.getTypeName().equals(POLICY_ENTITY_TYPE) + ) { + lastEditTime = lastAuditLog.getTimestamp(); + } else { + LOG.info("ES_SYNC_FIX: {}: found delete action, so ignoring the last edit time: {}", serviceName, lastAuditLog.getTimestamp()); + } + } else { + lastEditTime = null; // no edits found + } } } catch (AtlasBaseException e) { LOG.error("ERROR in getPoliciesIfUpdated while fetching entity audits {}: ", e.getMessage()); - return true; } finally { RequestContext.get().endMetricRecord(recorder); + LOG.info("Last edit time for service {} is {}, dsl: {}", serviceName, lastEditTime, dsl); } - return true; + return lastEditTime; } private Map getMap(String key, Object value) {