Skip to content

Commit

Permalink
Merge pull request #2972 from atlanhq/PLT-1092-Re
Browse files Browse the repository at this point in the history
Fixed the columnProcess Lineage and a bug
  • Loading branch information
hr2904 authored May 3, 2024
2 parents 5213318 + 160e7bd commit 6f22505
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }

public static final String CATALOG_PROCESS_INPUT_RELATIONSHIP_LABEL = "__Process.inputs";
public static final String CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL = "__Process.outputs";
public static final String COLUMN_LINEAGE_RELATIONSHIP_LABEL = "__Process.columnProcesses";
public static final String CLASSIFICATION_PROPAGATION_MODE_DEFAULT ="DEFAULT";
public static final String CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ="RESTRICT_LINEAGE";

Expand All @@ -388,14 +387,12 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }
public static final HashMap<String, ArrayList<String>> CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP = new HashMap<String, ArrayList<String>>(){{
put(CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE, new ArrayList<>(
Arrays.asList(CATALOG_PROCESS_INPUT_RELATIONSHIP_LABEL,
CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL,
COLUMN_LINEAGE_RELATIONSHIP_LABEL
CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL
)));
put(CLASSIFICATION_PROPAGATION_MODE_DEFAULT, null);
put(CLASSIFICATION_PROPAGATION_MODE_RESTRICT_HIERARCHY, new ArrayList<>(
Arrays.asList(CATALOG_PROCESS_INPUT_RELATIONSHIP_LABEL,
CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL,
COLUMN_LINEAGE_RELATIONSHIP_LABEL
CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL
)));
}};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.*;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
Expand Down Expand Up @@ -3580,18 +3579,21 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
Boolean updatedRestrictPropagationThroughLineage = classification.getRestrictPropagationThroughLineage();
Boolean currentRestrictPropagationThroughHierarchy = currentClassification.getRestrictPropagationThroughHierarchy();
Boolean updatedRestrictPropagationThroughHierarchy = classification.getRestrictPropagationThroughHierarchy();

String propagationMode = entityRetriever.determinePropagationMode(updatedRestrictPropagationThroughLineage, updatedRestrictPropagationThroughHierarchy);
if ((!Objects.equals(updatedRemovePropagations, currentRemovePropagations) ||
!Objects.equals(currentTagPropagation, updatedTagPropagation) ||
!Objects.equals(currentRestrictPropagationThroughLineage, updatedRestrictPropagationThroughLineage)) &&
taskManagement != null && DEFERRED_ACTION_ENABLED) {

String propagationType = CLASSIFICATION_PROPAGATION_ADD;
if (removePropagation || !updatedTagPropagation)
{
if(currentRestrictPropagationThroughLineage != updatedRestrictPropagationThroughLineage || currentRestrictPropagationThroughHierarchy != updatedRestrictPropagationThroughHierarchy){
propagationType = CLASSIFICATION_REFRESH_PROPAGATION;
}
if (removePropagation || !updatedTagPropagation) {
propagationType = CLASSIFICATION_PROPAGATION_DELETE;
}
createAndQueueTask(propagationType, entityVertex, classificationVertex.getIdForDisplay(), currentRestrictPropagationThroughLineage,currentRestrictPropagationThroughHierarchy);
updatedTagPropagation = null;
}

// compute propagatedEntityVertices once and use it for subsequent iterations and notifications
Expand All @@ -3605,11 +3607,7 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
}
if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
String propagationMode;
if (updatedRemovePropagations !=null) {
propagationMode = entityRetriever.determinePropagationMode(updatedRestrictPropagationThroughLineage, updatedRestrictPropagationThroughHierarchy);
}
else{
if (updatedRemovePropagations ==null) {
propagationMode = CLASSIFICATION_PROPAGATION_MODE_DEFAULT;
}
Boolean toExclude = propagationMode == CLASSIFICATION_VERTEX_RESTRICT_PROPAGATE_THROUGH_LINEAGE ? true : false;
Expand Down

0 comments on commit 6f22505

Please sign in to comment.