Skip to content

Commit

Permalink
Merge branch 'master' into governance-preferences-attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilbonte21 committed Oct 19, 2023
2 parents de9186a + 37a5a46 commit 3b98408
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 22 deletions.
5 changes: 3 additions & 2 deletions addons/elasticsearch/es-settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@
"replacement":" $0"
},
"truncate_filter": {
"pattern": "(^.{0,100000}).*$",
"pattern": "(.{0,100000}).*",
"type": "pattern_replace",
"replacement": "$1 "
"replacement": "$1",
"flags": "DOTALL"
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions addons/models/0000-Area0/0010-base_model.json
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,16 @@
"skipScrubbing": true,
"includeInNotification": false
},
{
"name": "denyAssetFilters",
"typeName": "array<string>",
"cardinality": "SET",
"isIndexable": false,
"isOptional": true,
"isUnique": false,
"skipScrubbing": true,
"includeInNotification": false
},
{
"name": "channelLink",
"typeName": "string",
Expand Down
36 changes: 36 additions & 0 deletions addons/static/templates/connection_bootstrap_policies.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,42 @@

}
},
{
"typeName": "AuthPolicy",
"guid": -8,
"attributes": {
"qualifiedName": "{guid}/connection-link-assets-inverse",
"name": "{name}-connection-link-assets-inverse",
"policyCategory": "bootstrap",
"policySubCategory": "connection",
"policyType": "allow",
"policyServiceName": "atlas",
"policyRoles": [
"connection_admins_{guid}"
],
"policyActions": [
"add-relationship",
"remove-relationship"
],
"policyResourceCategory": "RELATIONSHIP",
"policyResources": [
"end-one-entity-classification:*",
"end-one-entity:*",
"end-one-entity-type:Catalog",
"end-one-entity-type:Connection",
"end-one-entity-type:Process",
"end-one-entity-type:ProcessExecution",
"end-one-entity-type:Namespace",

"end-two-entity-classification:*",
"end-two-entity:{entity}",
"end-two-entity:{entity}/*",
"end-two-entity-type:*",

"relationship-type:*"
]
}
},
{
"typeName": "AuthPolicy",
"guid": -2,
Expand Down
21 changes: 21 additions & 0 deletions addons/static/templates/policy_cache_transformer_persona.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@
"end-two-entity:*"
],
"actions": ["add-relationship", "remove-relationship"]
},
{
"policyType": "ACCESS",
"policyResourceCategory": "RELATIONSHIP",
"resources": [
"relationship-type:*",

"end-one-entity-type:Catalog",
"end-one-entity-type:Connection",
"end-one-entity-type:Process",
"end-one-entity-type:Namespace",
"end-one-entity-type:ProcessExecution",
"end-one-entity-classification:*",
"end-one-entity:*",

"end-two-entity-type:{entity-type}",
"end-two-entity-classification:*",
"end-two-entity:{entity}",
"end-two-entity:{entity}/*"
],
"actions": ["add-relationship", "remove-relationship"]
}
],
"persona-api-create": [
Expand Down
6 changes: 6 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redis.client.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class AtlasHasLineageRequest implements Serializable {
public AtlasHasLineageRequest() {
}

private String assetGuid;
private String processGuid;
private String endGuid;
private String label;
Expand Down Expand Up @@ -68,9 +69,18 @@ public void setLabel(String label) {
this.label = label;
}

public String getAssetGuid() {
return assetGuid;
}

public void setAssetGuid(String assetGuid) {
this.assetGuid = assetGuid;
}

@Override
public String toString() {
return "AtlasHasLineageRequest{" +
"assetGuid='" + assetGuid + '\'' +
"processGuid='" + processGuid + '\'' +
", endGuid='" + endGuid + '\'' +
", label='" + label + '\'' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,13 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, Atl
else
neighbourVertex = currentEdge.getInVertex();

if (!lineageListContext.evaluateTraversalFilter(neighbourVertex))
String vertexGuid = getGuid(neighbourVertex);
if (StringUtils.isEmpty(vertexGuid) || !lineageListContext.evaluateTraversalFilter(neighbourVertex))
continue;

if (!skippedVertices.contains(getGuid(neighbourVertex)) && !visitedVertices.contains(getGuid(neighbourVertex))) {
visitedVertices.add(getGuid(neighbourVertex));
traversalQueue.add(getGuid(neighbourVertex));
if (!skippedVertices.contains(vertexGuid) && !visitedVertices.contains(vertexGuid)) {
visitedVertices.add(vertexGuid);
traversalQueue.add(vertexGuid);
addEntitiesToCache(neighbourVertex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,8 @@ public void resetHasLineageOnInputOutputDelete(Collection<AtlasEdge> removedEdge
private void updateAssetHasLineageStatus(AtlasVertex assetVertex, AtlasEdge currentEdge, Collection<AtlasEdge> removedEdges) {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateAssetHasLineageStatus");

removedEdges.forEach(edge -> RequestContext.get().addToDeletedEdgesIdsForResetHasLineage(edge.getIdForDisplay()));

Iterator<AtlasEdge> edgeIterator = assetVertex.query()
.direction(AtlasEdgeDirection.BOTH)
.label(PROCESS_EDGE_LABELS)
Expand All @@ -1537,7 +1539,7 @@ private void updateAssetHasLineageStatus(AtlasVertex assetVertex, AtlasEdge curr
int processHasLineageCount = 0;
while (edgeIterator.hasNext()) {
AtlasEdge edge = edgeIterator.next();
if (!removedEdges.contains(edge) && !currentEdge.equals(edge)) {
if (!RequestContext.get().getDeletedEdgesIdsForResetHasLineage().contains(edge.getIdForDisplay()) && !currentEdge.equals(edge)) {
AtlasVertex relatedProcessVertex = edge.getOutVertex();
boolean processHasLineage = getEntityHasLineage(relatedProcessVertex);
if (processHasLineage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2443,28 +2443,61 @@ public void repairHasLineage(AtlasHasLineageRequests requests) throws AtlasBaseE
Set<AtlasEdge> inputOutputEdges = new HashSet<>();

for (AtlasHasLineageRequest request : requests.getRequest()) {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getProcessGuid());
AtlasVertex assetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getEndGuid());
AtlasEdge edge = null;
try {
if (processVertex != null && assetVertex != null) {
edge = graphHelper.getEdge(processVertex, assetVertex, request.getLabel());
} else {
LOG.warn("Skipping since vertex is null for processGuid {} and asset Guid {}"
,request.getProcessGuid(),request.getEndGuid() );
if (StringUtils.isNotEmpty(request.getAssetGuid())) {
//only supports repairing scenario mentioned here - https://atlanhq.atlassian.net/browse/DG-128?focusedCommentId=20652
repairHasLineageForAsset(request);

} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getProcessGuid());
AtlasVertex assetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getEndGuid());
AtlasEdge edge = null;
try {
if (processVertex != null && assetVertex != null) {
edge = graphHelper.getEdge(processVertex, assetVertex, request.getLabel());
} else {
LOG.warn("Skipping since vertex is null for processGuid {} and asset Guid {}"
,request.getProcessGuid(),request.getEndGuid() );
}
} catch (RepositoryException re) {
throw new AtlasBaseException(AtlasErrorCode.HAS_LINEAGE_GET_EDGE_FAILED, re);
}
} catch (RepositoryException re) {
throw new AtlasBaseException(AtlasErrorCode.HAS_LINEAGE_GET_EDGE_FAILED, re);
}

if (edge != null) {
inputOutputEdges.add(edge);
if (edge != null) {
inputOutputEdges.add(edge);
}
}
}
repairHasLineageWithAtlasEdges(inputOutputEdges);

if (CollectionUtils.isNotEmpty(inputOutputEdges)) {
repairHasLineageWithAtlasEdges(inputOutputEdges);
}

RequestContext.get().endMetricRecord(metricRecorder);
}

private void repairHasLineageForAsset(AtlasHasLineageRequest request) {
//only supports repairing scenario mentioned here - https://atlanhq.atlassian.net/browse/DG-128?focusedCommentId=20652

AtlasVertex assetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, request.getAssetGuid());

if (getEntityHasLineage(assetVertex)) {
Iterator<AtlasEdge> lineageEdges = assetVertex.getEdges(AtlasEdgeDirection.BOTH, PROCESS_EDGE_LABELS).iterator();
boolean foundActiveRel = false;

while (lineageEdges.hasNext()) {
AtlasEdge edge = lineageEdges.next();
if (getStatus(edge) == ACTIVE) {
foundActiveRel = true;
break;
}
}

if (!foundActiveRel) {
AtlasGraphUtilsV2.setEncodedProperty(assetVertex, HAS_LINEAGE, false);
}
}
}

public void repairHasLineageWithAtlasEdges(Set<AtlasEdge> inputOutputEdges) {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("repairHasLineageWithAtlasEdges");

Expand Down
11 changes: 11 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class RequestContext {
private boolean allowDuplicateDisplayName;
private MetricsRegistry metricsRegistry;
private boolean skipAuthorizationCheck = false;
private Set<String> deletedEdgesIdsForResetHasLineage = new HashSet<>(0);


private RequestContext() {
}
Expand Down Expand Up @@ -147,6 +149,7 @@ public void clearCache() {
this.removedElementsMap.clear();
this.deletedEdgesIds.clear();
this.processGuidIds.clear();
this.deletedEdgesIdsForResetHasLineage.clear();
this.requestContextHeaders.clear();
this.relationshipEndToVertexIdMap.clear();
this.relationshipMutationMap.clear();
Expand Down Expand Up @@ -389,6 +392,14 @@ public Set<String> getDeletedEdgesIds() {
return deletedEdgesIds;
}

public void addToDeletedEdgesIdsForResetHasLineage(String edgeId) {
deletedEdgesIdsForResetHasLineage.add(edgeId);
}

public Set<String> getDeletedEdgesIdsForResetHasLineage() {
return deletedEdgesIdsForResetHasLineage;
}

public Set<String> getProcessGuidIds() {
return processGuidIds;
}
Expand Down

0 comments on commit 3b98408

Please sign in to comment.