Skip to content

Commit

Permalink
PR Review Changes Done
Browse files Browse the repository at this point in the history
  • Loading branch information
hr2904 committed Mar 28, 2024
1 parent 35b057f commit 209d05e
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand Down Expand Up @@ -379,7 +378,7 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }
public static final String CLASSIFICATION_PROPAGATION_MODE_RESTRICT_HIERARCHY ="RESTRICT_HIERARCHY";


public static final HashMap<String, ArrayList<String>> CLASSIFICATION_PROPAGATION_EXCLUSION_MAP = new HashMap<String, ArrayList<String>>(){{
public static final HashMap<String, ArrayList<String>> CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP = new HashMap<String, ArrayList<String>>(){{
put(CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE, new ArrayList<>(
Arrays.asList(CATALOG_PROCESS_INPUT_RELATIONSHIP_LABEL,
CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,28 +341,21 @@ public static boolean getRemovePropagations(AtlasVertex classificationVertex) {
return ret;
}

public static boolean getRestrictPropagationThroughLineage(AtlasVertex classificationVertex) {
boolean ret = false;

if (classificationVertex != null) {
Boolean restrictPropagationThroughLineage = AtlasGraphUtilsV2.getEncodedProperty(classificationVertex, CLASSIFICATION_VERTEX_RESTRICT_PROPAGATE_THROUGH_LINEAGE, Boolean.class);

ret = (restrictPropagationThroughLineage == null) ? false : restrictPropagationThroughLineage;
public static boolean getRestrictPropagation(AtlasVertex classificationVertex, String propertyName) {
if (classificationVertex == null) {
return false;
}
Boolean restrictPropagation = AtlasGraphUtilsV2.getEncodedProperty(classificationVertex, propertyName, Boolean.class);

return ret;
return restrictPropagation != null && restrictPropagation;
}

public static boolean getRestrictPropagationThroughHierarchy(AtlasVertex classificationVertex) {
boolean ret = false;

if (classificationVertex != null) {
Boolean restrictPropagationThroughHierarchy = AtlasGraphUtilsV2.getEncodedProperty(classificationVertex, CLASSIFICATION_VERTEX_RESTRICT_PROPAGATE_THROUGH_HIERARCHY, Boolean.class);

ret = (restrictPropagationThroughHierarchy == null) ? false : restrictPropagationThroughHierarchy;
}
public static boolean getRestrictPropagationThroughLineage(AtlasVertex classificationVertex) {
return getRestrictPropagation(classificationVertex,CLASSIFICATION_VERTEX_RESTRICT_PROPAGATE_THROUGH_LINEAGE);
}

return ret;
public static boolean getRestrictPropagationThroughHierarchy(AtlasVertex classificationVertex) {
return getRestrictPropagation(classificationVertex,CLASSIFICATION_VERTEX_RESTRICT_PROPAGATE_THROUGH_HIERARCHY);
}

public static AtlasVertex getClassificationVertex(AtlasVertex entityVertex, String classificationName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1312,21 +1312,6 @@ public void createAndQueueTask(String taskType, AtlasVertex entityVertex, String
createAndQueueTaskWithoutCheck(taskType, entityVertex, classificationVertexId, relationshipGuid);
}

public void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid, Boolean currentRestrictPropagationThroughLineage, Boolean currentRestrictPropagationThroughHierarchy) throws AtlasBaseException {
if (!CLASSIFICATION_PROPAGATION_DELETE.equals(taskType) && skipClassificationTaskCreation(classificationVertexId)) {
LOG.info("Task is already scheduled for classification id {}, no need to schedule task for vertex {}", classificationVertexId, entityVertex.getIdForDisplay());
return;
}

String currentUser = RequestContext.getCurrentUser();
String entityGuid = GraphHelper.getGuid(entityVertex);
Map<String, Object> taskParams = ClassificationTask.toParameters(entityGuid, classificationVertexId, relationshipGuid, currentRestrictPropagationThroughLineage, currentRestrictPropagationThroughHierarchy);
AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams, classificationVertexId, entityGuid);

AtlasGraphUtilsV2.addEncodedProperty(entityVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid());

RequestContext.get().queueTask(task);
}
public void createAndQueueTaskWithoutCheck(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid, Boolean currentRestrictPropagationThroughLineage,Boolean currentRestrictPropogationThroughHierarchy) throws AtlasBaseException {
String currentUser = RequestContext.getCurrentUser();
String entityGuid = GraphHelper.getGuid(entityVertex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ public AtlasEntityDiffResult getDiffResult(AtlasEntity updatedEntity, AtlasVerte
return getDiffResult(updatedEntity, null, storedVertex, findOnlyFirstDiff);
}

private void verifyClassificationsPropagationMode(List<AtlasClassification> incomingClassifications) throws AtlasBaseException {
for(AtlasClassification incomingClassification : incomingClassifications){
entityRetriever.determinePropagationMode(incomingClassification.getRestrictPropagationThroughLineage(),incomingClassification.getRestrictPropagationThroughHierachy());
}
}

private AtlasEntityDiffResult getDiffResult(AtlasEntity updatedEntity, AtlasEntity storedEntity, AtlasVertex storedVertex, boolean findOnlyFirstDiff) throws AtlasBaseException {
AtlasEntity diffEntity = new AtlasEntity(updatedEntity.getTypeName());
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(updatedEntity.getTypeName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1570,9 +1570,6 @@ private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, Entit
String guid = element.getKey();
AtlasEntity entity = entityStream.getByGuid(guid);

if(Objects.nonNull(entity) && Objects.nonNull(entity.getClassifications())) {
entityRetriever.verifyClassificationsPropagationMode(entity.getClassifications());
}
if (entity != null) { // entity would be null if guid is not in the stream but referenced by an entity in the stream
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,6 @@ private Map<String, List<AtlasClassification>> computeChanges(AtlasEntityHeader
List<AtlasClassification> incomingClassifications = listOps.filter(incomingEntityHeader.getGuid(), incomingEntityHeader.getClassifications());
List<AtlasClassification> entityClassifications = listOps.filter(entityToBeUpdated.getGuid(), entityToBeUpdated.getClassifications());

entityRetriever.verifyClassificationsPropagationMode(incomingClassifications);

if (CollectionUtils.isEmpty(incomingClassifications) && CollectionUtils.isEmpty(entityClassifications)) {
return null;
}
Expand Down Expand Up @@ -286,7 +284,6 @@ private void addClassifications(String entityGuid, String typeName, List<AtlasCl
if (CollectionUtils.isEmpty(list)) {
return;
}

String classificationNames = getClassificationNames(list);
try {
entitiesStore.addClassifications(entityGuid, list);
Expand All @@ -300,7 +297,6 @@ private void updateClassifications(String entityGuid, String typeName, List<Atla
if (CollectionUtils.isEmpty(list)) {
return;
}

String classificationNames = getClassificationNames(list);
try {
entitiesStore.updateClassifications(entityGuid, list);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2990,7 +2990,7 @@ public void addClassifications(final EntityMutationContext context, String guid,
List<AtlasVertex> entitiesToPropagateTo = null;
Map<AtlasClassification, HashSet<AtlasVertex>> addedClassifications = new HashMap<>();
List<AtlasClassification> addClassifications = new ArrayList<>(classifications.size());

entityRetriever.verifyClassificationsPropagationMode(classifications);
for (AtlasClassification c : classifications) {
AtlasClassification classification = new AtlasClassification(c);
String classificationName = classification.getTypeName();
Expand Down Expand Up @@ -3077,7 +3077,7 @@ public void addClassifications(final EntityMutationContext context, String guid,
String propagationMode;
propagationMode = entityRetriever.determinePropagationMode(classification.getRestrictPropagationThroughLineage(),classification.getRestrictPropagationThroughHierachy());
Boolean toExclude = propagationMode == CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ? true : false;
entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, CLASSIFICATION_PROPAGATION_EXCLUSION_MAP.get(propagationMode),toExclude);
entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP.get(propagationMode),toExclude);
}

if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
Expand Down Expand Up @@ -3170,7 +3170,7 @@ public List<String> propagateClassification(String entityGuid, String classifica

String propagationMode = entityRetriever.determinePropagationMode(currentRestrictPropagationThroughLineage, currentRestrictPropagationThroughHierarchy);

List<String> edgeLabelsToCheck = CLASSIFICATION_PROPAGATION_EXCLUSION_MAP.get(propagationMode);
List<String> edgeLabelsToCheck = CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP.get(propagationMode);
Boolean toExclude = propagationMode == CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ? true:false;
List<AtlasVertex> impactedVertices = entityRetriever.getIncludedImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId, edgeLabelsToCheck,toExclude);

Expand Down Expand Up @@ -3613,7 +3613,7 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
propagationMode = CLASSIFICATION_PROPAGATION_MODE_DEFAULT;
}
Boolean toExclude = propagationMode == CLASSIFICATION_VERTEX_RESTRICT_PROPAGATE_THROUGH_LINEAGE ? true : false;
entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, null, classificationVertex.getIdForDisplay(), CLASSIFICATION_PROPAGATION_EXCLUSION_MAP.get(propagationMode),toExclude);
entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, null, classificationVertex.getIdForDisplay(), CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP.get(propagationMode),toExclude);
}

if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
Expand Down Expand Up @@ -3906,7 +3906,7 @@ public void classificationRefreshPropagation(String classificationId) throws Atl

List<String> verticesIdsToAddClassification = new ArrayList<>();
List<String> propagatedVerticesIdWithoutEdge = entityRetriever.getImpactedVerticesIdsClassificationAttached(sourceEntityVertex , classificationId,
CLASSIFICATION_PROPAGATION_EXCLUSION_MAP.get(propagationMode),toExclude, verticesIdsToAddClassification);
CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP.get(propagationMode),toExclude, verticesIdsToAddClassification);

LOG.info("To add classification with id {} to {} vertices", classificationId, verticesIdsToAddClassification.size());

Expand Down Expand Up @@ -3954,7 +3954,7 @@ private void processClassificationDeleteOnlyPropagation(AtlasVertex currentClass
LOG.info("Traversed {} vertices including edge with relationship GUID {} for classification vertex {}", propagatedVerticesIds.size(), relationshipGuid, classificationId);

List<String> propagatedVerticesIdWithoutEdge = entityRetriever.getImpactedVerticesIds(sourceEntityVertex, relationshipGuid , classificationId,
CLASSIFICATION_PROPAGATION_EXCLUSION_MAP.get(propagationMode),toExclude);
CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP.get(propagationMode),toExclude);

LOG.info("Traversed {} vertices except edge with relationship GUID {} for classification vertex {}", propagatedVerticesIdWithoutEdge.size(), relationshipGuid, classificationId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMa
propagationMode = determinePropagationMode(restrictPropagationThroughLineage,restrictPropagationThroughHierarchy);
Boolean toExclude = propagationMode == CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ? true : false;
List<AtlasVertex> entitiesPropagatingTo = getImpactedVerticesV2(sourceEntityVertex, relationshipGuidToExclude,
classificationId, CLASSIFICATION_PROPAGATION_EXCLUSION_MAP.get(propagationMode),toExclude);
classificationId, CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP.get(propagationMode),toExclude);

LOG.info("Traversed {} vertices for Classification vertex id {} excluding RelationShip GUID {}", entitiesPropagatingTo.size(), classificationId, relationshipGuidToExclude);

Expand Down

0 comments on commit 209d05e

Please sign in to comment.