Skip to content

Commit

Permalink
Merge pull request #3388 from atlanhq/DG-1746
Browse files Browse the repository at this point in the history
DG-1749 Custom relationshipDef support
  • Loading branch information
nikhilbonte21 authored Aug 11, 2024
2 parents cfd9514 + b4d291f commit 65ce5c8
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 9 deletions.
39 changes: 39 additions & 0 deletions addons/policies/bootstrap_relationship_policies.json
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,45 @@
"remove-relationship"
]
}
},

{
"typeName": "AuthPolicy",
"attributes":
{
"name": "LINK_ASSETS_CUSTOM_RELATIONSHIP",
"qualifiedName": "LINK_ASSETS_CUSTOM_RELATIONSHIP",
"policyCategory": "bootstrap",
"policySubCategory": "default",
"policyServiceName": "atlas",
"policyType": "allow",
"policyUsers":
[],
"policyGroups":
[],
"policyRoles":
[
"$admin",
"$api-token-default-access"
],
"policyResourceCategory": "RELATIONSHIP",
"policyResources":
[
"relationship-type:CustomRelationship",
"end-one-entity-type:Referenceable",
"end-two-entity-type:Referenceable",
"end-one-entity-classification:*",
"end-two-entity-classification:*",
"end-one-entity:*",
"end-two-entity:*"
],
"policyActions":
[
"add-relationship",
"update-relationship",
"remove-relationship"
]
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public final class Constants {
public static final String INPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.inputPortDataProducts";
public static final String OUTPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.outputPortDataProducts";

public static final String CUSTOM_RELATIONSHIP_EDGE_LABEL = "__Referenceable.customRelationshipTo";
public static final String CUSTOM_RELATIONSHIP_END_NAME_FROM = "customRelationshipFrom";
public static final String CUSTOM_RELATIONSHIP_END_NAME_TO = "customRelationshipTo";

/**
* SQL property keys.
*/
Expand Down
4 changes: 3 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ public enum AtlasConfiguration {

INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300),

ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false);
ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false),

ATLAS_CUSTOM_RELATIONSHIPS_MAX_COUNT("atlas.custom.relationships.max.count", 100);


private static final Configuration APPLICATION_PROPERTIES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class IndexSearchParams extends SearchParams {
* */
private boolean allowDeletedRelations;
private boolean accessControlExclusive;
private boolean requestRelationshipAttrsForSearch;

@Override
public String getQuery() {
Expand Down Expand Up @@ -84,6 +85,14 @@ public void setRelationAttributes(Set<String> relationAttributes) {
this.relationAttributes = relationAttributes;
}

public boolean isRequestRelationshipAttrsForSearch() {
return requestRelationshipAttrsForSearch;
}

public void setRequestRelationshipAttrsForSearch(boolean requestRelationshipAttrsForSearch) {
this.requestRelationshipAttrsForSearch = requestRelationshipAttrsForSearch;
}

@Override
public String toString() {
return "IndexSearchParams{" +
Expand All @@ -93,6 +102,7 @@ public String toString() {
", queryString='" + queryString + '\'' +
", allowDeletedRelations=" + allowDeletedRelations +
", accessControlExclusive=" + accessControlExclusive +
", requestRelationshipAttrsForSearch=" + requestRelationshipAttrsForSearch +
", utmTags="+ getUtmTags() +
", enableFullRestriction="+ enableFullRestriction +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ public AtlasSearchResult directIndexSearch(SearchParams searchParams) throws Atl
IndexSearchParams params = (IndexSearchParams) searchParams;
RequestContext.get().setRelationAttrsForSearch(params.getRelationAttributes());
RequestContext.get().setAllowDeletedRelationsIndexsearch(params.isAllowDeletedRelations());
RequestContext.get().setRequestRelationshipAttrsForSearch(params.isRequestRelationshipAttrsForSearch());

AtlasSearchResult ret = new AtlasSearchResult();
AtlasIndexQuery indexQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void deleteRelationships(Collection<AtlasEdge> edges, final boolean force
}
continue;
}
deleteEdge(edge, isInternal || forceDelete);
deleteEdge(edge, isInternal || forceDelete || isCustomRelationship(edge));
}
}

Expand Down Expand Up @@ -384,7 +384,7 @@ public boolean deleteEdgeReference(AtlasEdge edge, TypeCategory typeCategory, bo
// for relationship edges, inverse vertex's relationship attribute doesn't need to be updated.
// only delete the reference relationship edge
if (GraphHelper.isRelationshipEdge(edge)) {
deleteEdge(edge, isInternalType);
deleteEdge(edge, isInternalType || isCustomRelationship(edge));
AtlasVertex referencedVertex = entityRetriever.getReferencedEntityVertex(edge, relationshipDirection, entityVertex);

if (referencedVertex != null) {
Expand All @@ -401,7 +401,7 @@ public boolean deleteEdgeReference(AtlasEdge edge, TypeCategory typeCategory, bo
//legacy case - not a relationship edge
//If deleting just the edge, reverse attribute should be updated for any references
//For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated
deleteEdge(edge, true, isInternalType);
deleteEdge(edge, true, isInternalType || isCustomRelationship(edge));
}
}

Expand Down Expand Up @@ -984,7 +984,8 @@ protected void deleteEdgeBetweenVertices(AtlasVertex outVertex, AtlasVertex inVe
}

if (edge != null) {
deleteEdge(edge, isInternalType(inVertex) && isInternalType(outVertex));
boolean isInternal = isInternalType(inVertex) && isInternalType(outVertex);
deleteEdge(edge, isInternal || isCustomRelationship(edge));

final RequestContext requestContext = RequestContext.get();
final String outId = GraphHelper.getGuid(outVertex);
Expand Down Expand Up @@ -1069,6 +1070,10 @@ private boolean isInternalType(final AtlasVertex instanceVertex) {
return Objects.nonNull(entityType) && entityType.isInternalType();
}

private boolean isCustomRelationship(final AtlasEdge edge) {
return edge.getLabel().equals(CUSTOM_RELATIONSHIP_EDGE_LABEL);
}

private void addToPropagatedClassificationNames(AtlasVertex entityVertex, String classificationName) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding property {} = \"{}\" to vertex {}", PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName, string(entityVertex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private static final String END_2_DOC_ID_KEY = "end2DocId";
private static final String ES_DOC_ID_MAP_KEY = "esDocIdMap";

private static final String CUSTOM_RELATIONSHIP_TYPE_NAME = "CustomRelationship";

private static Set<String> EXCLUDE_MUTATION_REL_TYPE_NAMES = new HashSet<String>() {{
add(REL_DOMAIN_TO_DOMAINS);
add(REL_DOMAIN_TO_PRODUCTS);
Expand Down Expand Up @@ -140,6 +142,10 @@ public AtlasRelationship create(AtlasRelationship relationship) throws AtlasBase
AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());

if (relationship.getTypeName().equals(CUSTOM_RELATIONSHIP_TYPE_NAME)) {
EntityGraphMapper.validateCustomRelationship(end1Vertex, end2Vertex);
}

AtlasEdge edge = createRelationship(end1Vertex, end2Vertex, relationship);

AtlasRelationship ret = edge != null ? entityRetriever.mapEdgeToAtlasRelationship(edge) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ public class EntityGraphMapper {
private boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
private boolean DIFFERENTIAL_AUDITS = STORE_DIFFERENTIAL_AUDITS.getBoolean();

private static final int MAX_NUMBER_OF_RETRIES = AtlasConfiguration.MAX_NUMBER_OF_RETRIES.getInt();
private static final int CHUNK_SIZE = AtlasConfiguration.TASKS_GRAPH_COMMIT_CHUNK_SIZE.getInt();
private static final int MAX_NUMBER_OF_RETRIES = AtlasConfiguration.MAX_NUMBER_OF_RETRIES.getInt();
private static final int CHUNK_SIZE = AtlasConfiguration.TASKS_GRAPH_COMMIT_CHUNK_SIZE.getInt();
private static final int CUSTOM_REL_THRESHOLD = AtlasConfiguration.ATLAS_CUSTOM_RELATIONSHIPS_MAX_COUNT.getInt();

private final GraphHelper graphHelper;
private final AtlasGraph graph;
Expand Down Expand Up @@ -1769,8 +1770,11 @@ private AtlasEdge getEdgeUsingRelationship(AttributeMutationContext ctx, EntityM

AtlasEdge edge = null;

Map<String, Object> relationshipAttributes = getRelationshipAttributes(ctx.getValue());
AtlasRelationship relationship = new AtlasRelationship(relationshipName, relationshipAttributes);

if (createEdge) {
edge = relationshipStore.getOrCreate(fromVertex, toVertex, new AtlasRelationship(relationshipName));
edge = relationshipStore.getOrCreate(fromVertex, toVertex, relationship);
boolean isCreated = graphHelper.getCreatedTime(edge) == RequestContext.get().getRequestTime();

if (isCreated) {
Expand All @@ -1781,7 +1785,7 @@ private AtlasEdge getEdgeUsingRelationship(AttributeMutationContext ctx, EntityM
}

} else {
edge = relationshipStore.getRelationship(fromVertex, toVertex, new AtlasRelationship(relationshipName));
edge = relationshipStore.getRelationship(fromVertex, toVertex, relationship);
}
ret = edge;
}
Expand Down Expand Up @@ -2010,6 +2014,10 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, removedElements);
break;

case CUSTOM_RELATIONSHIP_EDGE_LABEL:
validateCustomRelationship(ctx, newElementsCreated, false);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2100,6 +2108,10 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, null);
break;

case CUSTOM_RELATIONSHIP_EDGE_LABEL:
validateCustomRelationship(ctx, newElementsCreated, true);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2209,6 +2221,58 @@ private void addEdgesToContext(String guid, List<Object> newElementsCreated, Lis
}
}

public static void validateCustomRelationship(AttributeMutationContext ctx, List<Object> newElements, boolean isAppend) throws AtlasBaseException {
long currentSize;
boolean isEdgeDirectionIn = ctx.getAttribute().getRelationshipEdgeDirection() == AtlasRelationshipEdgeDirection.IN;

if (isAppend) {
currentSize = ctx.getReferringVertex().getEdgesCount(isEdgeDirectionIn ? AtlasEdgeDirection.IN : AtlasEdgeDirection.OUT,
CUSTOM_RELATIONSHIP_EDGE_LABEL);
} else {
currentSize = newElements.size();
}

validateCustomRelationshipCount(currentSize, ctx.getReferringVertex());

AtlasEdgeDirection direction;
if (isEdgeDirectionIn) {
direction = AtlasEdgeDirection.OUT;
} else {
direction = AtlasEdgeDirection.IN;
}

for (Object obj : newElements) {
AtlasEdge edge = (AtlasEdge) obj;

AtlasVertex targetVertex;
if (isEdgeDirectionIn) {
targetVertex = edge.getOutVertex();
LOG.info("{}: {}", direction, "outVertex");
} else {
targetVertex = edge.getInVertex();
LOG.info("{}: {}", direction, "inVertex");
}

currentSize = targetVertex.getEdgesCount(direction, CUSTOM_RELATIONSHIP_EDGE_LABEL);
validateCustomRelationshipCount(currentSize, targetVertex);
}
}

public static void validateCustomRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex) throws AtlasBaseException {
long currentSize = end1Vertex.getEdgesCount(AtlasEdgeDirection.OUT, CUSTOM_RELATIONSHIP_EDGE_LABEL) + 1;
validateCustomRelationshipCount(currentSize, end1Vertex);

currentSize = end2Vertex.getEdgesCount(AtlasEdgeDirection.IN, CUSTOM_RELATIONSHIP_EDGE_LABEL) + 1;
validateCustomRelationshipCount(currentSize, end2Vertex);
}

private static void validateCustomRelationshipCount(long size, AtlasVertex vertex) throws AtlasBaseException {
if (CUSTOM_REL_THRESHOLD < size) {
throw new AtlasBaseException(AtlasErrorCode.OPERATION_NOT_SUPPORTED,
"Custom relationships size is more than " + CUSTOM_REL_THRESHOLD + ", current is " + size + " for " + vertex.getProperty(NAME, String.class));
}
}

private void addInternalProductAttr(AttributeMutationContext ctx, List<Object> createdElements, List<AtlasEdge> deletedElements) throws AtlasBaseException {
MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("addInternalProductAttrForAppend");
AtlasVertex toVertex = ctx.getReferringVertex();
Expand Down Expand Up @@ -2676,6 +2740,8 @@ private static Map<String, Object> getRelationshipAttributes(Object val) throws
if (relationshipStruct instanceof Map) {
return AtlasTypeUtil.toStructAttributes(((Map) relationshipStruct));
}
} else if (val instanceof AtlasObjectId) {
return ((AtlasObjectId) val).getAttributes();
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,14 @@ private AtlasObjectId mapVertexToObjectId(AtlasVertex entityVertex, String edgeL
ret = toAtlasObjectId(referenceVertex);
}
}

if (RequestContext.get().isRequestRelationshipAttrsForSearch()) {
boolean isRelationshipAttribute = typeRegistry.getRelationshipDefByName(GraphHelper.getTypeName(edge)) != null;
if (isRelationshipAttribute) {
AtlasRelationship relationship = mapEdgeToAtlasRelationship(edge);
ret.getAttributes().put("relationshipAttributes", relationship.getAttributes());
}
}
}

return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void run() {
TASK_LOG.info("Task not scheduled as it was not found");
return;
}
RequestContext.get().setTraceId(task.getGuid());

TASK_LOG.info("Task guid = "+task.getGuid());
taskVertex = registry.getVertex(task.getGuid());
Expand Down
10 changes: 10 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class RequestContext {
private boolean allowDeletedRelationsIndexsearch = false;
private boolean includeMeanings = true;
private boolean includeClassifications = true;
private boolean requestRelationshipAttrsForSearch;

private boolean includeClassificationNames = false;

Expand Down Expand Up @@ -160,6 +161,7 @@ public void clearCache() {
this.onlyCAUpdateEntities.clear();
this.onlyBAUpdateEntities.clear();
this.relationAttrsForSearch.clear();
this.requestRelationshipAttrsForSearch = false;
this.queuedTasks.clear();
this.newElementsCreatedMap.clear();
this.removedElementsMap.clear();
Expand Down Expand Up @@ -214,6 +216,14 @@ public void setRelationAttrsForSearch(Set<String> relationAttrsForSearch) {
}
}

public boolean isRequestRelationshipAttrsForSearch() {
return requestRelationshipAttrsForSearch;
}

public void setRequestRelationshipAttrsForSearch(boolean requestRelationshipAttrsForSearch) {
this.requestRelationshipAttrsForSearch = requestRelationshipAttrsForSearch;
}

public Map<String, List<Object>> getRemovedElementsMap() {
return removedElementsMap;
}
Expand Down

0 comments on commit 65ce5c8

Please sign in to comment.