Skip to content

Commit

Permalink
Merge branch 'staging' into LIN-1388
Browse files Browse the repository at this point in the history
  • Loading branch information
suraj5077 authored Dec 4, 2024
2 parents 8cac9fd + 0b22bd3 commit 26e2980
Show file tree
Hide file tree
Showing 19 changed files with 486 additions and 214 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ name: Java CI with Maven
on:
push:
branches:
- alpha
- staging
- beta
- development
- master
- lineageondemand

jobs:
build:
Expand Down
3 changes: 3 additions & 0 deletions addons/elasticsearch/es-settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
"truncate_filter"
],
"tokenizer": "standard"
},
"atlan_hierarchy_analyzer": {
"tokenizer": "path_hierarchy"
}
},
"normalizer": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFound

svcPolicies = transformer.getPolicies(serviceName,
restUtils.getPluginId(serviceName, plugIn.getAppId()),
lastUpdatedTiemInMillis);
lastUpdatedTiemInMillis, null);
} else {
svcPolicies = atlasAuthAdminClient.getServicePoliciesIfUpdated(lastUpdatedTiemInMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.atlas.policytransformer;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
Expand Down Expand Up @@ -53,15 +54,8 @@
import javax.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.NAME;
Expand Down Expand Up @@ -137,7 +131,7 @@ public AtlasEntityHeader getService() {
return service;
}

public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime) {
public ServicePolicies getPolicies(String serviceName, String pluginId, Long lastUpdatedTime, Date latestEditTime) {
//TODO: return only if updated
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl.getPolicies." + serviceName);

Expand All @@ -151,7 +145,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
servicePolicies.setPolicyUpdateTime(new Date());

if (service != null) {
List<RangerPolicy> allPolicies = getServicePolicies(service, 250);
List<RangerPolicy> allPolicies = getServicePolicies(service, 250, latestEditTime);
servicePolicies.setServiceName(serviceName);
servicePolicies.setServiceId(service.getGuid());

Expand All @@ -165,7 +159,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
AtlasEntityHeader tagService = getServiceEntity(tagServiceName);

if (tagService != null) {
allPolicies.addAll(getServicePolicies(tagService, 0));
allPolicies.addAll(getServicePolicies(tagService, 0, latestEditTime));

TagPolicies tagPolicies = new TagPolicies();

Expand Down Expand Up @@ -195,22 +189,38 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
}

} catch (Exception e) {
LOG.error("ERROR in getPolicies {}: ", e);
LOG.error("ERROR in getPolicies: ", e);
return null;
}

RequestContext.get().endMetricRecord(recorder);
return servicePolicies;
}

private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize) throws AtlasBaseException, IOException {
private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize, Date latestEditTime) throws AtlasBaseException, IOException, InterruptedException {

List<RangerPolicy> servicePolicies = new ArrayList<>();
List<AtlasEntityHeader> atlasPolicies = new ArrayList<>();

String serviceName = (String) service.getAttribute("name");
String serviceType = (String) service.getAttribute("authServiceType");
List<AtlasEntityHeader> atlasPolicies = getAtlasPolicies(serviceName, batchSize);

int maxAttempts = 5;
int sleepFor = 500;
for (int attempt = 0; attempt <= maxAttempts; attempt++) {
try {
atlasPolicies = getAtlasPolicies(serviceName, batchSize, latestEditTime);
break;
} catch (AtlasBaseException e) {
LOG.error("ES_SYNC_FIX: {}: ERROR in getServicePolicies: {}", serviceName, e.getMessage());
TimeUnit.MILLISECONDS.sleep(sleepFor);
if (attempt == maxAttempts) {
throw e;
}
sleepFor *= 2;
}
}
LOG.info("ES_SYNC_FIX: {}: Moving to transform policies, size: {}", serviceName, atlasPolicies.size());
if (CollectionUtils.isNotEmpty(atlasPolicies)) {
//transform policies
servicePolicies = transformAtlasPoliciesToRangerPolicies(atlasPolicies, serviceType, serviceName);
Expand Down Expand Up @@ -452,7 +462,7 @@ private List<RangerValiditySchedule> getPolicyValiditySchedule(AtlasEntityHeader
return ret;
}

private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException {
private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize, Date latestEditTime) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+service+".getAtlasPolicies");

List<AtlasEntityHeader> ret = new ArrayList<>();
Expand Down Expand Up @@ -509,13 +519,33 @@ private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSi
List<AtlasEntityHeader> headers = discoveryService.directIndexSearch(indexSearchParams).getEntities();
if (headers != null) {
ret.addAll(headers);
LOG.info("ES_SYNC_FIX: {}: ======= Found result with {} policies", serviceName, headers.size());
} else {
found = false;
LOG.info("ES_SYNC_FIX: {}: ======= Found result with null policies", serviceName);
}

from += size;

} while (found && ret.size() % size == 0);
if (Objects.equals(serviceName, "atlas")) {
boolean latestEditFound = false;
Date latestEditTimeAvailable = null;
for (AtlasEntityHeader entity : ret) {
// LOG.info("ES_SYNC_FIX: {}: Looping on returned policies: {}, size: {}", serviceName, entity.getDisplayText(), ret.size());
if (latestEditTime == null || entity.getUpdateTime().compareTo(latestEditTime) >= 0) {
LOG.info("ES_SYNC_FIX: {}: Found latest policy: {}, latestEditTime: {}, found policy time: {}", serviceName, entity.getDisplayText(), latestEditTime, entity.getUpdateTime());
latestEditFound = true;
break;
}
latestEditTimeAvailable = entity.getUpdateTime();
// LOG.info("ES_SYNC_FIX: {}: Checked for latest edit, entity: {}, latestEditTimeAvailable: {}", serviceName, entity.getDisplayText(), latestEditTimeAvailable);
}
if (latestEditTime != null && !latestEditFound) {
LOG.info("ES_SYNC_FIX: {}: Latest edit not found yet, policies: {}, latestEditTime: {}, latestEditTimeAvailable: {}", serviceName, ret.size(), latestEditTime, latestEditTimeAvailable);
throw new AtlasBaseException("Latest edit not found yet");
}
}

} finally {
RequestContext.get().endMetricRecord(recorder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public final class Constants {
public static final String PROCESS_INPUTS = "__Process.inputs";

public static String[] PROCESS_EDGE_LABELS = {PROCESS_OUTPUTS, PROCESS_INPUTS};
public static final String CONNECTION_PROCESS_ENTITY_TYPE = "ConnectionProcess";

/**
* The homeId field is used when saving into Atlas a copy of an object that is being imported from another
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down Expand Up @@ -119,6 +116,11 @@ public static EntityAuditActionV2 fromString(String strValue) {

throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
}

public static List<EntityAuditActionV2> getDeleteActions() {
return Arrays.asList(ENTITY_DELETE, ENTITY_PURGE, ENTITY_IMPORT_DELETE, CLASSIFICATION_DELETE,
PROPAGATED_CLASSIFICATION_DELETE, TERM_DELETE, LABEL_DELETE);
}
}

private String entityQualifiedName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class LineageListRequest {
public static final String LINEAGE_TYPE_DATASET_PROCESS_LINEAGE = "DatasetProcessLineage";
public static final String LINEAGE_TYPE_PRODUCT_ASSET_LINEAGE = "ProductAssetLineage";
private String guid;
private Integer size;
private Integer from;
Expand All @@ -35,6 +37,9 @@ public void setImmediateNeighbours(Boolean immediateNeighbours) {
this.immediateNeighbours = immediateNeighbours;
}


private String lineageType = LINEAGE_TYPE_DATASET_PROCESS_LINEAGE;

public enum LineageDirection {INPUT, OUTPUT}

public LineageListRequest() {
Expand Down Expand Up @@ -90,6 +95,13 @@ public Integer getDepth() {
public void setDepth(Integer depth) {
this.depth = depth;
}
public String getLineageType() {
return lineageType;
}

public void setLineageType(String lineageType) {
this.lineageType = lineageType;
}

public LineageDirection getDirection() {
return direction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
import static org.apache.atlas.model.lineage.LineageListRequest.LINEAGE_TYPE_DATASET_PROCESS_LINEAGE;

@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
Expand All @@ -22,6 +23,7 @@ public class LineageOnDemandRequest {
private Set<String> attributes;
private Set<String> relationAttributes;
private LineageOnDemandBaseParams defaultParams;
private String lineageType = LINEAGE_TYPE_DATASET_PROCESS_LINEAGE;

public LineageOnDemandRequest() {
this.attributes = new HashSet<>();
Expand Down Expand Up @@ -64,6 +66,13 @@ public void setRelationshipTraversalFilters(SearchParameters.FilterCriteria rela
this.relationshipTraversalFilters = relationshipTraversalFilters;
}

public String getLineageType() {
return lineageType;
}

public void setLineageType(String lineageType) {
this.lineageType = lineageType;
}
public Set<String> getAttributes() {
return attributes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.apache.commons.collections.Predicate;
import java.util.Set;

import static org.apache.atlas.model.lineage.LineageListRequest.LINEAGE_TYPE_DATASET_PROCESS_LINEAGE;

public final class AtlasLineageListContext {
private String guid;
private int size;
Expand All @@ -23,6 +25,7 @@ public final class AtlasLineageListContext {
private int currentEntityCounter;
private boolean depthLimitReached;
private boolean hasMoreUpdated;
private String lineageType = LINEAGE_TYPE_DATASET_PROCESS_LINEAGE;
private Boolean immediateNeighbours;

public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) {
Expand All @@ -35,6 +38,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR
this.vertexTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getEntityTraversalFilters());
this.edgeTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getRelationshipTraversalFilters());
this.attributes = lineageListRequest.getAttributes();
this.lineageType = lineageListRequest.getLineageType();
this.relationAttributes = lineageListRequest.getRelationAttributes();
this.immediateNeighbours = lineageListRequest.getImmediateNeighbours();
}
Expand Down Expand Up @@ -131,6 +135,14 @@ public void setCurrentFromCounter(int currentFromCounter) {
this.currentFromCounter = currentFromCounter;
}

public String getLineageType() {
return lineageType;
}

public void setLineageType(String lineageType) {
this.lineageType = lineageType;
}

public int getCurrentEntityCounter() {
return currentEntityCounter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import java.util.Map;
import java.util.Set;

import static org.apache.atlas.model.lineage.LineageListRequest.LINEAGE_TYPE_DATASET_PROCESS_LINEAGE;


public class AtlasLineageOnDemandContext {
private static final Logger LOG = LoggerFactory.getLogger(AtlasLineageContext.class);

Expand All @@ -24,13 +27,17 @@ public class AtlasLineageOnDemandContext {
private Set<String> relationAttributes;
private LineageOnDemandBaseParams defaultParams;


private String lineageType = LINEAGE_TYPE_DATASET_PROCESS_LINEAGE;

public AtlasLineageOnDemandContext(LineageOnDemandRequest lineageOnDemandRequest, AtlasTypeRegistry typeRegistry) {
this.constraints = lineageOnDemandRequest.getConstraints();
this.attributes = lineageOnDemandRequest.getAttributes();
this.relationAttributes = lineageOnDemandRequest.getRelationAttributes();
this.defaultParams = lineageOnDemandRequest.getDefaultParams();
this.vertexPredicate = constructInMemoryPredicate(typeRegistry, lineageOnDemandRequest.getEntityTraversalFilters());
this.edgePredicate = constructInMemoryPredicate(typeRegistry, lineageOnDemandRequest.getRelationshipTraversalFilters());
this.lineageType = lineageOnDemandRequest.getLineageType();
}

public Map<String, LineageOnDemandConstraints> getConstraints() {
Expand All @@ -56,6 +63,13 @@ public Predicate getEdgePredicate() {
public void setEdgePredicate(Predicate edgePredicate) {
this.edgePredicate = edgePredicate;
}
public String getLineageType() {
return lineageType;
}

public void setLineageType(String lineageType) {
this.lineageType = lineageType;
}

public Set<String> getAttributes() {
return attributes;
Expand Down
Loading

0 comments on commit 26e2980

Please sign in to comment.