From a1e11fe7b87e2de9b2ba0d4bb573541e847f99c1 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Wed, 17 Apr 2024 21:16:13 +0530 Subject: [PATCH 01/11] Create Preprocessor for Updating Product/Domain QN --- .../apache/atlas/repository/Constants.java | 10 + .../store/graph/v2/AtlasEntityStoreV2.java | 10 + .../v2/preprocessor/PreProcessorUtils.java | 7 + .../datamesh/AbstractDomainPreProcessor.java | 131 +++++++++++++ .../datamesh/DataProductPreProcessor.java | 178 ++++++++++++++++++ .../datamesh/DomainPreProcessor.java | 174 +++++++++++++++++ 6 files changed, 510 insertions(+) create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 8d4d47ea57..22bcba57ba 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -134,6 +134,16 @@ public final class Constants { public static final String GLOSSARY_CATEGORY_EDGE_LABEL = "r:AtlasGlossaryCategoryAnchor"; + /** + * MESH property keys. + */ + public static final String DATA_DOMAIN_ENTITY_TYPE = "DataDomain"; + public static final String DATA_PRODUCT_ENTITY_TYPE = "DataProduct"; + public static final String DATA_PRODUCT_EDGE_LABEL = "__DataDomain.dataProducts"; + public static final String DOMAIN_PARENT_EDGE_LABEL = "__DataDomain.subDomains"; + + + /** * SQL property keys. */ diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 6e3487f8d3..472c29cc46 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -62,6 +62,8 @@ import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DomainPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor; @@ -1800,6 +1802,14 @@ public PreProcessor getPreProcessor(String typeName) { preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper); break; + case DATA_DOMAIN_ENTITY_TYPE: + preProcessor = new DomainPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper); + break; + + case DATA_PRODUCT_ENTITY_TYPE: + preProcessor = new DataProductPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper); + break; + case QUERY_ENTITY_TYPE: preProcessor = new QueryPreProcessor(typeRegistry, entityRetriever); break; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java index 6c84900460..57fd775d4a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java @@ -32,6 +32,13 @@ public class PreProcessorUtils { public static final String GLOSSARY_TERM_REL_TYPE = "AtlasGlossaryTermAnchor"; public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor"; + //DataMesh models constants + public static final String DATA_PRODUCT_TYPE = "DataProduct"; + public static final String PARENT_DOMAIN = "parentDomain"; + public static final String PARENT_DOMAIN_QN = "parentDomainQualifiedName"; + public static final String SUPER_DOMAIN_QN = "superDomainQualifiedName"; + public static final String DATA_DOMAIN = "dataDomain"; + //Query models constants public static final String PREFIX_QUERY_QN = "default/collection/"; public static final String COLLECTION_QUALIFIED_NAME = "collectionQualifiedName"; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java new file mode 100644 index 0000000000..e93f10aec5 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.authorize.AtlasEntityAccessRequest; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.discovery.EntityDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.IndexSearchParams; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.AbstractGlossaryPreProcessor; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.PARENT_DOMAIN_QN; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.SUPER_DOMAIN_QN; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public abstract class AbstractDomainPreProcessor implements PreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(AbstractGlossaryPreProcessor.class); + + + protected final AtlasTypeRegistry typeRegistry; + protected final EntityGraphRetriever entityRetriever; + + protected EntityDiscoveryService discovery; + + AbstractDomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph) { + this.entityRetriever = entityRetriever; + this.typeRegistry = typeRegistry; + + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } + } + + public List indexSearchPaginated(Map dsl, String entityType) throws AtlasBaseException { + IndexSearchParams searchParams = new IndexSearchParams(); + List ret = new ArrayList<>(); + + List sortList = new ArrayList<>(0); + sortList.add(mapOf("__timestamp", mapOf("order", "asc"))); + sortList.add(mapOf("__guid", mapOf("order", "asc"))); + dsl.put("sort", sortList); + + int from = 0; + int size = 100; + boolean hasMore = true; + do { + dsl.put("from", from); + dsl.put("size", size); + searchParams.setDsl(dsl); + + if (entityType.equals(POLICY_ENTITY_TYPE)) { + Set attributes = new HashSet<>(Arrays.asList(ATTR_POLICY_RESOURCES, ATTR_POLICY_CATEGORY)); + searchParams.setAttributes(attributes); + } + + List headers = discovery.directIndexSearch(searchParams).getEntities(); + + if (CollectionUtils.isNotEmpty(headers)) { + ret.addAll(headers); + } else { + hasMore = false; + } + + from += size; + + } while (hasMore); + + return ret; + } + + protected void isAuthorized(AtlasEntityHeader sourceDomain, AtlasEntityHeader targetDomain) throws AtlasBaseException { + + // source -> CREATE + UPDATE + DELETE + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, sourceDomain), + "create on source Domain: ", sourceDomain.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, sourceDomain), + "update on source Domain: ", sourceDomain.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, sourceDomain), + "delete on source Domain: ", sourceDomain.getAttribute(NAME)); + + + // target -> CREATE + UPDATE + DELETE + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, targetDomain), + "create on target Domain: ", targetDomain.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, targetDomain), + "update on target Domain: ", targetDomain.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, targetDomain), + "delete on target Domain: ", targetDomain.getAttribute(NAME)); + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java new file mode 100644 index 0000000000..9d8ab16ddd --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -0,0 +1,178 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.*; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public class DataProductPreProcessor extends AbstractDomainPreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DomainPreProcessor.class); + private AtlasEntityHeader parentDomain; + private EntityMutationContext context; + public DataProductPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, + AtlasGraph graph, EntityGraphMapper entityGraphMapper) { + super(typeRegistry, entityRetriever, graph); + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (operation == EntityMutations.EntityOperation.CREATE && LOG.isDebugEnabled()) { + LOG.debug("DataProductPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + this.context = context; + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + setParent(entity, context); + + if (operation == EntityMutations.EntityOperation.CREATE) { + processCreateProduct(entity, vertex); + } + } + + private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain"); + String domainName = (String) entity.getAttribute(NAME); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + + productExists(domainName, parentDomainQualifiedName); + String newQualifiedName = createQualifiedName(parentDomainQualifiedName); + if(!newQualifiedName.isEmpty()){ + entity.setAttribute(QUALIFIED_NAME, newQualifiedName); + } + else{ + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name is empty"); + } + + RequestContext.get().endMetricRecord(metricRecorder); + } + + public static String createQualifiedName(String parentDomainQualifiedName) { + if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) { + return parentDomainQualifiedName + "/product/" + getUUID(); + } + else{ + return ""; + } + } + + private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent"); + if (parentDomain == null) { + Object relationshipAttribute = entity.getRelationshipAttribute(DATA_DOMAIN); + Set attributes = new HashSet<>(Arrays.asList(QUALIFIED_NAME, SUPER_DOMAIN_QN, PARENT_DOMAIN_QN, "__typeName")); + + if(relationshipAttribute instanceof AtlasObjectId){ + AtlasObjectId objectId = (AtlasObjectId) relationshipAttribute; + if (objectId != null) { + if (StringUtils.isNotEmpty(objectId.getGuid())) { + AtlasVertex vertex = entityRetriever.getEntityVertex(objectId.getGuid()); + + if (vertex == null) { + parentDomain = entityRetriever.toAtlasEntityHeader(objectId.getGuid(), attributes); + } else { + parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes); + } + + } else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) && + StringUtils.isNotEmpty((String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) { + AtlasVertex parentDomainVertex = entityRetriever.getEntityVertex(objectId); + parentDomain = entityRetriever.toAtlasEntityHeader(parentDomainVertex, attributes); + + } + } + } + else if(relationshipAttribute instanceof Map){ + Map relationshipMap = (Map) relationshipAttribute; + if (StringUtils.isNotEmpty((String) relationshipMap.get("guid"))) { + AtlasVertex vertex = entityRetriever.getEntityVertex((String) relationshipMap.get("guid")); + + if (vertex == null) { + parentDomain = entityRetriever.toAtlasEntityHeader((String) relationshipMap.get("guid"), attributes); + } else { + parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes); + } + + } + else { + parentDomain = new AtlasEntityHeader((String) relationshipMap.get("typeName"), relationshipMap); + + } + } + else{ + LOG.warn("DataProductPreProcessor.setParent: Invalid relationshipAttribute {}", relationshipAttribute); + } + + } + RequestContext.get().endMetricRecord(metricRecorder); + } + + private void productExists(String productName, String parentDomainQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("domainExists"); + + boolean exists = false; + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", DATA_PRODUCT_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("name.keyword", productName))); + + + Map bool = new HashMap<>(); + if (parentDomain != null) { + mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); + } else { + List mustNotClauseList = new ArrayList(); + mustNotClauseList.add(mapOf("exists", mapOf("field", "parentDomainQualifiedName"))); + bool.put("must_not", mustNotClauseList); + } + + bool.put("must", mustClauseList); + + Map dsl = mapOf("query", mapOf("bool", bool)); + + List products = indexSearchPaginated(dsl, DATA_PRODUCT_ENTITY_TYPE); + + if (CollectionUtils.isNotEmpty(products)) { + for (AtlasEntityHeader product : products) { + String name = (String) product.getAttribute(NAME); + if (productName.equals(name)) { + exists = true; + break; + } + } + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + if (exists) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName+" already exists in the domain"); + } + } + +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java new file mode 100644 index 0000000000..55007d15fe --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; + + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.graph.GraphHelper.getActiveChildrenVertices; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public class DomainPreProcessor extends AbstractDomainPreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DomainPreProcessor.class); + private AtlasEntityHeader parentDomain; + private EntityGraphMapper entityGraphMapper; + private EntityMutationContext context; + + public DomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, + AtlasGraph graph, EntityGraphMapper entityGraphMapper) { + super(typeRegistry, entityRetriever, graph); + this.entityGraphMapper = entityGraphMapper; + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (operation == EntityMutations.EntityOperation.CREATE && LOG.isDebugEnabled()) { + LOG.debug("DomainPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + this.context = context; + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + setParent(entity, context); + + if (operation == EntityMutations.EntityOperation.CREATE) { + processCreateDomain(entity, vertex); + } + } + + private void processCreateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain"); + String domainName = (String) entity.getAttribute(NAME); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + + domainExists(domainName, parentDomainQualifiedName); + entity.setAttribute(QUALIFIED_NAME, createQualifiedName(parentDomainQualifiedName)); + + RequestContext.get().endMetricRecord(metricRecorder); + } + + public static String createQualifiedName(String parentDomainQualifiedName) { + if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) { + return parentDomainQualifiedName + "/domain/" + getUUID(); + } else{ + String prefixQN = "default/domain"; + return prefixQN + "/" + getUUID(); + } + } + + private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent"); + if (parentDomain == null) { + AtlasObjectId objectId = (AtlasObjectId) entity.getRelationshipAttribute(PARENT_DOMAIN); + Set attributes = new HashSet<>(Arrays.asList(QUALIFIED_NAME, SUPER_DOMAIN_QN, PARENT_DOMAIN_QN, "__typeName")); + + if (objectId != null) { + if (StringUtils.isNotEmpty(objectId.getGuid())) { + AtlasVertex vertex = entityRetriever.getEntityVertex(objectId.getGuid()); + + if (vertex == null) { + parentDomain = entityRetriever.toAtlasEntityHeader(objectId.getGuid(), attributes); + } else { + parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes); + } + } else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) && + StringUtils.isNotEmpty((String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) { + AtlasVertex parentDomainVertex = entityRetriever.getEntityVertex(objectId); + parentDomain = entityRetriever.toAtlasEntityHeader(parentDomainVertex, attributes); + } + } + } + RequestContext.get().endMetricRecord(metricRecorder); + } + + private void domainExists(String domainName, String parentDomainQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("domainExists"); + + boolean exists = false; + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", DATA_DOMAIN_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("name.keyword", domainName))); + + + Map bool = new HashMap<>(); + if (parentDomain != null) { + mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); + } else { + List mustNotClauseList = new ArrayList(); + mustNotClauseList.add(mapOf("exists", mapOf("field", "parentDomainQualifiedName"))); + bool.put("must_not", mustNotClauseList); + } + + bool.put("must", mustClauseList); + + Map dsl = mapOf("query", mapOf("bool", bool)); + + List domains = indexSearchPaginated(dsl, DATA_DOMAIN_ENTITY_TYPE); + + if (CollectionUtils.isNotEmpty(domains)) { + for (AtlasEntityHeader domain : domains) { + String name = (String) domain.getAttribute(NAME); + if (domainName.equals(name)) { + exists = true; + break; + } + } + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + if (exists) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, domainName+" already exists"); + } + } +} \ No newline at end of file From f9444b7556d08ddf5d932cbffcb7bc04dcec4c3c Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Sat, 20 Apr 2024 13:11:17 +0530 Subject: [PATCH 02/11] implemented changes on PR --- .../apache/atlas/repository/Constants.java | 3 -- .../v2/preprocessor/PreProcessorUtils.java | 1 - .../datamesh/AbstractDomainPreProcessor.java | 37 +--------------- .../datamesh/DataProductPreProcessor.java | 44 +++++++++++-------- .../datamesh/DomainPreProcessor.java | 34 +++++++++----- 5 files changed, 48 insertions(+), 71 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 22bcba57ba..259cde132f 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -139,9 +139,6 @@ public final class Constants { */ public static final String DATA_DOMAIN_ENTITY_TYPE = "DataDomain"; public static final String DATA_PRODUCT_ENTITY_TYPE = "DataProduct"; - public static final String DATA_PRODUCT_EDGE_LABEL = "__DataDomain.dataProducts"; - public static final String DOMAIN_PARENT_EDGE_LABEL = "__DataDomain.subDomains"; - /** diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java index 57fd775d4a..4655dab17c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java @@ -33,7 +33,6 @@ public class PreProcessorUtils { public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor"; //DataMesh models constants - public static final String DATA_PRODUCT_TYPE = "DataProduct"; public static final String PARENT_DOMAIN = "parentDomain"; public static final String PARENT_DOMAIN_QN = "parentDomainQualifiedName"; public static final String SUPER_DOMAIN_QN = "superDomainQualifiedName"; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java index e93f10aec5..86fb09a96e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java @@ -18,23 +18,14 @@ package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; import org.apache.atlas.AtlasException; -import org.apache.atlas.RequestContext; -import org.apache.atlas.authorize.AtlasAuthorizationUtils; -import org.apache.atlas.authorize.AtlasEntityAccessRequest; -import org.apache.atlas.authorize.AtlasPrivilege; import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.IndexSearchParams; -import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.AbstractGlossaryPreProcessor; -import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,14 +33,12 @@ import java.util.*; import static org.apache.atlas.repository.Constants.*; -import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.PARENT_DOMAIN_QN; -import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.SUPER_DOMAIN_QN; import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY; import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public abstract class AbstractDomainPreProcessor implements PreProcessor { - private static final Logger LOG = LoggerFactory.getLogger(AbstractGlossaryPreProcessor.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractDomainPreProcessor.class); protected final AtlasTypeRegistry typeRegistry; @@ -104,28 +93,4 @@ public List indexSearchPaginated(Map dsl, Str return ret; } - - protected void isAuthorized(AtlasEntityHeader sourceDomain, AtlasEntityHeader targetDomain) throws AtlasBaseException { - - // source -> CREATE + UPDATE + DELETE - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, sourceDomain), - "create on source Domain: ", sourceDomain.getAttribute(NAME)); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, sourceDomain), - "update on source Domain: ", sourceDomain.getAttribute(NAME)); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, sourceDomain), - "delete on source Domain: ", sourceDomain.getAttribute(NAME)); - - - // target -> CREATE + UPDATE + DELETE - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, targetDomain), - "create on target Domain: ", targetDomain.getAttribute(NAME)); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, targetDomain), - "update on target Domain: ", targetDomain.getAttribute(NAME)); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, targetDomain), - "delete on target Domain: ", targetDomain.getAttribute(NAME)); - } } \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index 9d8ab16ddd..78e1097234 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -9,7 +9,6 @@ import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; -import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -22,7 +21,6 @@ import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; -import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class DataProductPreProcessor extends AbstractDomainPreProcessor { @@ -49,35 +47,43 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co setParent(entity, context); - if (operation == EntityMutations.EntityOperation.CREATE) { - processCreateProduct(entity, vertex); + switch (operation) { + case CREATE: + processCreateProduct(entity, vertex); + break; + case UPDATE: + processUpdateProduct(entity, vertex); + break; } } private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain"); - String domainName = (String) entity.getAttribute(NAME); + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateProduct"); + String productName = (String) entity.getAttribute(NAME); String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); - productExists(domainName, parentDomainQualifiedName); + productExists(productName, parentDomainQualifiedName); String newQualifiedName = createQualifiedName(parentDomainQualifiedName); - if(!newQualifiedName.isEmpty()){ - entity.setAttribute(QUALIFIED_NAME, newQualifiedName); - } - else{ - throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name is empty"); - } + + entity.setAttribute(QUALIFIED_NAME, newQualifiedName); RequestContext.get().endMetricRecord(metricRecorder); } - public static String createQualifiedName(String parentDomainQualifiedName) { - if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) { - return parentDomainQualifiedName + "/product/" + getUUID(); - } - else{ - return ""; + private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateProduct"); + String VertexQName = vertex.getProperty(QUALIFIED_NAME, String.class); + + entity.setAttribute(QUALIFIED_NAME, VertexQName); + RequestContext.get().endMetricRecord(metricRecorder); + } + + private static String createQualifiedName(String parentDomainQualifiedName) throws AtlasBaseException { + if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name cannot be empty or null"); } + return parentDomainQualifiedName + "/product/" + getUUID(); + } private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index 55007d15fe..728f09820d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -24,16 +24,13 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutations; -import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; -import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -45,9 +42,7 @@ import java.util.*; import static org.apache.atlas.repository.Constants.*; -import static org.apache.atlas.repository.graph.GraphHelper.getActiveChildrenVertices; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; -import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class DomainPreProcessor extends AbstractDomainPreProcessor { @@ -66,7 +61,7 @@ public DomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever e public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, EntityMutations.EntityOperation operation) throws AtlasBaseException { //Handle name & qualifiedName - if (operation == EntityMutations.EntityOperation.CREATE && LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("DomainPreProcessor.processAttributes: pre processing {}, {}", entityStruct.getAttribute(QUALIFIED_NAME), operation); } @@ -78,8 +73,15 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co setParent(entity, context); - if (operation == EntityMutations.EntityOperation.CREATE) { - processCreateDomain(entity, vertex); + switch (operation) { + case CREATE: + processCreateDomain(entity, vertex); + break; + case UPDATE: + processUpdateDomain(entity, vertex); + break; + default: + break; } } @@ -94,15 +96,23 @@ private void processCreateDomain(AtlasEntity entity, AtlasVertex vertex) throws RequestContext.get().endMetricRecord(metricRecorder); } - public static String createQualifiedName(String parentDomainQualifiedName) { - if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) { + private static String createQualifiedName(String parentDomainQualifiedName) { + if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { return parentDomainQualifiedName + "/domain/" + getUUID(); } else{ - String prefixQN = "default/domain"; - return prefixQN + "/" + getUUID(); + return "default/domain" + "/" + getUUID(); } } + private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain"); + String vertexQName = vertex.getProperty(QUALIFIED_NAME, String.class); + + entity.setAttribute(QUALIFIED_NAME, vertexQName); + + RequestContext.get().endMetricRecord(metricRecorder); + } + private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent"); if (parentDomain == null) { From 2b74979e0432240581f1b1ae2096484b4e97bd22 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Mon, 22 Apr 2024 18:16:00 +0530 Subject: [PATCH 03/11] added name check --- .../datamesh/AbstractDomainPreProcessor.java | 10 +-- .../datamesh/DataProductPreProcessor.java | 62 +++---------------- .../datamesh/DomainPreProcessor.java | 38 +++--------- 3 files changed, 18 insertions(+), 92 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java index 86fb09a96e..20378a1c50 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java @@ -32,9 +32,6 @@ import java.util.*; -import static org.apache.atlas.repository.Constants.*; -import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY; -import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public abstract class AbstractDomainPreProcessor implements PreProcessor { @@ -57,7 +54,7 @@ public abstract class AbstractDomainPreProcessor implements PreProcessor { } } - public List indexSearchPaginated(Map dsl, String entityType) throws AtlasBaseException { + public List indexSearchPaginated(Map dsl) throws AtlasBaseException { IndexSearchParams searchParams = new IndexSearchParams(); List ret = new ArrayList<>(); @@ -74,11 +71,6 @@ public List indexSearchPaginated(Map dsl, Str dsl.put("size", size); searchParams.setDsl(dsl); - if (entityType.equals(POLICY_ENTITY_TYPE)) { - Set attributes = new HashSet<>(Arrays.asList(ATTR_POLICY_RESOURCES, ATTR_POLICY_CATEGORY)); - searchParams.setAttributes(attributes); - } - List headers = discovery.directIndexSearch(searchParams).getEntities(); if (CollectionUtils.isNotEmpty(headers)) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index 78e1097234..3331c33ba5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -45,8 +45,6 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co AtlasEntity entity = (AtlasEntity) entityStruct; AtlasVertex vertex = context.getVertex(entity.getGuid()); - setParent(entity, context); - switch (operation) { case CREATE: processCreateProduct(entity, vertex); @@ -73,6 +71,13 @@ private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateProduct"); String VertexQName = vertex.getProperty(QUALIFIED_NAME, String.class); + String productName = (String) entity.getAttribute(NAME); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + String productVertexName = vertex.getProperty(NAME, String.class); + + if (!productVertexName.equals(productName)) { + productExists(productName, parentDomainQualifiedName); + } entity.setAttribute(QUALIFIED_NAME, VertexQName); RequestContext.get().endMetricRecord(metricRecorder); @@ -86,57 +91,6 @@ private static String createQualifiedName(String parentDomainQualifiedName) thro } - private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent"); - if (parentDomain == null) { - Object relationshipAttribute = entity.getRelationshipAttribute(DATA_DOMAIN); - Set attributes = new HashSet<>(Arrays.asList(QUALIFIED_NAME, SUPER_DOMAIN_QN, PARENT_DOMAIN_QN, "__typeName")); - - if(relationshipAttribute instanceof AtlasObjectId){ - AtlasObjectId objectId = (AtlasObjectId) relationshipAttribute; - if (objectId != null) { - if (StringUtils.isNotEmpty(objectId.getGuid())) { - AtlasVertex vertex = entityRetriever.getEntityVertex(objectId.getGuid()); - - if (vertex == null) { - parentDomain = entityRetriever.toAtlasEntityHeader(objectId.getGuid(), attributes); - } else { - parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes); - } - - } else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) && - StringUtils.isNotEmpty((String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) { - AtlasVertex parentDomainVertex = entityRetriever.getEntityVertex(objectId); - parentDomain = entityRetriever.toAtlasEntityHeader(parentDomainVertex, attributes); - - } - } - } - else if(relationshipAttribute instanceof Map){ - Map relationshipMap = (Map) relationshipAttribute; - if (StringUtils.isNotEmpty((String) relationshipMap.get("guid"))) { - AtlasVertex vertex = entityRetriever.getEntityVertex((String) relationshipMap.get("guid")); - - if (vertex == null) { - parentDomain = entityRetriever.toAtlasEntityHeader((String) relationshipMap.get("guid"), attributes); - } else { - parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes); - } - - } - else { - parentDomain = new AtlasEntityHeader((String) relationshipMap.get("typeName"), relationshipMap); - - } - } - else{ - LOG.warn("DataProductPreProcessor.setParent: Invalid relationshipAttribute {}", relationshipAttribute); - } - - } - RequestContext.get().endMetricRecord(metricRecorder); - } - private void productExists(String productName, String parentDomainQualifiedName) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("domainExists"); @@ -161,7 +115,7 @@ private void productExists(String productName, String parentDomainQualifiedName) Map dsl = mapOf("query", mapOf("bool", bool)); - List products = indexSearchPaginated(dsl, DATA_PRODUCT_ENTITY_TYPE); + List products = indexSearchPaginated(dsl); if (CollectionUtils.isNotEmpty(products)) { for (AtlasEntityHeader product : products) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index 728f09820d..8c1ca3e4f2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -71,8 +71,6 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co AtlasEntity entity = (AtlasEntity) entityStruct; AtlasVertex vertex = context.getVertex(entity.getGuid()); - setParent(entity, context); - switch (operation) { case CREATE: processCreateDomain(entity, vertex); @@ -107,34 +105,16 @@ private static String createQualifiedName(String parentDomainQualifiedName) { private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain"); String vertexQName = vertex.getProperty(QUALIFIED_NAME, String.class); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + String domainName = (String) entity.getAttribute(NAME); + String domainVertexName = vertex.getProperty(NAME, String.class); - entity.setAttribute(QUALIFIED_NAME, vertexQName); - - RequestContext.get().endMetricRecord(metricRecorder); - } - - private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent"); - if (parentDomain == null) { - AtlasObjectId objectId = (AtlasObjectId) entity.getRelationshipAttribute(PARENT_DOMAIN); - Set attributes = new HashSet<>(Arrays.asList(QUALIFIED_NAME, SUPER_DOMAIN_QN, PARENT_DOMAIN_QN, "__typeName")); + if (!domainVertexName.equals(domainName)) { + domainExists(domainName, parentDomainQualifiedName); + } - if (objectId != null) { - if (StringUtils.isNotEmpty(objectId.getGuid())) { - AtlasVertex vertex = entityRetriever.getEntityVertex(objectId.getGuid()); + entity.setAttribute(QUALIFIED_NAME, vertexQName); - if (vertex == null) { - parentDomain = entityRetriever.toAtlasEntityHeader(objectId.getGuid(), attributes); - } else { - parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes); - } - } else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) && - StringUtils.isNotEmpty((String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) { - AtlasVertex parentDomainVertex = entityRetriever.getEntityVertex(objectId); - parentDomain = entityRetriever.toAtlasEntityHeader(parentDomainVertex, attributes); - } - } - } RequestContext.get().endMetricRecord(metricRecorder); } @@ -150,7 +130,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t Map bool = new HashMap<>(); - if (parentDomain != null) { + if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); } else { List mustNotClauseList = new ArrayList(); @@ -162,7 +142,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t Map dsl = mapOf("query", mapOf("bool", bool)); - List domains = indexSearchPaginated(dsl, DATA_DOMAIN_ENTITY_TYPE); + List domains = indexSearchPaginated(dsl); if (CollectionUtils.isNotEmpty(domains)) { for (AtlasEntityHeader domain : domains) { From b055d62bfaf6f23ad6dd59c38c907c22d9df8194 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Wed, 24 Apr 2024 14:03:56 +0530 Subject: [PATCH 04/11] resolved PR comments --- .../datamesh/DataProductPreProcessor.java | 13 ++++++++----- .../preprocessor/datamesh/DomainPreProcessor.java | 10 ++++++---- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index 3331c33ba5..005d580caa 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -12,7 +12,6 @@ import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,8 +23,7 @@ import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class DataProductPreProcessor extends AbstractDomainPreProcessor { - private static final Logger LOG = LoggerFactory.getLogger(DomainPreProcessor.class); - private AtlasEntityHeader parentDomain; + private static final Logger LOG = LoggerFactory.getLogger(DataProductPreProcessor.class); private EntityMutationContext context; public DataProductPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, EntityGraphMapper entityGraphMapper) { @@ -36,7 +34,7 @@ public DataProductPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetrie public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, EntityMutations.EntityOperation operation) throws AtlasBaseException { //Handle name & qualifiedName - if (operation == EntityMutations.EntityOperation.CREATE && LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("DataProductPreProcessor.processAttributes: pre processing {}, {}", entityStruct.getAttribute(QUALIFIED_NAME), operation); } @@ -73,6 +71,11 @@ private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws String VertexQName = vertex.getProperty(QUALIFIED_NAME, String.class); String productName = (String) entity.getAttribute(NAME); String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + + if (StringUtils.isEmpty(parentDomainQualifiedName)){ + parentDomainQualifiedName = vertex.getProperty(PARENT_DOMAIN_QN, String.class); + } + String productVertexName = vertex.getProperty(NAME, String.class); if (!productVertexName.equals(productName)) { @@ -103,7 +106,7 @@ private void productExists(String productName, String parentDomainQualifiedName) Map bool = new HashMap<>(); - if (parentDomain != null) { + if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); } else { List mustNotClauseList = new ArrayList(); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index 8c1ca3e4f2..d89d698ece 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -23,7 +23,6 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; -import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutations; import org.apache.atlas.repository.graphdb.AtlasGraph; @@ -34,7 +33,6 @@ import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +45,6 @@ public class DomainPreProcessor extends AbstractDomainPreProcessor { private static final Logger LOG = LoggerFactory.getLogger(DomainPreProcessor.class); - private AtlasEntityHeader parentDomain; private EntityGraphMapper entityGraphMapper; private EntityMutationContext context; @@ -98,7 +95,7 @@ private static String createQualifiedName(String parentDomainQualifiedName) { if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { return parentDomainQualifiedName + "/domain/" + getUUID(); } else{ - return "default/domain" + "/" + getUUID(); + return "default/domain/" + getUUID(); } } @@ -106,6 +103,11 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain"); String vertexQName = vertex.getProperty(QUALIFIED_NAME, String.class); String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + + if (StringUtils.isEmpty(parentDomainQualifiedName)) { + parentDomainQualifiedName = vertex.getProperty(PARENT_DOMAIN_QN, String.class); + } + String domainName = (String) entity.getAttribute(NAME); String domainVertexName = vertex.getProperty(NAME, String.class); From f66041905c6afe5cad05a6e12cc80cd297928846 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Wed, 24 Apr 2024 17:44:36 +0530 Subject: [PATCH 05/11] check for parentDomain for product corrected --- .../graph/v2/preprocessor/datamesh/DataProductPreProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index 005d580caa..a08ca5a48a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -87,7 +87,7 @@ private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws } private static String createQualifiedName(String parentDomainQualifiedName) throws AtlasBaseException { - if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { + if (StringUtils.isEmpty(parentDomainQualifiedName)) { throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name cannot be empty or null"); } return parentDomainQualifiedName + "/product/" + getUUID(); From 180437177b5709520d5ff37dd30bd83320997717 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Wed, 24 Apr 2024 18:04:55 +0530 Subject: [PATCH 06/11] added type to list --- .../v2/preprocessor/datamesh/DataProductPreProcessor.java | 4 ++-- .../graph/v2/preprocessor/datamesh/DomainPreProcessor.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index a08ca5a48a..1a30ea463e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -99,7 +99,7 @@ private void productExists(String productName, String parentDomainQualifiedName) boolean exists = false; try { - List mustClauseList = new ArrayList(); + List> mustClauseList = new ArrayList<>(); mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", DATA_PRODUCT_ENTITY_TYPE))); mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); mustClauseList.add(mapOf("term", mapOf("name.keyword", productName))); @@ -109,7 +109,7 @@ private void productExists(String productName, String parentDomainQualifiedName) if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); } else { - List mustNotClauseList = new ArrayList(); + List> mustNotClauseList = new ArrayList<>(); mustNotClauseList.add(mapOf("exists", mapOf("field", "parentDomainQualifiedName"))); bool.put("must_not", mustNotClauseList); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index d89d698ece..ae88a83afe 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -125,7 +125,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t boolean exists = false; try { - List mustClauseList = new ArrayList(); + List> mustClauseList = new ArrayList<>(); mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", DATA_DOMAIN_ENTITY_TYPE))); mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); mustClauseList.add(mapOf("term", mapOf("name.keyword", domainName))); @@ -135,7 +135,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); } else { - List mustNotClauseList = new ArrayList(); + List> mustNotClauseList = new ArrayList<>(); mustNotClauseList.add(mapOf("exists", mapOf("field", "parentDomainQualifiedName"))); bool.put("must_not", mustNotClauseList); } From a4b11728b3b241c23b8b77f0f4613053c414a6c7 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Wed, 24 Apr 2024 19:47:33 +0530 Subject: [PATCH 07/11] removed AbstractDomainPreProcessor --- .../store/graph/v2/AtlasEntityStoreV2.java | 4 +- .../v2/preprocessor/PreProcessorUtils.java | 41 +++++++++ .../datamesh/AbstractDomainPreProcessor.java | 88 ------------------- .../datamesh/DataProductPreProcessor.java | 21 +++-- .../datamesh/DomainPreProcessor.java | 26 +++--- .../AbstractGlossaryPreProcessor.java | 36 +------- .../glossary/CategoryPreProcessor.java | 4 +- 7 files changed, 74 insertions(+), 146 deletions(-) delete mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 472c29cc46..896ad8bde4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -1803,11 +1803,11 @@ public PreProcessor getPreProcessor(String typeName) { break; case DATA_DOMAIN_ENTITY_TYPE: - preProcessor = new DomainPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper); + preProcessor = new DomainPreProcessor(typeRegistry, graph); break; case DATA_PRODUCT_ENTITY_TYPE: - preProcessor = new DataProductPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper); + preProcessor = new DataProductPreProcessor(typeRegistry, graph); break; case QUERY_ENTITY_TYPE: diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java index 4655dab17c..bd25869e8d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java @@ -1,7 +1,10 @@ package org.apache.atlas.repository.store.graph.v2.preprocessor; +import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.IndexSearchParams; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; @@ -11,13 +14,19 @@ import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.util.NanoIdUtils; import org.apache.atlas.utils.AtlasEntityUtil; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import static org.apache.atlas.repository.Constants.QUERY_COLLECTION_ENTITY_TYPE; import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class PreProcessorUtils { private static final Logger LOG = LoggerFactory.getLogger(PreProcessorUtils.class); @@ -113,4 +122,36 @@ public static String updateQueryResourceAttributes(AtlasTypeRegistry typeRegistr return newCollectionQualifiedName; } + + public static List indexSearchPaginated(Map dsl, EntityDiscoveryService discovery) throws AtlasBaseException { + IndexSearchParams searchParams = new IndexSearchParams(); + List ret = new ArrayList<>(); + + List sortList = new ArrayList<>(0); + sortList.add(mapOf("__timestamp", mapOf("order", "asc"))); + sortList.add(mapOf("__guid", mapOf("order", "asc"))); + dsl.put("sort", sortList); + + int from = 0; + int size = 100; + boolean hasMore = true; + do { + dsl.put("from", from); + dsl.put("size", size); + searchParams.setDsl(dsl); + + List headers = discovery.directIndexSearch(searchParams).getEntities(); + + if (CollectionUtils.isNotEmpty(headers)) { + ret.addAll(headers); + } else { + hasMore = false; + } + + from += size; + + } while (hasMore); + + return ret; + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java deleted file mode 100644 index 20378a1c50..0000000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; - -import org.apache.atlas.AtlasException; -import org.apache.atlas.discovery.EntityDiscoveryService; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.discovery.IndexSearchParams; -import org.apache.atlas.model.instance.AtlasEntityHeader; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; -import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.commons.collections.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; - -public abstract class AbstractDomainPreProcessor implements PreProcessor { - private static final Logger LOG = LoggerFactory.getLogger(AbstractDomainPreProcessor.class); - - - protected final AtlasTypeRegistry typeRegistry; - protected final EntityGraphRetriever entityRetriever; - - protected EntityDiscoveryService discovery; - - AbstractDomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph) { - this.entityRetriever = entityRetriever; - this.typeRegistry = typeRegistry; - - try { - this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); - } catch (AtlasException e) { - e.printStackTrace(); - } - } - - public List indexSearchPaginated(Map dsl) throws AtlasBaseException { - IndexSearchParams searchParams = new IndexSearchParams(); - List ret = new ArrayList<>(); - - List sortList = new ArrayList<>(0); - sortList.add(mapOf("__timestamp", mapOf("order", "asc"))); - sortList.add(mapOf("__guid", mapOf("order", "asc"))); - dsl.put("sort", sortList); - - int from = 0; - int size = 100; - boolean hasMore = true; - do { - dsl.put("from", from); - dsl.put("size", size); - searchParams.setDsl(dsl); - - List headers = discovery.directIndexSearch(searchParams).getEntities(); - - if (CollectionUtils.isNotEmpty(headers)) { - ret.addAll(headers); - } else { - hasMore = false; - } - - from += size; - - } while (hasMore); - - return ret; - } -} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index 1a30ea463e..e61bd499cd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -1,14 +1,15 @@ package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.*; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; -import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -22,12 +23,15 @@ import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; -public class DataProductPreProcessor extends AbstractDomainPreProcessor { +public class DataProductPreProcessor implements PreProcessor { private static final Logger LOG = LoggerFactory.getLogger(DataProductPreProcessor.class); - private EntityMutationContext context; - public DataProductPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, - AtlasGraph graph, EntityGraphMapper entityGraphMapper) { - super(typeRegistry, entityRetriever, graph); + protected EntityDiscoveryService discovery; + public DataProductPreProcessor(AtlasTypeRegistry typeRegistry, AtlasGraph graph) { + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } } @Override @@ -38,7 +42,6 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co LOG.debug("DataProductPreProcessor.processAttributes: pre processing {}, {}", entityStruct.getAttribute(QUALIFIED_NAME), operation); } - this.context = context; AtlasEntity entity = (AtlasEntity) entityStruct; AtlasVertex vertex = context.getVertex(entity.getGuid()); @@ -118,7 +121,7 @@ private void productExists(String productName, String parentDomainQualifiedName) Map dsl = mapOf("query", mapOf("bool", bool)); - List products = indexSearchPaginated(dsl); + List products = indexSearchPaginated(dsl, this.discovery); if (CollectionUtils.isNotEmpty(products)) { for (AtlasEntityHeader product : products) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index ae88a83afe..63f086bc43 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -19,7 +19,9 @@ import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; @@ -27,9 +29,8 @@ import org.apache.atlas.model.instance.EntityMutations; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; -import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -43,15 +44,18 @@ import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; -public class DomainPreProcessor extends AbstractDomainPreProcessor { +public class DomainPreProcessor implements PreProcessor { private static final Logger LOG = LoggerFactory.getLogger(DomainPreProcessor.class); - private EntityGraphMapper entityGraphMapper; - private EntityMutationContext context; + protected EntityDiscoveryService discovery; + + public DomainPreProcessor(AtlasTypeRegistry typeRegistry, AtlasGraph graph) { + + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } - public DomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, - AtlasGraph graph, EntityGraphMapper entityGraphMapper) { - super(typeRegistry, entityRetriever, graph); - this.entityGraphMapper = entityGraphMapper; } @Override @@ -63,8 +67,6 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co entityStruct.getAttribute(QUALIFIED_NAME), operation); } - this.context = context; - AtlasEntity entity = (AtlasEntity) entityStruct; AtlasVertex vertex = context.getVertex(entity.getGuid()); @@ -144,7 +146,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t Map dsl = mapOf("query", mapOf("bool", bool)); - List domains = indexSearchPaginated(dsl); + List domains = indexSearchPaginated(dsl, this.discovery); if (CollectionUtils.isNotEmpty(domains)) { for (AtlasEntityHeader domain : domains) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index 91950f783c..1d43e3fd59 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -58,6 +58,7 @@ import static org.apache.atlas.repository.Constants.ELASTICSEARCH_PAGINATION_SIZE; import static org.apache.atlas.repository.Constants.NAME; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.indexSearchPaginated; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY; import static org.apache.atlas.type.Constants.MEANINGS_TEXT_PROPERTY_KEY; @@ -75,7 +76,6 @@ public abstract class AbstractGlossaryPreProcessor implements PreProcessor { protected final AtlasTypeRegistry typeRegistry; protected final EntityGraphRetriever entityRetriever; protected final TaskManagement taskManagement; - protected EntityDiscoveryService discovery; AbstractGlossaryPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, TaskManagement taskManagement) { @@ -103,7 +103,7 @@ public void termExists(String termName, String glossaryQName) throws AtlasBaseEx Map dsl = mapOf("query", mapOf("bool", mapOf("must", mustClauseList))); - List terms = indexSearchPaginated(dsl); + List terms = indexSearchPaginated(dsl, this.discovery); if (CollectionUtils.isNotEmpty(terms)) { ret = terms.stream().map(term -> (String) term.getAttribute(NAME)).anyMatch(name -> termName.equals(name)); @@ -137,38 +137,6 @@ public boolean checkEntityTermAssociation(String termQName) throws AtlasBaseExce return entityHeader != null; } - public List indexSearchPaginated(Map dsl) throws AtlasBaseException { - IndexSearchParams searchParams = new IndexSearchParams(); - List ret = new ArrayList<>(); - - List sortList = new ArrayList<>(0); - sortList.add(mapOf("__timestamp", mapOf("order", "asc"))); - sortList.add(mapOf("__guid", mapOf("order", "asc"))); - dsl.put("sort", sortList); - - int from = 0; - int size = 100; - boolean hasMore = true; - do { - dsl.put("from", from); - dsl.put("size", size); - searchParams.setDsl(dsl); - - List headers = discovery.directIndexSearch(searchParams).getEntities(); - - if (CollectionUtils.isNotEmpty(headers)) { - ret.addAll(headers); - } else { - hasMore = false; - } - - from += size; - - } while (hasMore); - - return ret; - } - public void updateMeaningsAttributesInEntitiesOnTermUpdate(String currentTermName, String updatedTermName, String termQName, String updatedTermQName, String termGuid) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java index eb39ff3b1d..69b00449e8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java @@ -19,10 +19,12 @@ import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasEntityAccessRequest; import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; @@ -356,7 +358,7 @@ private void categoryExists(String categoryName, String glossaryQualifiedName) t Map dsl = mapOf("query", mapOf("bool", bool)); - List categories = indexSearchPaginated(dsl); + List categories = indexSearchPaginated(dsl, this.discovery); if (CollectionUtils.isNotEmpty(categories)) { for (AtlasEntityHeader category : categories) { From 1609e4e2f662302a1fe52740e0ded82069566a86 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Wed, 24 Apr 2024 20:05:49 +0530 Subject: [PATCH 08/11] removed unrequired change --- .../v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index 1d43e3fd59..103af46fee 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -76,6 +76,7 @@ public abstract class AbstractGlossaryPreProcessor implements PreProcessor { protected final AtlasTypeRegistry typeRegistry; protected final EntityGraphRetriever entityRetriever; protected final TaskManagement taskManagement; + protected EntityDiscoveryService discovery; AbstractGlossaryPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, TaskManagement taskManagement) { From 7ab9c56e3c43ebbb9ad6fc7c412257175b150944 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Mon, 29 Apr 2024 13:09:29 +0530 Subject: [PATCH 09/11] added custom attribute --- .../v2/preprocessor/datamesh/DataProductPreProcessor.java | 3 +++ .../graph/v2/preprocessor/datamesh/DomainPreProcessor.java | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index e61bd499cd..9956e07fcd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -60,11 +60,14 @@ private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateProduct"); String productName = (String) entity.getAttribute(NAME); String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + Map customAttributes = new HashMap<>(); + customAttributes.put("isQualifiedNameMigrated", "true"); productExists(productName, parentDomainQualifiedName); String newQualifiedName = createQualifiedName(parentDomainQualifiedName); entity.setAttribute(QUALIFIED_NAME, newQualifiedName); + entity.setCustomAttributes(customAttributes); RequestContext.get().endMetricRecord(metricRecorder); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index 63f086bc43..b777978d37 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -86,9 +86,13 @@ private void processCreateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain"); String domainName = (String) entity.getAttribute(NAME); String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + Map customAttributes = new HashMap<>(); + customAttributes.put("isQualifiedNameMigrated", "true"); domainExists(domainName, parentDomainQualifiedName); entity.setAttribute(QUALIFIED_NAME, createQualifiedName(parentDomainQualifiedName)); + entity.setCustomAttributes(customAttributes); + RequestContext.get().endMetricRecord(metricRecorder); } From c354ede4359429e7eaa99d3ae6e70919618ada4e Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Tue, 30 Apr 2024 11:26:25 +0530 Subject: [PATCH 10/11] created a static variable for customAttribute --- .../store/graph/v2/preprocessor/PreProcessorUtils.java | 4 +--- .../v2/preprocessor/datamesh/DataProductPreProcessor.java | 2 +- .../graph/v2/preprocessor/datamesh/DomainPreProcessor.java | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java index bd25869e8d..7085139d27 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java @@ -42,10 +42,8 @@ public class PreProcessorUtils { public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor"; //DataMesh models constants - public static final String PARENT_DOMAIN = "parentDomain"; public static final String PARENT_DOMAIN_QN = "parentDomainQualifiedName"; - public static final String SUPER_DOMAIN_QN = "superDomainQualifiedName"; - public static final String DATA_DOMAIN = "dataDomain"; + public static final String MIGRATION_CUSTOM_ATTRIBUTE = "isQualifiedNameMigrated"; //Query models constants public static final String PREFIX_QUERY_QN = "default/collection/"; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index 9956e07fcd..6865806184 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -61,7 +61,7 @@ private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws String productName = (String) entity.getAttribute(NAME); String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); Map customAttributes = new HashMap<>(); - customAttributes.put("isQualifiedNameMigrated", "true"); + customAttributes.put(MIGRATION_CUSTOM_ATTRIBUTE, "true"); productExists(productName, parentDomainQualifiedName); String newQualifiedName = createQualifiedName(parentDomainQualifiedName); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index b777978d37..dac1be3c2c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -87,7 +87,7 @@ private void processCreateDomain(AtlasEntity entity, AtlasVertex vertex) throws String domainName = (String) entity.getAttribute(NAME); String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); Map customAttributes = new HashMap<>(); - customAttributes.put("isQualifiedNameMigrated", "true"); + customAttributes.put(MIGRATION_CUSTOM_ATTRIBUTE, "true"); domainExists(domainName, parentDomainQualifiedName); entity.setAttribute(QUALIFIED_NAME, createQualifiedName(parentDomainQualifiedName)); From 1db1446f0ec857e70180fd0d296a727ae825d956 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Tue, 7 May 2024 10:19:10 +0530 Subject: [PATCH 11/11] Added "/super" delimiter to super domain QN. --- .../graph/v2/preprocessor/datamesh/DomainPreProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index dac1be3c2c..3ac2888332 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -101,7 +101,7 @@ private static String createQualifiedName(String parentDomainQualifiedName) { if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { return parentDomainQualifiedName + "/domain/" + getUUID(); } else{ - return "default/domain/" + getUUID(); + return "default/domain/" + getUUID() + "/super"; } }