diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index aca49380ef..79b7190b02 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -103,14 +103,14 @@ public final class GraphHelper { private int maxRetries = 3; private long retrySleepTimeMillis = 1000; - private boolean removePropagations = false; + private boolean removePropagations = true; public GraphHelper(AtlasGraph graph) { this.graph = graph; try { maxRetries = ApplicationProperties.get().getInt(RETRY_COUNT, 3); retrySleepTimeMillis = ApplicationProperties.get().getLong(RETRY_DELAY, 1000); - removePropagations = ApplicationProperties.get().getBoolean(DEFAULT_REMOVE_PROPAGATIONS_ON_ENTITY_DELETE, false); + removePropagations = ApplicationProperties.get().getBoolean(DEFAULT_REMOVE_PROPAGATIONS_ON_ENTITY_DELETE, true); } catch (AtlasException e) { LOG.error("Could not load configuration. Setting to default value for " + RETRY_COUNT, e); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index 60c9e25790..0a6cbccd2a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -1292,17 +1292,10 @@ private void setBlockedClassificationIds(AtlasEdge edge, List classifica } } } - - - public void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid, Boolean currentRestrictPropagationThroughLineage) throws AtlasBaseException { - if (!CLASSIFICATION_PROPAGATION_DELETE.equals(taskType) && skipClassificationTaskCreation(classificationVertexId)) { - LOG.info("Task is already scheduled for classification id {}, no need to schedule task for vertex {}", classificationVertexId, entityVertex.getIdForDisplay()); - return; - } - + public void createAndQueueTaskWithoutCheck(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid) throws AtlasBaseException { String currentUser = RequestContext.getCurrentUser(); String entityGuid = GraphHelper.getGuid(entityVertex); - Map taskParams = ClassificationTask.toParameters(entityGuid, classificationVertexId, relationshipGuid, currentRestrictPropagationThroughLineage); + Map taskParams = ClassificationTask.toParameters(entityGuid, classificationVertexId, relationshipGuid); AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams, classificationVertexId, entityGuid); AtlasGraphUtilsV2.addEncodedProperty(entityVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid()); @@ -1316,9 +1309,13 @@ public void createAndQueueTask(String taskType, AtlasVertex entityVertex, String return; } + createAndQueueTaskWithoutCheck(taskType, entityVertex, classificationVertexId, relationshipGuid); + } + + public void createAndQueueTaskWithoutCheck(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid, Boolean currentRestrictPropagationThroughLineage) throws AtlasBaseException { String currentUser = RequestContext.getCurrentUser(); String entityGuid = GraphHelper.getGuid(entityVertex); - Map taskParams = ClassificationTask.toParameters(entityGuid, classificationVertexId, relationshipGuid); + Map taskParams = ClassificationTask.toParameters(entityGuid, classificationVertexId, relationshipGuid, currentRestrictPropagationThroughLineage); AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams, classificationVertexId, entityGuid); AtlasGraphUtilsV2.addEncodedProperty(entityVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 2674885d69..b446f4c299 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -137,8 +137,7 @@ import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdges; import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*; import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD; import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN; @@ -3178,14 +3177,18 @@ public void updateClassifications(EntityMutationContext context, String guid, Li isClassificationUpdated = true; } + boolean removePropagation = false; // check for removePropagationsOnEntityDelete update Boolean currentRemovePropagations = currentClassification.getRemovePropagationsOnEntityDelete(); Boolean updatedRemovePropagations = classification.getRemovePropagationsOnEntityDelete(); - - if (updatedRemovePropagations != null && (updatedRemovePropagations != currentRemovePropagations)) { + if (updatedRemovePropagations != null && !updatedRemovePropagations.equals(currentRemovePropagations)) { AtlasGraphUtilsV2.setEncodedProperty(classificationVertex, CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY, updatedRemovePropagations); - isClassificationUpdated = true; + + boolean isEntityDeleted = DELETED.toString().equals(entityVertex.getProperty(STATE_PROPERTY_KEY, String.class)); + if (isEntityDeleted && updatedRemovePropagations) { + removePropagation = true; + } } if (isClassificationUpdated) { @@ -3201,10 +3204,6 @@ public void updateClassifications(EntityMutationContext context, String guid, Li mapClassification(EntityOperation.UPDATE, context, classification, entityType, entityVertex, classificationVertex); updateModificationMetadata(entityVertex); - // handle update of 'propagate' flag - Boolean currentTagPropagation = currentClassification.isPropagate(); - Boolean updatedTagPropagation = classification.isPropagate(); - /* ----------------------------- | Current Tag | Updated Tag | | Propagation | Propagation | @@ -3218,18 +3217,24 @@ public void updateClassifications(EntityMutationContext context, String guid, Li | true | false | => Remove Tag Propagation (send REMOVE classification notifications) |-------------|-------------| */ + Boolean currentTagPropagation = currentClassification.isPropagate(); + Boolean updatedTagPropagation = classification.isPropagate(); Boolean currentRestrictPropagationThroughLineage = currentClassification.getRestrictPropagationThroughLineage(); Boolean updatedRestrictPropagationThroughLineage = classification.getRestrictPropagationThroughLineage(); - if (taskManagement != null && DEFERRED_ACTION_ENABLED) { - String propagationType = updatedTagPropagation ? CLASSIFICATION_PROPAGATION_ADD : CLASSIFICATION_PROPAGATION_DELETE; + if ((!Objects.equals(updatedRemovePropagations, currentRemovePropagations) || + !Objects.equals(currentTagPropagation, updatedTagPropagation) || + !Objects.equals(currentRestrictPropagationThroughLineage, updatedRestrictPropagationThroughLineage)) && + taskManagement != null && DEFERRED_ACTION_ENABLED) { + String propagationType = CLASSIFICATION_PROPAGATION_ADD; + if (removePropagation || !updatedTagPropagation) + { + propagationType = CLASSIFICATION_PROPAGATION_DELETE; + } createAndQueueTask(propagationType, entityVertex, classificationVertex.getIdForDisplay(), currentRestrictPropagationThroughLineage); - - updatedTagPropagation = null; } - // compute propagatedEntityVertices once and use it for subsequent iterations and notifications if (updatedTagPropagation != null && (currentTagPropagation != updatedTagPropagation || currentRestrictPropagationThroughLineage != updatedRestrictPropagationThroughLineage)) { if (updatedTagPropagation) { @@ -3972,11 +3977,11 @@ private void addToUpdatedBusinessAttributes(Map> upd private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, Boolean currentPropagateThroughLineage) throws AtlasBaseException{ - deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null, currentPropagateThroughLineage); + deleteDelegate.getHandler().createAndQueueTaskWithoutCheck(taskType, entityVertex, classificationVertexId, null, currentPropagateThroughLineage); } private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId) throws AtlasBaseException { - deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null); + deleteDelegate.getHandler().createAndQueueTaskWithoutCheck(taskType, entityVertex, classificationVertexId, null); } public void removePendingTaskFromEntity(String entityGuid, String taskGuid) throws EntityNotFoundException {