Skip to content

Commit

Permalink
logic to delete stakeholders before archiving domain
Browse files Browse the repository at this point in the history
  • Loading branch information
PRATHAM2002-DS committed Jul 9, 2024
1 parent 5653f9e commit c743860
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1834,7 +1834,7 @@ public PreProcessor getPreProcessor(String typeName) {
break;

case DATA_DOMAIN_ENTITY_TYPE:
preProcessor = new DataDomainPreProcessor(typeRegistry, entityRetriever, graph);
preProcessor = new DataDomainPreProcessor(typeRegistry, entityRetriever, graph, this);
break;

case DATA_PRODUCT_ENTITY_TYPE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasTypeRegistry;
Expand All @@ -47,7 +44,7 @@
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor.ATTR_DOMAIN_QUALIFIED_NAMES;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor.*;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class DataDomainPreProcessor extends AbstractDomainPreProcessor {
Expand All @@ -57,13 +54,15 @@ public class DataDomainPreProcessor extends AbstractDomainPreProcessor {
private Map<String, String> updatedPolicyResources;
private EntityGraphRetriever retrieverNoRelation = null;
private Map<String, String> updatedDomainQualifiedNames;
private AtlasEntityStore entityStore;

public DataDomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever,
AtlasGraph graph) {
AtlasGraph graph, AtlasEntityStore entityStore) {
super(typeRegistry, entityRetriever, graph);
this.updatedPolicyResources = new HashMap<>();
this.retrieverNoRelation = new EntityGraphRetriever(graph, typeRegistry, true);
this.updatedDomainQualifiedNames = new HashMap<>();
this.entityStore = entityStore;
}

@Override
Expand Down Expand Up @@ -400,30 +399,42 @@ private void validateStakeholderRelationship(AtlasEntity entity) throws AtlasBas
}
}

public boolean verifyStakeholderTitleExists(String domainQualifiedName) throws AtlasBaseException {

List<Map<String, Object>> mustClauseList = new ArrayList();
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", STAKEHOLDER_TITLE_ENTITY_TYPE)));
public List<AtlasEntityHeader> getStakeholderTitle(String domainQualifiedName) throws AtlasBaseException {
List<Map<String, Object>> mustClauseList = new ArrayList<>();
mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
mustClauseList.add(mapOf("term", mapOf(ATTR_DOMAIN_QUALIFIED_NAMES, domainQualifiedName)));
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", STAKEHOLDER_TITLE_ENTITY_TYPE)));

List<String> termsList = Arrays.asList(
SUPER_WILDCARD,
STAR,
domainQualifiedName
);

Map<String, Object> bool = mapOf("must", mustClauseList);
Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));
Map<String, Object> termsMap = mapOf(ATTR_DOMAIN_QUALIFIED_NAMES, termsList);
Map<String, Object> termsFilter = mapOf("terms", termsMap);
Map<String, Object> filterBool = mapOf("filter", termsFilter);
Map<String, Object> nestedBool = mapOf("bool", filterBool);

List<AtlasEntityHeader> assets = indexSearchPaginated(dsl, null, super.discovery);
mustClauseList.add(mapOf("bool", nestedBool));

if (CollectionUtils.isNotEmpty(assets)) {
return true;
}
Map<String, Object> topBool = mapOf("must", mustClauseList);
Map<String, Object> topFilter = mapOf("bool", topBool);
Map<String, Object> query = mapOf("filter", topFilter);
Map<String, Object> dsl = mapOf("query", mapOf("bool", query));

List<AtlasEntityHeader> assets = indexSearchPaginated(dsl, null, super.discovery);

return false;
return assets;
}


@Override
public void processDelete(AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processProductDelete");
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processDomainDelete");

try{
List<String> stakeHolderGuids = new ArrayList<>();

// active childrens exists?
Iterator<AtlasVertex> childrens = getActiveChildrenVertices(vertex,
DOMAIN_PARENT_EDGE_LABEL, DATA_PRODUCT_EDGE_LABEL);
Expand All @@ -433,13 +444,27 @@ public void processDelete(AtlasVertex vertex) throws AtlasBaseException {

// active stakeholder exists?
childrens = getActiveChildrenVertices(vertex, STAKEHOLDER_EDGE_LABEL);
if (childrens.hasNext()){
throw new AtlasBaseException(AtlasErrorCode.OPERATION_NOT_SUPPORTED, "Domain cannot be archived because some stakeholders are active in this domain");
while (childrens.hasNext()){
AtlasVertex child = childrens.next();
AtlasObjectId childId = entityRetriever.toAtlasObjectId(child);
stakeHolderGuids.add(childId.getGuid());
}

if (CollectionUtils.isNotEmpty(stakeHolderGuids)) {
entityStore.deleteByIds(stakeHolderGuids);
}

// active stakeholder titles exists?
if(verifyStakeholderTitleExists(vertex.getProperty(QUALIFIED_NAME, String.class))){
throw new AtlasBaseException(AtlasErrorCode.OPERATION_NOT_SUPPORTED, "Domain cannot be archived because some stakeholdersTitles are active in this domain");
List<AtlasEntityHeader> stakeholderTitles = getStakeholderTitle(vertex.getProperty(QUALIFIED_NAME, String.class));
if (CollectionUtils.isNotEmpty(stakeholderTitles)) {
for (AtlasEntityHeader stakeholderTitle : stakeholderTitles) {
AtlasVertex stakeholderTitleVertex = entityRetriever.getEntityVertex(stakeholderTitle.getGuid());
AtlasGraphUtilsV2.removeItemFromListPropertyValue(stakeholderTitleVertex, ATTR_DOMAIN_QUALIFIED_NAMES, vertex.getProperty(QUALIFIED_NAME, String.class));
List<String> domainQualifiedNames = stakeholderTitleVertex.getMultiValuedProperty(ATTR_DOMAIN_QUALIFIED_NAMES, String.class);
if (CollectionUtils.isEmpty(domainQualifiedNames)) {
entityStore.deleteById(stakeholderTitle.getGuid());
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class StakeholderTitlePreProcessor implements PreProcessor {


public static final String STAR = "*/super";
public static final String SUPER_WILDCARD = "default/domain/*/super";
public static final String ATTR_DOMAIN_QUALIFIED_NAMES = "stakeholderTitleDomainQualifiedNames";

public static final String REL_ATTR_STAKEHOLDERS = "stakeholders";
Expand Down

0 comments on commit c743860

Please sign in to comment.