From a1e11fe7b87e2de9b2ba0d4bb573541e847f99c1 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Wed, 17 Apr 2024 21:16:13 +0530 Subject: [PATCH] Create Preprocessor for Updating Product/Domain QN --- .../apache/atlas/repository/Constants.java | 10 + .../store/graph/v2/AtlasEntityStoreV2.java | 10 + .../v2/preprocessor/PreProcessorUtils.java | 7 + .../datamesh/AbstractDomainPreProcessor.java | 131 +++++++++++++ .../datamesh/DataProductPreProcessor.java | 178 ++++++++++++++++++ .../datamesh/DomainPreProcessor.java | 174 +++++++++++++++++ 6 files changed, 510 insertions(+) create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 8d4d47ea57..22bcba57ba 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -134,6 +134,16 @@ public final class Constants { public static final String GLOSSARY_CATEGORY_EDGE_LABEL = "r:AtlasGlossaryCategoryAnchor"; + /** + * MESH property keys. + */ + public static final String DATA_DOMAIN_ENTITY_TYPE = "DataDomain"; + public static final String DATA_PRODUCT_ENTITY_TYPE = "DataProduct"; + public static final String DATA_PRODUCT_EDGE_LABEL = "__DataDomain.dataProducts"; + public static final String DOMAIN_PARENT_EDGE_LABEL = "__DataDomain.subDomains"; + + + /** * SQL property keys. */ diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 6e3487f8d3..472c29cc46 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -62,6 +62,8 @@ import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DomainPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor; @@ -1800,6 +1802,14 @@ public PreProcessor getPreProcessor(String typeName) { preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper); break; + case DATA_DOMAIN_ENTITY_TYPE: + preProcessor = new DomainPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper); + break; + + case DATA_PRODUCT_ENTITY_TYPE: + preProcessor = new DataProductPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper); + break; + case QUERY_ENTITY_TYPE: preProcessor = new QueryPreProcessor(typeRegistry, entityRetriever); break; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java index 6c84900460..57fd775d4a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java @@ -32,6 +32,13 @@ public class PreProcessorUtils { public static final String GLOSSARY_TERM_REL_TYPE = "AtlasGlossaryTermAnchor"; public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor"; + //DataMesh models constants + public static final String DATA_PRODUCT_TYPE = "DataProduct"; + public static final String PARENT_DOMAIN = "parentDomain"; + public static final String PARENT_DOMAIN_QN = "parentDomainQualifiedName"; + public static final String SUPER_DOMAIN_QN = "superDomainQualifiedName"; + public static final String DATA_DOMAIN = "dataDomain"; + //Query models constants public static final String PREFIX_QUERY_QN = "default/collection/"; public static final String COLLECTION_QUALIFIED_NAME = "collectionQualifiedName"; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java new file mode 100644 index 0000000000..e93f10aec5 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.authorize.AtlasEntityAccessRequest; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.discovery.EntityDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.IndexSearchParams; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.AbstractGlossaryPreProcessor; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.PARENT_DOMAIN_QN; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.SUPER_DOMAIN_QN; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public abstract class AbstractDomainPreProcessor implements PreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(AbstractGlossaryPreProcessor.class); + + + protected final AtlasTypeRegistry typeRegistry; + protected final EntityGraphRetriever entityRetriever; + + protected EntityDiscoveryService discovery; + + AbstractDomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph) { + this.entityRetriever = entityRetriever; + this.typeRegistry = typeRegistry; + + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } + } + + public List indexSearchPaginated(Map dsl, String entityType) throws AtlasBaseException { + IndexSearchParams searchParams = new IndexSearchParams(); + List ret = new ArrayList<>(); + + List sortList = new ArrayList<>(0); + sortList.add(mapOf("__timestamp", mapOf("order", "asc"))); + sortList.add(mapOf("__guid", mapOf("order", "asc"))); + dsl.put("sort", sortList); + + int from = 0; + int size = 100; + boolean hasMore = true; + do { + dsl.put("from", from); + dsl.put("size", size); + searchParams.setDsl(dsl); + + if (entityType.equals(POLICY_ENTITY_TYPE)) { + Set attributes = new HashSet<>(Arrays.asList(ATTR_POLICY_RESOURCES, ATTR_POLICY_CATEGORY)); + searchParams.setAttributes(attributes); + } + + List headers = discovery.directIndexSearch(searchParams).getEntities(); + + if (CollectionUtils.isNotEmpty(headers)) { + ret.addAll(headers); + } else { + hasMore = false; + } + + from += size; + + } while (hasMore); + + return ret; + } + + protected void isAuthorized(AtlasEntityHeader sourceDomain, AtlasEntityHeader targetDomain) throws AtlasBaseException { + + // source -> CREATE + UPDATE + DELETE + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, sourceDomain), + "create on source Domain: ", sourceDomain.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, sourceDomain), + "update on source Domain: ", sourceDomain.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, sourceDomain), + "delete on source Domain: ", sourceDomain.getAttribute(NAME)); + + + // target -> CREATE + UPDATE + DELETE + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, targetDomain), + "create on target Domain: ", targetDomain.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, targetDomain), + "update on target Domain: ", targetDomain.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, targetDomain), + "delete on target Domain: ", targetDomain.getAttribute(NAME)); + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java new file mode 100644 index 0000000000..9d8ab16ddd --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -0,0 +1,178 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.*; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public class DataProductPreProcessor extends AbstractDomainPreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DomainPreProcessor.class); + private AtlasEntityHeader parentDomain; + private EntityMutationContext context; + public DataProductPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, + AtlasGraph graph, EntityGraphMapper entityGraphMapper) { + super(typeRegistry, entityRetriever, graph); + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (operation == EntityMutations.EntityOperation.CREATE && LOG.isDebugEnabled()) { + LOG.debug("DataProductPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + this.context = context; + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + setParent(entity, context); + + if (operation == EntityMutations.EntityOperation.CREATE) { + processCreateProduct(entity, vertex); + } + } + + private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain"); + String domainName = (String) entity.getAttribute(NAME); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + + productExists(domainName, parentDomainQualifiedName); + String newQualifiedName = createQualifiedName(parentDomainQualifiedName); + if(!newQualifiedName.isEmpty()){ + entity.setAttribute(QUALIFIED_NAME, newQualifiedName); + } + else{ + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name is empty"); + } + + RequestContext.get().endMetricRecord(metricRecorder); + } + + public static String createQualifiedName(String parentDomainQualifiedName) { + if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) { + return parentDomainQualifiedName + "/product/" + getUUID(); + } + else{ + return ""; + } + } + + private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent"); + if (parentDomain == null) { + Object relationshipAttribute = entity.getRelationshipAttribute(DATA_DOMAIN); + Set attributes = new HashSet<>(Arrays.asList(QUALIFIED_NAME, SUPER_DOMAIN_QN, PARENT_DOMAIN_QN, "__typeName")); + + if(relationshipAttribute instanceof AtlasObjectId){ + AtlasObjectId objectId = (AtlasObjectId) relationshipAttribute; + if (objectId != null) { + if (StringUtils.isNotEmpty(objectId.getGuid())) { + AtlasVertex vertex = entityRetriever.getEntityVertex(objectId.getGuid()); + + if (vertex == null) { + parentDomain = entityRetriever.toAtlasEntityHeader(objectId.getGuid(), attributes); + } else { + parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes); + } + + } else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) && + StringUtils.isNotEmpty((String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) { + AtlasVertex parentDomainVertex = entityRetriever.getEntityVertex(objectId); + parentDomain = entityRetriever.toAtlasEntityHeader(parentDomainVertex, attributes); + + } + } + } + else if(relationshipAttribute instanceof Map){ + Map relationshipMap = (Map) relationshipAttribute; + if (StringUtils.isNotEmpty((String) relationshipMap.get("guid"))) { + AtlasVertex vertex = entityRetriever.getEntityVertex((String) relationshipMap.get("guid")); + + if (vertex == null) { + parentDomain = entityRetriever.toAtlasEntityHeader((String) relationshipMap.get("guid"), attributes); + } else { + parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes); + } + + } + else { + parentDomain = new AtlasEntityHeader((String) relationshipMap.get("typeName"), relationshipMap); + + } + } + else{ + LOG.warn("DataProductPreProcessor.setParent: Invalid relationshipAttribute {}", relationshipAttribute); + } + + } + RequestContext.get().endMetricRecord(metricRecorder); + } + + private void productExists(String productName, String parentDomainQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("domainExists"); + + boolean exists = false; + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", DATA_PRODUCT_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("name.keyword", productName))); + + + Map bool = new HashMap<>(); + if (parentDomain != null) { + mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); + } else { + List mustNotClauseList = new ArrayList(); + mustNotClauseList.add(mapOf("exists", mapOf("field", "parentDomainQualifiedName"))); + bool.put("must_not", mustNotClauseList); + } + + bool.put("must", mustClauseList); + + Map dsl = mapOf("query", mapOf("bool", bool)); + + List products = indexSearchPaginated(dsl, DATA_PRODUCT_ENTITY_TYPE); + + if (CollectionUtils.isNotEmpty(products)) { + for (AtlasEntityHeader product : products) { + String name = (String) product.getAttribute(NAME); + if (productName.equals(name)) { + exists = true; + break; + } + } + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + if (exists) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName+" already exists in the domain"); + } + } + +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java new file mode 100644 index 0000000000..55007d15fe --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; + + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.graph.GraphHelper.getActiveChildrenVertices; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public class DomainPreProcessor extends AbstractDomainPreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DomainPreProcessor.class); + private AtlasEntityHeader parentDomain; + private EntityGraphMapper entityGraphMapper; + private EntityMutationContext context; + + public DomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, + AtlasGraph graph, EntityGraphMapper entityGraphMapper) { + super(typeRegistry, entityRetriever, graph); + this.entityGraphMapper = entityGraphMapper; + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (operation == EntityMutations.EntityOperation.CREATE && LOG.isDebugEnabled()) { + LOG.debug("DomainPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + this.context = context; + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + setParent(entity, context); + + if (operation == EntityMutations.EntityOperation.CREATE) { + processCreateDomain(entity, vertex); + } + } + + private void processCreateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain"); + String domainName = (String) entity.getAttribute(NAME); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + + domainExists(domainName, parentDomainQualifiedName); + entity.setAttribute(QUALIFIED_NAME, createQualifiedName(parentDomainQualifiedName)); + + RequestContext.get().endMetricRecord(metricRecorder); + } + + public static String createQualifiedName(String parentDomainQualifiedName) { + if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) { + return parentDomainQualifiedName + "/domain/" + getUUID(); + } else{ + String prefixQN = "default/domain"; + return prefixQN + "/" + getUUID(); + } + } + + private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent"); + if (parentDomain == null) { + AtlasObjectId objectId = (AtlasObjectId) entity.getRelationshipAttribute(PARENT_DOMAIN); + Set attributes = new HashSet<>(Arrays.asList(QUALIFIED_NAME, SUPER_DOMAIN_QN, PARENT_DOMAIN_QN, "__typeName")); + + if (objectId != null) { + if (StringUtils.isNotEmpty(objectId.getGuid())) { + AtlasVertex vertex = entityRetriever.getEntityVertex(objectId.getGuid()); + + if (vertex == null) { + parentDomain = entityRetriever.toAtlasEntityHeader(objectId.getGuid(), attributes); + } else { + parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes); + } + } else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) && + StringUtils.isNotEmpty((String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) { + AtlasVertex parentDomainVertex = entityRetriever.getEntityVertex(objectId); + parentDomain = entityRetriever.toAtlasEntityHeader(parentDomainVertex, attributes); + } + } + } + RequestContext.get().endMetricRecord(metricRecorder); + } + + private void domainExists(String domainName, String parentDomainQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("domainExists"); + + boolean exists = false; + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", DATA_DOMAIN_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("name.keyword", domainName))); + + + Map bool = new HashMap<>(); + if (parentDomain != null) { + mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); + } else { + List mustNotClauseList = new ArrayList(); + mustNotClauseList.add(mapOf("exists", mapOf("field", "parentDomainQualifiedName"))); + bool.put("must_not", mustNotClauseList); + } + + bool.put("must", mustClauseList); + + Map dsl = mapOf("query", mapOf("bool", bool)); + + List domains = indexSearchPaginated(dsl, DATA_DOMAIN_ENTITY_TYPE); + + if (CollectionUtils.isNotEmpty(domains)) { + for (AtlasEntityHeader domain : domains) { + String name = (String) domain.getAttribute(NAME); + if (domainName.equals(name)) { + exists = true; + break; + } + } + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + if (exists) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, domainName+" already exists"); + } + } +} \ No newline at end of file