Skip to content

Commit

Permalink
Merge branch 'staging' into LIN-1079-stg
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaysw committed Aug 22, 2024
2 parents 6cd442b + e3f5a0a commit f03328f
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 36 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ name: Java CI with Maven
on:
push:
branches:
- alpha
- staging
- beta
- development
- master
- lineageondemand

jobs:
build:
Expand Down
3 changes: 3 additions & 0 deletions addons/elasticsearch/es-settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
"truncate_filter"
],
"tokenizer": "standard"
},
"atlan_hierarchy_analyzer": {
"tokenizer": "path_hierarchy"
}
},
"normalizer": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound

svcPolicies = transformer.getPolicies(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis);
lastUpdatedTiemInMillis, null);
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.atlas.policytransformer;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
Expand Down Expand Up @@ -52,15 +53,8 @@
import javax.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.NAME;
Expand Down Expand Up @@ -136,7 +130,7 @@ public AtlasEntityHeader getService() {
return service;
}

public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime) {
public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime, Date latestEditTime) {
//TODO: return only if updated
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl.getPolicies." + serviceName);

Expand All @@ -150,7 +144,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
servicePolicies.setPolicyUpdateTime(new Date());

if (service != null) {
List<RangerPolicy> allPolicies = getServicePolicies(service, 250);
List<RangerPolicy> allPolicies = getServicePolicies(service, 250, latestEditTime);
servicePolicies.setServiceName(serviceName);
servicePolicies.setServiceId(service.getGuid());

Expand All @@ -164,7 +158,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
AtlasEntityHeader tagService = getServiceEntity(tagServiceName);

if (tagService != null) {
allPolicies.addAll(getServicePolicies(tagService, 0));
allPolicies.addAll(getServicePolicies(tagService, 0, latestEditTime));

TagPolicies tagPolicies = new TagPolicies();

Expand Down Expand Up @@ -194,22 +188,38 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
}

} catch (Exception e) {
LOG.error("ERROR in getPolicies {}: ", e);
LOG.error("ERROR in getPolicies: ", e);
return null;
}

RequestContext.get().endMetricRecord(recorder);
return servicePolicies;
}

private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize) throws AtlasBaseException, IOException {
private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException, InterruptedException {

List<RangerPolicy> servicePolicies = new ArrayList<>();
List<AtlasEntityHeader> atlasPolicies = new ArrayList<>();

String serviceName = (String) service.getAttribute("name");
String serviceType = (String) service.getAttribute("authServiceType");
List<AtlasEntityHeader> atlasPolicies = getAtlasPolicies(serviceName, batchSize);

int maxAttempts = 5;
int sleepFor = 500;
for (int attempt = 0; attempt <= maxAttempts; attempt++) {
try {
atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime);
break;
} catch (AtlasBaseException e) {
LOG.error("ES_SYNC_FIX: {}: ERROR in getServicePolicies: {}", serviceName, e.getMessage());
TimeUnit.MILLISECONDS.sleep(sleepFor);
if (attempt == maxAttempts) {
throw e;
}
sleepFor *= 2;
}
}
LOG.info("ES_SYNC_FIX: {}: Moving to transform policies, size: {}", serviceName, atlasPolicies.size());
if (CollectionUtils.isNotEmpty(atlasPolicies)) {
//transform policies
servicePolicies = transformAtlasPoliciesToRangerPolicies(atlasPolicies, serviceType, serviceName);
Expand Down Expand Up @@ -451,7 +461,7 @@ private List<RangerValiditySchedule> getPolicyValiditySchedule(AtlasEntityHeader
return ret;
}

private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException {
private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize, Date latestEditTime) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+service+".getAtlasPolicies");

List<AtlasEntityHeader> ret = new ArrayList<>();
Expand Down Expand Up @@ -508,13 +518,33 @@ private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSi
List<AtlasEntityHeader> headers = discoveryService.directIndexSearch(indexSearchParams).getEntities();
if (headers != null) {
ret.addAll(headers);
LOG.info("ES_SYNC_FIX: {}: ======= Found result with {} policies", serviceName, headers.size());
} else {
found = false;
LOG.info("ES_SYNC_FIX: {}: ======= Found result with null policies", serviceName);
}

from += size;

} while (found && ret.size() % size == 0);
if (Objects.equals(serviceName, "atlas")) {
boolean latestEditFound = false;
Date latestEditTimeAvailable = null;
for (AtlasEntityHeader entity : ret) {
// LOG.info("ES_SYNC_FIX: {}: Looping on returned policies: {}, size: {}", serviceName, entity.getDisplayText(), ret.size());
if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) {
LOG.info("ES_SYNC_FIX: {}: Found latest policy: {}, latestEditTime: {}, found policy time: {}", serviceName, entity.getDisplayText(), latestEditTime, entity.getUpdateTime());
latestEditFound = true;
break;
}
latestEditTimeAvailable = entity.getUpdateTime();
// LOG.info("ES_SYNC_FIX: {}: Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", serviceName, entity.getDisplayText(), latestEditTimeAvailable);
}
if (latestEditTime != null && !latestEditFound) {
LOG.info("ES_SYNC_FIX: {}: Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", serviceName, ret.size(), latestEditTime, latestEditTimeAvailable);
throw new AtlasBaseException("Latest edit not found yet");
}
}

} finally {
RequestContext.get().endMetricRecord(recorder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down Expand Up @@ -119,6 +116,11 @@ public static EntityAuditActionV2 fromString(String strValue) {

throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
}

public static List<EntityAuditActionV2> getDeleteActions() {
return Arrays.asList(ENTITY_DELETE, ENTITY_PURGE, ENTITY_IMPORT_DELETE, CLASSIFICATION_DELETE,
PROPAGATED_CLASSIFICATION_DELETE, TERM_DELETE, LABEL_DELETE);
}
}

private String entityQualifiedName;
Expand Down
40 changes: 27 additions & 13 deletions webapp/src/main/java/org/apache/atlas/web/rest/AuthREST.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.atlas.annotation.Timed;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.AuditSearchParams;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditSearchResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.plugin.util.KeycloakUserStore;
Expand Down Expand Up @@ -51,10 +52,7 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import static org.apache.atlas.policytransformer.CachePolicyTransformerImpl.ATTR_SERVICE_LAST_SYNC;
import static org.apache.atlas.repository.Constants.PERSONA_ENTITY_TYPE;
Expand Down Expand Up @@ -152,11 +150,12 @@ public ServicePolicies downloadPolicies(@PathParam("serviceName") final String s
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AuthREST.downloadPolicies(serviceName="+serviceName+", pluginId="+pluginId+", lastUpdatedTime="+lastUpdatedTime+")");
}

if (!isPolicyUpdated(serviceName, lastUpdatedTime)) {
Long latestEditTime = getLastEditTime(serviceName, lastUpdatedTime);
if (latestEditTime == null) {
return null;
}

ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime);
ServicePolicies ret = policyTransformer.getPolicies(serviceName, pluginId, lastUpdatedTime, new Date(latestEditTime));

updateLastSync(serviceName);

Expand Down Expand Up @@ -184,7 +183,7 @@ private void updateLastSync(String serviceName) {
}
}

private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {
private Long getLastEditTime(String serviceName, long lastUpdatedTime) {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("AuthRest.isPolicyUpdated." + serviceName);

List<String> entityUpdateToWatch = new ArrayList<>();
Expand All @@ -199,26 +198,41 @@ private boolean isPolicyUpdated(String serviceName, long lastUpdatedTime) {
mustClauseList.add(getMap("terms", getMap("typeName", entityUpdateToWatch)));

lastUpdatedTime = lastUpdatedTime == -1 ? 0 : lastUpdatedTime;
mustClauseList.add(getMap("range", getMap("timestamp", getMap("gte", lastUpdatedTime))));
mustClauseList.add(getMap("range", getMap("created", getMap("gte", lastUpdatedTime))));

dsl.put("query", getMap("bool", getMap("must", mustClauseList)));

List<Map<String, Object>> sortList = new ArrayList<>();
sortList.add(getMap("created", "desc"));
dsl.put("sort", sortList);

parameters.setDsl(dsl);
Long lastEditTime = 0L; // this timestamp is used to verify if the found policies are synced with any policy create or update op on cassandra

try {
EntityAuditSearchResult result = auditRepository.searchEvents(parameters.getQueryString());

if (result == null || CollectionUtils.isEmpty(result.getEntityAudits())) {
return false;
if (result != null) {
if (!CollectionUtils.isEmpty(result.getEntityAudits())) {
EntityAuditEventV2 lastAuditLog = result.getEntityAudits().get(0);
if (!EntityAuditEventV2.EntityAuditActionV2.getDeleteActions().contains(lastAuditLog.getAction()) &&
lastAuditLog.getTypeName().equals(POLICY_ENTITY_TYPE)
) {
lastEditTime = lastAuditLog.getTimestamp();
} else {
LOG.info("ES_SYNC_FIX: {}: found delete action, so ignoring the last edit time: {}", serviceName, lastAuditLog.getTimestamp());
}
} else {
lastEditTime = null; // no edits found
}
}
} catch (AtlasBaseException e) {
LOG.error("ERROR in getPoliciesIfUpdated while fetching entity audits {}: ", e.getMessage());
return true;
} finally {
RequestContext.get().endMetricRecord(recorder);
LOG.info("Last edit time for service {} is {}, dsl: {}", serviceName, lastEditTime, dsl);
}

return true;
return lastEditTime;
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down

0 comments on commit f03328f

Please sign in to comment.