Skip to content

Commit

Permalink
Merge pull request #3324 from atlanhq/dg-1476
Browse files Browse the repository at this point in the history
DG-1476 Handle QN check and Archive Stakeholders in domain delete flow
  • Loading branch information
PRATHAM2002-DS authored Jul 10, 2024
2 parents 2b798d5 + 835ecf0 commit dd50da2
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1586,21 +1586,23 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean
// Check if authorized to update entities
if (!reqContext.isImportInProgress()) {
for (AtlasEntity entity : context.getUpdatedEntities()) {
AtlasEntityHeader entityHeaderWithClassifications = entityRetriever.toAtlasEntityHeaderWithClassifications(entity.getGuid());
AtlasEntityHeader entityHeader = new AtlasEntityHeader(entity);
if(!PreProcessor.skipUpdateAuthCheckTypes.contains(entity.getTypeName())){
AtlasEntityHeader entityHeaderWithClassifications = entityRetriever.toAtlasEntityHeaderWithClassifications(entity.getGuid());
AtlasEntityHeader entityHeader = new AtlasEntityHeader(entity);

if(CollectionUtils.isNotEmpty(entityHeaderWithClassifications.getClassifications())) {
entityHeader.setClassifications(entityHeaderWithClassifications.getClassifications());
}
if(CollectionUtils.isNotEmpty(entityHeaderWithClassifications.getClassifications())) {
entityHeader.setClassifications(entityHeaderWithClassifications.getClassifications());
}

AtlasEntity diffEntity = reqContext.getDifferentialEntity(entity.getGuid());
boolean skipAuthBaseConditions = diffEntity != null && MapUtils.isEmpty(diffEntity.getCustomAttributes()) && MapUtils.isEmpty(diffEntity.getBusinessAttributes()) && CollectionUtils.isEmpty(diffEntity.getClassifications()) && CollectionUtils.isEmpty(diffEntity.getLabels());
boolean skipAuthMeaningsUpdate = diffEntity != null && MapUtils.isNotEmpty(diffEntity.getRelationshipAttributes()) && diffEntity.getRelationshipAttributes().containsKey("meanings") && diffEntity.getRelationshipAttributes().size() == 1 && MapUtils.isEmpty(diffEntity.getAttributes());
boolean skipAuthStarredDetailsUpdate = diffEntity != null && MapUtils.isEmpty(diffEntity.getRelationshipAttributes()) && MapUtils.isNotEmpty(diffEntity.getAttributes()) && diffEntity.getAttributes().size() == 3 && diffEntity.getAttributes().containsKey(ATTR_STARRED_BY) && diffEntity.getAttributes().containsKey(ATTR_STARRED_COUNT) && diffEntity.getAttributes().containsKey(ATTR_STARRED_DETAILS_LIST);
if (skipAuthBaseConditions && (skipAuthMeaningsUpdate || skipAuthStarredDetailsUpdate)) {
//do nothing, only diff is relationshipAttributes.meanings or starred, allow update
} else {
AtlasAuthorizationUtils.verifyUpdateEntityAccess(typeRegistry, entityHeader,"update entity: type=" + entity.getTypeName());
AtlasEntity diffEntity = reqContext.getDifferentialEntity(entity.getGuid());
boolean skipAuthBaseConditions = diffEntity != null && MapUtils.isEmpty(diffEntity.getCustomAttributes()) && MapUtils.isEmpty(diffEntity.getBusinessAttributes()) && CollectionUtils.isEmpty(diffEntity.getClassifications()) && CollectionUtils.isEmpty(diffEntity.getLabels());
boolean skipAuthMeaningsUpdate = diffEntity != null && MapUtils.isNotEmpty(diffEntity.getRelationshipAttributes()) && diffEntity.getRelationshipAttributes().containsKey("meanings") && diffEntity.getRelationshipAttributes().size() == 1 && MapUtils.isEmpty(diffEntity.getAttributes());
boolean skipAuthStarredDetailsUpdate = diffEntity != null && MapUtils.isEmpty(diffEntity.getRelationshipAttributes()) && MapUtils.isNotEmpty(diffEntity.getAttributes()) && diffEntity.getAttributes().size() == 3 && diffEntity.getAttributes().containsKey(ATTR_STARRED_BY) && diffEntity.getAttributes().containsKey(ATTR_STARRED_COUNT) && diffEntity.getAttributes().containsKey(ATTR_STARRED_DETAILS_LIST);
if (skipAuthBaseConditions && (skipAuthMeaningsUpdate || skipAuthStarredDetailsUpdate)) {
//do nothing, only diff is relationshipAttributes.meanings or starred, allow update
} else {
AtlasAuthorizationUtils.verifyUpdateEntityAccess(typeRegistry, entityHeader,"update entity: type=" + entity.getTypeName());
}
}
}
}
Expand Down Expand Up @@ -1900,7 +1902,7 @@ public PreProcessor getPreProcessor(String typeName) {
break;

case DATA_DOMAIN_ENTITY_TYPE:
preProcessor = new DataDomainPreProcessor(typeRegistry, entityRetriever, graph);
preProcessor = new DataDomainPreProcessor(typeRegistry, entityRetriever, graph, this);
break;

case DATA_PRODUCT_ENTITY_TYPE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public interface PreProcessor {
add(DATA_PRODUCT_ENTITY_TYPE);
}};

Set<String> skipUpdateAuthCheckTypes = new HashSet<String>() {{
add(DATA_DOMAIN_ENTITY_TYPE);
add(DATA_PRODUCT_ENTITY_TYPE);
}};

void processAttributes(AtlasStruct entity, EntityMutationContext context, EntityMutations.EntityOperation operation) throws AtlasBaseException;

default void processDelete(AtlasVertex vertex) throws AtlasBaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh;


import com.sun.org.apache.bcel.internal.generic.NEW;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasTypeRegistry;
Expand All @@ -49,7 +47,7 @@
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor.ATTR_DOMAIN_QUALIFIED_NAMES;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor.*;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class DataDomainPreProcessor extends AbstractDomainPreProcessor {
Expand All @@ -59,13 +57,15 @@ public class DataDomainPreProcessor extends AbstractDomainPreProcessor {
private Map<String, String> updatedPolicyResources;
private EntityGraphRetriever retrieverNoRelation = null;
private Map<String, String> updatedDomainQualifiedNames;
private AtlasEntityStore entityStore;

public DataDomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever,
AtlasGraph graph) {
AtlasGraph graph, AtlasEntityStore entityStore) {
super(typeRegistry, entityRetriever, graph);
this.updatedPolicyResources = new HashMap<>();
this.retrieverNoRelation = new EntityGraphRetriever(graph, typeRegistry, true);
this.updatedDomainQualifiedNames = new HashMap<>();
this.entityStore = entityStore;
}

@Override
Expand Down Expand Up @@ -142,6 +142,10 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws
validateStakeholderRelationship(entity);

String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class);
entity.setAttribute(QUALIFIED_NAME, vertexQnName);
// Check if authorized to update entities
AtlasAuthorizationUtils.verifyUpdateEntityAccess(typeRegistry, new AtlasEntityHeader(entity),"update entity: type=" + entity.getTypeName());


AtlasEntity storedDomain = entityRetriever.toAtlasEntity(vertex);
AtlasRelatedObjectId currentParentDomainObjectId = (AtlasRelatedObjectId) storedDomain.getRelationshipAttribute(PARENT_DOMAIN_REL_TYPE);
Expand Down Expand Up @@ -188,9 +192,8 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws
if (!domainCurrentName.equals(domainNewName)) {
domainExists(domainNewName, currentParentDomainQualifiedName, storedDomain.getGuid());
}
entity.setAttribute(QUALIFIED_NAME, vertexQnName);
}

}
RequestContext.get().endMetricRecord(metricRecorder);
}

Expand Down Expand Up @@ -404,46 +407,72 @@ private void validateStakeholderRelationship(AtlasEntity entity) throws AtlasBas
}
}

public boolean verifyStakeholderTitleExists(String domainQualifiedName) throws AtlasBaseException {

List<Map<String, Object>> mustClauseList = new ArrayList();
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", STAKEHOLDER_TITLE_ENTITY_TYPE)));
public List<AtlasEntityHeader> getStakeholderTitle(String domainQualifiedName) throws AtlasBaseException {
List<Map<String, Object>> mustClauseList = new ArrayList<>();
mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
mustClauseList.add(mapOf("term", mapOf(ATTR_DOMAIN_QUALIFIED_NAMES, domainQualifiedName)));
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", STAKEHOLDER_TITLE_ENTITY_TYPE)));

List<String> termsList = Arrays.asList(
NEW_STAR,
STAR,
domainQualifiedName
);

Map<String, Object> bool = mapOf("must", mustClauseList);
Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));
Map<String, Object> termsMap = mapOf(ATTR_DOMAIN_QUALIFIED_NAMES, termsList);
Map<String, Object> termsFilter = mapOf("terms", termsMap);
Map<String, Object> filterBool = mapOf("filter", termsFilter);
Map<String, Object> nestedBool = mapOf("bool", filterBool);

List<AtlasEntityHeader> assets = indexSearchPaginated(dsl, null, super.discovery);
mustClauseList.add(mapOf("bool", nestedBool));

if (CollectionUtils.isNotEmpty(assets)) {
return true;
}
Map<String, Object> topBool = mapOf("must", mustClauseList);
Map<String, Object> topFilter = mapOf("bool", topBool);
Map<String, Object> query = mapOf("filter", topFilter);
Map<String, Object> dsl = mapOf("query", mapOf("bool", query));

return false;
List<AtlasEntityHeader> assets = indexSearchPaginated(dsl, null, super.discovery);

return assets;
}


@Override
public void processDelete(AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processProductDelete");
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processDomainDelete");

try{
List<String> stakeHolderGuids = new ArrayList<>();

// active childrens exists?
Iterator<AtlasVertex> childrens = getActiveChildrenVertices(vertex,
DOMAIN_PARENT_EDGE_LABEL, DATA_PRODUCT_EDGE_LABEL);
if (childrens.hasNext()){
throw new AtlasBaseException("Domain cannot be archived because some subdomains or products are active in this domain");
throw new AtlasBaseException(AtlasErrorCode.OPERATION_NOT_SUPPORTED, "Domain cannot be archived because some subdomains or products are active in this domain");
}

// active stakeholder exists?
childrens = getActiveChildrenVertices(vertex, STAKEHOLDER_EDGE_LABEL);
if (childrens.hasNext()){
throw new AtlasBaseException("Domain cannot be archived because some stakeholders are active in this domain");
while (childrens.hasNext()){
AtlasVertex child = childrens.next();
AtlasObjectId childId = entityRetriever.toAtlasObjectId(child);
stakeHolderGuids.add(childId.getGuid());
}

if (CollectionUtils.isNotEmpty(stakeHolderGuids)) {
entityStore.deleteByIds(stakeHolderGuids);
}

// active stakeholder titles exists?
if(verifyStakeholderTitleExists(vertex.getProperty(QUALIFIED_NAME, String.class))){
throw new AtlasBaseException("Domain cannot be archived because some stakeholdersTitles are active in this domain");
List<AtlasEntityHeader> stakeholderTitles = getStakeholderTitle(vertex.getProperty(QUALIFIED_NAME, String.class));
if (CollectionUtils.isNotEmpty(stakeholderTitles)) {
for (AtlasEntityHeader stakeholderTitle : stakeholderTitles) {
AtlasVertex stakeholderTitleVertex = entityRetriever.getEntityVertex(stakeholderTitle.getGuid());
AtlasGraphUtilsV2.removeItemFromListPropertyValue(stakeholderTitleVertex, ATTR_DOMAIN_QUALIFIED_NAMES, vertex.getProperty(QUALIFIED_NAME, String.class));
List<String> domainQualifiedNames = stakeholderTitleVertex.getMultiValuedProperty(ATTR_DOMAIN_QUALIFIED_NAMES, String.class);
if (CollectionUtils.isEmpty(domainQualifiedNames)) {
entityStore.deleteById(stakeholderTitle.getGuid());
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws
}

String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class);
entity.setAttribute(QUALIFIED_NAME, vertexQnName);
// Check if authorized to update entities
AtlasAuthorizationUtils.verifyUpdateEntityAccess(typeRegistry, new AtlasEntityHeader(entity),"update entity: type=" + entity.getTypeName());

AtlasEntity storedProduct = entityRetriever.toAtlasEntity(vertex);
AtlasRelatedObjectId currentParentDomainObjectId = (AtlasRelatedObjectId) storedProduct.getRelationshipAttribute(DATA_DOMAIN_REL_TYPE);
Expand Down Expand Up @@ -175,7 +178,6 @@ private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws
if (!productCurrentName.equals(productNewName)) {
productExists(productNewName, currentParentDomainQualifiedName, storedProduct.getGuid());
}
entity.setAttribute(QUALIFIED_NAME, vertexQnName);
}

if (isDaapVisibilityChanged) {
Expand Down

0 comments on commit dd50da2

Please sign in to comment.