Skip to content

Commit

Permalink
Merge pull request #2778 from atlanhq/feat/PLT-626
Browse files Browse the repository at this point in the history
Optimise append operation and enhance error messages
  • Loading branch information
aarshi0301 authored Jan 23, 2024
2 parents bfcad2a + 8e82360 commit 8fc5dfe
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 53 deletions.
3 changes: 2 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public enum AtlasErrorCode {
PAGINATION_CAN_ONLY_BE_USED_WITH_DEPTH_ONE(400, "ATLAS-400-00-103", "Pagination can be used only when depth is 1"),
CANT_CALCULATE_VERTEX_COUNTS_WITHOUT_PAGINATION(400, "ATLAS-400-00-104", "Vertex counts can't be calculated without pagination"),
FORBIDDEN_TYPENAME(400,"ATLAS-400-00-107", "Forbidden type: Can not pass builtin type {0}"),
ATTRIBUTE_ALREADY_EXISTS_IN_RELATIONSHIP_ATTRIBUTE(400, "ATLAS-400-00-108", "{0}: attribute already exists in relationshipAttributes"),

// All Not found enums go here
TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"),
Expand Down Expand Up @@ -213,6 +214,7 @@ public enum AtlasErrorCode {
TASK_NOT_FOUND(404, "ATLAS-404-00-018", "Given task guid {0} is invalid/not found"),
RESOURCE_NOT_FOUND(404, "ATLAS-404-00-019", "{0} not found"),
INDEX_NOT_FOUND(404, "ATLAS-404-00-020", "ES index {0} not found"),
RELATIONSHIP_DOES_NOT_EXIST(404, "ATLAS-409-00-0021", "relationship {0} does not exist between entities {1} and {2}"),

METHOD_NOT_ALLOWED(405, "ATLAS-405-00-001", "Error 405 - The request method {0} is inappropriate for the URL: {1}"),
DELETE_TAG_PROPAGATION_NOT_ALLOWED(406, "ATLAS-406-00-001", "Classification delete is not allowed; Add/Update classification propagation is in queue for classification: {0} and entity: {1}. Please try again"),
Expand All @@ -237,7 +239,6 @@ public enum AtlasErrorCode {

CATEGORY_PARENT_FROM_OTHER_GLOSSARY(409, "ATLAS-400-00-0015", "Parent category from another Anchor(glossary) not supported"),
CLASSIFICATION_TYPE_HAS_REFERENCES(409, "ATLAS-400-00-0016", "Given classification {0} [{1}] has references"),

// All internal errors go here
INTERNAL_ERROR(500, "ATLAS-500-00-001", "Internal server error {0}"),
INDEX_CREATION_FAILED(500, "ATLAS-500-00-002", "Index creation failed for {0}"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ void visitEntity(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBas
private void visitRelationships(AtlasEntityType entityType, AtlasEntity entity, List<String> visitedAttributes) throws AtlasBaseException {
for (String attrName : entityType.getRelationshipAttributes().keySet()) {

if (entity.hasRelationshipAttribute(attrName)){
if (entity.hasAppendRelationshipAttribute(attrName) || entity.hasRemoveRelationshipAttribute(attrName)){
throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_ALREADY_EXISTS_IN_RELATIONSHIP_ATTRIBUTE, attrName);
}
}

// if attribute is not in 'relationshipAttributes', try 'attributes'
// appendRelationshipAttribute will be ignored if same attribute is present
// in relationshipAttribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex,
this.existingEdge = currentEdge;
}

public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value,
String vertexProperty, AtlasType currentElementType) {
this(op, referringVertex, attribute, value, vertexProperty, currentElementType, null);
}

@Override
public int hashCode() {
return Objects.hash(op, referringVertex, attribute, value, vertexProperty, currentElementType, existingEdge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,6 @@ private void mapRelationshipAttributes(AtlasEntity entity, AtlasEntityType entit
}
}
}

updateModificationMetadata(vertex);

RequestContext.get().endMetricRecord(metric);
Expand Down Expand Up @@ -1118,8 +1117,6 @@ private void mapAppendRemoveRelationshipAttributes(AtlasEntity entity, AtlasEnti
}
}

updateModificationMetadata(vertex);

RequestContext.get().endMetricRecord(metric);

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -1710,7 +1707,7 @@ private AtlasEdge mapObjectIdValueUsingRelationship(AttributeMutationContext ctx
return ret;
}

private AtlasEdge getEdgeUsingRelationship(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
private AtlasEdge getEdgeUsingRelationship(AttributeMutationContext ctx, EntityMutationContext context, boolean createEdge) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getEdgeUsingRelationship({})", ctx);
}
Expand Down Expand Up @@ -1767,14 +1764,23 @@ private AtlasEdge getEdgeUsingRelationship(AttributeMutationContext ctx, EntityM
toVertex = attributeVertex;
}

AtlasEdge edge = relationshipStore.getRelationship(fromVertex, toVertex, new AtlasRelationship(relationshipName));
AtlasEdge edge = null;

if (edge != null && getStatus(edge) != DELETED) {
ret = edge;
}
if (createEdge) {
edge = relationshipStore.getOrCreate(fromVertex, toVertex, new AtlasRelationship(relationshipName));
boolean isCreated = graphHelper.getCreatedTime(edge) == RequestContext.get().getRequestTime();

RequestContext requestContext = RequestContext.get();
requestContext.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(toVertex));
if (isCreated) {
// if relationship did not exist before and new relationship was created
// record entity update on both relationship vertices
recordEntityUpdateForNonRelationsipAttribute(fromVertex);
recordEntityUpdateForNonRelationsipAttribute(toVertex);
}

} else {
edge = relationshipStore.getRelationship(fromVertex, toVertex, new AtlasRelationship(relationshipName));
}
ret = edge;
}
}

Expand Down Expand Up @@ -2022,27 +2028,6 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext
AtlasAttribute inverseRefAttribute = attribute.getInverseRefAttribute();
Cardinality cardinality = attribute.getAttributeDef().getCardinality();
List<Object> newElementsCreated = new ArrayList<>();
List<Object> currentElements;


if (isReference && !isSoftReference) {
// returns already attached assets
currentElements = (List) getCollectionElementsUsingRelationship(ctx.getReferringVertex(), attribute, isStructType);
} else {
currentElements = (List) getArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty());
}

if (PARTIAL_UPDATE.equals(ctx.getOp()) && attribute.getAttributeDef().isAppendOnPartialUpdate() && CollectionUtils.isNotEmpty(currentElements)) {
if (CollectionUtils.isEmpty(newElements)) {
newElements = new ArrayList<>(currentElements);
} else {
List<Object> mergedVal = new ArrayList<>(currentElements);

mergedVal.addAll(newElements);

newElements = mergedVal;
}
}

boolean isNewElementsNull = newElements == null;

Expand All @@ -2054,13 +2039,13 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext
newElements = (List) newElements.stream().distinct().collect(Collectors.toList());
}


for (int index = 0; index < newElements.size(); index++) {
AtlasEdge existingEdge = (isSoftReference) ? null : getEdgeAt(currentElements, index, elementType);
AttributeMutationContext arrCtx = new AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), ctx.getAttribute(), newElements.get(index),
ctx.getVertexProperty(), elementType, existingEdge);
ctx.getVertexProperty(), elementType);


Object newEntry = mapCollectionElementsToVertex(arrCtx, context);
Object newEntry = getEdgeUsingRelationship(arrCtx, context, true);

if (isReference && newEntry != null && newEntry instanceof AtlasEdge && inverseRefAttribute != null) {
// Update the inverse reference value.
Expand All @@ -2084,23 +2069,23 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext
}

if (isNewElementsNull) {
setArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty(), null, null, cardinality);
setArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty(), new ArrayList<>(0), new ArrayList<>(0), cardinality);
} else {
setArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty(), newElements, currentElements, cardinality);
setArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty(), newElements, new ArrayList<>(0), cardinality);
}

switch (ctx.getAttribute().getRelationshipEdgeLabel()) {
case TERM_ASSIGNMENT_LABEL: addMeaningsToEntity(ctx, newElementsCreated, new ArrayList<>());
case TERM_ASSIGNMENT_LABEL: addMeaningsToEntity(ctx, newElementsCreated, new ArrayList<>(0));
break;

case CATEGORY_TERMS_EDGE_LABEL: addCategoriesToTermEntity(ctx, newElementsCreated, new ArrayList<>());
case CATEGORY_TERMS_EDGE_LABEL: addCategoriesToTermEntity(ctx, newElementsCreated, new ArrayList<>(0));
break;

case CATEGORY_PARENT_EDGE_LABEL: addCatParentAttr(ctx, newElementsCreated, new ArrayList<>());
case CATEGORY_PARENT_EDGE_LABEL: addCatParentAttr(ctx, newElementsCreated, new ArrayList<>(0));
break;

case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, new ArrayList<>());
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, new ArrayList<>(0));
break;
}

Expand Down Expand Up @@ -2139,29 +2124,35 @@ public List removeArrayValue(AttributeMutationContext ctx, EntityMutationContext

for (int index = 0; index < elementsDeleted.size(); index++) {
AttributeMutationContext arrCtx = new AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), ctx.getAttribute(), elementsDeleted.get(index),
ctx.getVertexProperty(), elementType, null);
ctx.getVertexProperty(), elementType);

Object deleteEntry = getEdgeUsingRelationship(arrCtx, context, false);

Object deleteEntry = getEdgeUsingRelationship(arrCtx, context);
if(deleteEntry != null) {
entityRelationsDeleted.add(deleteEntry);
// throw error if relation does not exist but requested to remove
if (deleteEntry == null) {
AtlasVertex attributeVertex = context.getDiscoveryContext().getResolvedEntityVertex(getGuid(arrCtx.getValue()));
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_DOES_NOT_EXIST, attribute.getRelationshipName(),
AtlasGraphUtilsV2.getIdFromVertex(attributeVertex), AtlasGraphUtilsV2.getIdFromVertex(ctx.getReferringVertex()));
}

entityRelationsDeleted.add(deleteEntry);
}

removedElements = removeArrayEntries(attribute, (List)entityRelationsDeleted, ctx);


switch (ctx.getAttribute().getRelationshipEdgeLabel()) {
case TERM_ASSIGNMENT_LABEL: addMeaningsToEntity(ctx, new ArrayList<>() , removedElements);
case TERM_ASSIGNMENT_LABEL: addMeaningsToEntity(ctx, new ArrayList<>(0) , removedElements);
break;

case CATEGORY_TERMS_EDGE_LABEL: addCategoriesToTermEntity(ctx, new ArrayList<>(), removedElements);
case CATEGORY_TERMS_EDGE_LABEL: addCategoriesToTermEntity(ctx, new ArrayList<>(0), removedElements);
break;

case CATEGORY_PARENT_EDGE_LABEL: addCatParentAttr(ctx, new ArrayList<>(), removedElements);
case CATEGORY_PARENT_EDGE_LABEL: addCatParentAttr(ctx, new ArrayList<>(0), removedElements);
break;

case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), new ArrayList<>(), removedElements);
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), new ArrayList<>(0), removedElements);
break;
}

Expand Down Expand Up @@ -2894,12 +2885,15 @@ private List<AtlasEdge> removeArrayEntries(AtlasAttribute attribute, List<AtlasE
continue;
}

boolean deleted = deleteDelegate.getHandler().deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(),
// update both sides of relationship wen edge is deleted
recordEntityUpdateForNonRelationsipAttribute(edge.getInVertex());
recordEntityUpdateForNonRelationsipAttribute(edge.getOutVertex());

deleteDelegate.getHandler().deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(),
true, attribute.getRelationshipEdgeDirection(), entityVertex);

if (!deleted) {
additionalElements.add(edge);
}
additionalElements.add(edge);

}

return additionalElements;
Expand Down Expand Up @@ -4070,6 +4064,19 @@ private void recordEntityUpdate(AtlasVertex vertex) throws AtlasBaseException {
}
}

private void recordEntityUpdateForNonRelationsipAttribute(AtlasVertex vertex) throws AtlasBaseException {
if (vertex != null) {
RequestContext req = RequestContext.get();

if (!req.isUpdatedEntity(graphHelper.getGuid(vertex))) {
updateModificationMetadata(vertex);

req.recordEntityUpdateForNonRelationshipAttributes(entityRetriever.toAtlasEntityHeader(vertex));
}
}
}


private String getIdFromInVertex(AtlasEdge edge) {
return getIdFromVertex(edge.getInVertex());
}
Expand Down
5 changes: 5 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ public void recordEntityUpdate(AtlasEntityHeader entity) {
}
}

public void recordEntityUpdateForNonRelationshipAttributes(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
updatedEntities.put(entity.getGuid(), entity);
}
}
public void recordEntityToSkip(String guid) {
if(! StringUtils.isEmpty(guid)) {
entitiesToSkipUpdate.add(guid);
Expand Down

0 comments on commit 8fc5dfe

Please sign in to comment.