diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 8d4d47ea57..259cde132f 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -134,6 +134,13 @@ public final class Constants { public static final String GLOSSARY_CATEGORY_EDGE_LABEL = "r:AtlasGlossaryCategoryAnchor"; + /** + * MESH property keys. + */ + public static final String DATA_DOMAIN_ENTITY_TYPE = "DataDomain"; + public static final String DATA_PRODUCT_ENTITY_TYPE = "DataProduct"; + + /** * SQL property keys. */ diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 6e3487f8d3..896ad8bde4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -62,6 +62,8 @@ import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DomainPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor; @@ -1800,6 +1802,14 @@ public PreProcessor getPreProcessor(String typeName) { preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper); break; + case DATA_DOMAIN_ENTITY_TYPE: + preProcessor = new DomainPreProcessor(typeRegistry, graph); + break; + + case DATA_PRODUCT_ENTITY_TYPE: + preProcessor = new DataProductPreProcessor(typeRegistry, graph); + break; + case QUERY_ENTITY_TYPE: preProcessor = new QueryPreProcessor(typeRegistry, entityRetriever); break; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java index 6c84900460..7085139d27 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java @@ -1,7 +1,10 @@ package org.apache.atlas.repository.store.graph.v2.preprocessor; +import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.IndexSearchParams; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; @@ -11,13 +14,19 @@ import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.util.NanoIdUtils; import org.apache.atlas.utils.AtlasEntityUtil; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import static org.apache.atlas.repository.Constants.QUERY_COLLECTION_ENTITY_TYPE; import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class PreProcessorUtils { private static final Logger LOG = LoggerFactory.getLogger(PreProcessorUtils.class); @@ -32,6 +41,10 @@ public class PreProcessorUtils { public static final String GLOSSARY_TERM_REL_TYPE = "AtlasGlossaryTermAnchor"; public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor"; + //DataMesh models constants + public static final String PARENT_DOMAIN_QN = "parentDomainQualifiedName"; + public static final String MIGRATION_CUSTOM_ATTRIBUTE = "isQualifiedNameMigrated"; + //Query models constants public static final String PREFIX_QUERY_QN = "default/collection/"; public static final String COLLECTION_QUALIFIED_NAME = "collectionQualifiedName"; @@ -107,4 +120,36 @@ public static String updateQueryResourceAttributes(AtlasTypeRegistry typeRegistr return newCollectionQualifiedName; } + + public static List indexSearchPaginated(Map dsl, EntityDiscoveryService discovery) throws AtlasBaseException { + IndexSearchParams searchParams = new IndexSearchParams(); + List ret = new ArrayList<>(); + + List sortList = new ArrayList<>(0); + sortList.add(mapOf("__timestamp", mapOf("order", "asc"))); + sortList.add(mapOf("__guid", mapOf("order", "asc"))); + dsl.put("sort", sortList); + + int from = 0; + int size = 100; + boolean hasMore = true; + do { + dsl.put("from", from); + dsl.put("size", size); + searchParams.setDsl(dsl); + + List headers = discovery.directIndexSearch(searchParams).getEntities(); + + if (CollectionUtils.isNotEmpty(headers)) { + ret.addAll(headers); + } else { + hasMore = false; + } + + from += size; + + } while (hasMore); + + return ret; + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java new file mode 100644 index 0000000000..6865806184 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -0,0 +1,147 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.EntityDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.*; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public class DataProductPreProcessor implements PreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DataProductPreProcessor.class); + protected EntityDiscoveryService discovery; + public DataProductPreProcessor(AtlasTypeRegistry typeRegistry, AtlasGraph graph) { + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (LOG.isDebugEnabled()) { + LOG.debug("DataProductPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + switch (operation) { + case CREATE: + processCreateProduct(entity, vertex); + break; + case UPDATE: + processUpdateProduct(entity, vertex); + break; + } + } + + private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateProduct"); + String productName = (String) entity.getAttribute(NAME); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + Map customAttributes = new HashMap<>(); + customAttributes.put(MIGRATION_CUSTOM_ATTRIBUTE, "true"); + + productExists(productName, parentDomainQualifiedName); + String newQualifiedName = createQualifiedName(parentDomainQualifiedName); + + entity.setAttribute(QUALIFIED_NAME, newQualifiedName); + entity.setCustomAttributes(customAttributes); + + RequestContext.get().endMetricRecord(metricRecorder); + } + + private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateProduct"); + String VertexQName = vertex.getProperty(QUALIFIED_NAME, String.class); + String productName = (String) entity.getAttribute(NAME); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + + if (StringUtils.isEmpty(parentDomainQualifiedName)){ + parentDomainQualifiedName = vertex.getProperty(PARENT_DOMAIN_QN, String.class); + } + + String productVertexName = vertex.getProperty(NAME, String.class); + + if (!productVertexName.equals(productName)) { + productExists(productName, parentDomainQualifiedName); + } + + entity.setAttribute(QUALIFIED_NAME, VertexQName); + RequestContext.get().endMetricRecord(metricRecorder); + } + + private static String createQualifiedName(String parentDomainQualifiedName) throws AtlasBaseException { + if (StringUtils.isEmpty(parentDomainQualifiedName)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name cannot be empty or null"); + } + return parentDomainQualifiedName + "/product/" + getUUID(); + + } + + private void productExists(String productName, String parentDomainQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("domainExists"); + + boolean exists = false; + try { + List> mustClauseList = new ArrayList<>(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", DATA_PRODUCT_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("name.keyword", productName))); + + + Map bool = new HashMap<>(); + if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { + mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); + } else { + List> mustNotClauseList = new ArrayList<>(); + mustNotClauseList.add(mapOf("exists", mapOf("field", "parentDomainQualifiedName"))); + bool.put("must_not", mustNotClauseList); + } + + bool.put("must", mustClauseList); + + Map dsl = mapOf("query", mapOf("bool", bool)); + + List products = indexSearchPaginated(dsl, this.discovery); + + if (CollectionUtils.isNotEmpty(products)) { + for (AtlasEntityHeader product : products) { + String name = (String) product.getAttribute(NAME); + if (productName.equals(name)) { + exists = true; + break; + } + } + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + if (exists) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName+" already exists in the domain"); + } + } + +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java new file mode 100644 index 0000000000..3ac2888332 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh; + + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.EntityDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public class DomainPreProcessor implements PreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DomainPreProcessor.class); + protected EntityDiscoveryService discovery; + + public DomainPreProcessor(AtlasTypeRegistry typeRegistry, AtlasGraph graph) { + + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } + + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (LOG.isDebugEnabled()) { + LOG.debug("DomainPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + switch (operation) { + case CREATE: + processCreateDomain(entity, vertex); + break; + case UPDATE: + processUpdateDomain(entity, vertex); + break; + default: + break; + } + } + + private void processCreateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain"); + String domainName = (String) entity.getAttribute(NAME); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + Map customAttributes = new HashMap<>(); + customAttributes.put(MIGRATION_CUSTOM_ATTRIBUTE, "true"); + + domainExists(domainName, parentDomainQualifiedName); + entity.setAttribute(QUALIFIED_NAME, createQualifiedName(parentDomainQualifiedName)); + entity.setCustomAttributes(customAttributes); + + + RequestContext.get().endMetricRecord(metricRecorder); + } + + private static String createQualifiedName(String parentDomainQualifiedName) { + if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { + return parentDomainQualifiedName + "/domain/" + getUUID(); + } else{ + return "default/domain/" + getUUID() + "/super"; + } + } + + private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain"); + String vertexQName = vertex.getProperty(QUALIFIED_NAME, String.class); + String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); + + if (StringUtils.isEmpty(parentDomainQualifiedName)) { + parentDomainQualifiedName = vertex.getProperty(PARENT_DOMAIN_QN, String.class); + } + + String domainName = (String) entity.getAttribute(NAME); + String domainVertexName = vertex.getProperty(NAME, String.class); + + if (!domainVertexName.equals(domainName)) { + domainExists(domainName, parentDomainQualifiedName); + } + + entity.setAttribute(QUALIFIED_NAME, vertexQName); + + RequestContext.get().endMetricRecord(metricRecorder); + } + + private void domainExists(String domainName, String parentDomainQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("domainExists"); + + boolean exists = false; + try { + List> mustClauseList = new ArrayList<>(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", DATA_DOMAIN_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("name.keyword", domainName))); + + + Map bool = new HashMap<>(); + if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { + mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName))); + } else { + List> mustNotClauseList = new ArrayList<>(); + mustNotClauseList.add(mapOf("exists", mapOf("field", "parentDomainQualifiedName"))); + bool.put("must_not", mustNotClauseList); + } + + bool.put("must", mustClauseList); + + Map dsl = mapOf("query", mapOf("bool", bool)); + + List domains = indexSearchPaginated(dsl, this.discovery); + + if (CollectionUtils.isNotEmpty(domains)) { + for (AtlasEntityHeader domain : domains) { + String name = (String) domain.getAttribute(NAME); + if (domainName.equals(name)) { + exists = true; + break; + } + } + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + if (exists) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, domainName+" already exists"); + } + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index 91950f783c..103af46fee 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -58,6 +58,7 @@ import static org.apache.atlas.repository.Constants.ELASTICSEARCH_PAGINATION_SIZE; import static org.apache.atlas.repository.Constants.NAME; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.indexSearchPaginated; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY; import static org.apache.atlas.type.Constants.MEANINGS_TEXT_PROPERTY_KEY; @@ -103,7 +104,7 @@ public void termExists(String termName, String glossaryQName) throws AtlasBaseEx Map dsl = mapOf("query", mapOf("bool", mapOf("must", mustClauseList))); - List terms = indexSearchPaginated(dsl); + List terms = indexSearchPaginated(dsl, this.discovery); if (CollectionUtils.isNotEmpty(terms)) { ret = terms.stream().map(term -> (String) term.getAttribute(NAME)).anyMatch(name -> termName.equals(name)); @@ -137,38 +138,6 @@ public boolean checkEntityTermAssociation(String termQName) throws AtlasBaseExce return entityHeader != null; } - public List indexSearchPaginated(Map dsl) throws AtlasBaseException { - IndexSearchParams searchParams = new IndexSearchParams(); - List ret = new ArrayList<>(); - - List sortList = new ArrayList<>(0); - sortList.add(mapOf("__timestamp", mapOf("order", "asc"))); - sortList.add(mapOf("__guid", mapOf("order", "asc"))); - dsl.put("sort", sortList); - - int from = 0; - int size = 100; - boolean hasMore = true; - do { - dsl.put("from", from); - dsl.put("size", size); - searchParams.setDsl(dsl); - - List headers = discovery.directIndexSearch(searchParams).getEntities(); - - if (CollectionUtils.isNotEmpty(headers)) { - ret.addAll(headers); - } else { - hasMore = false; - } - - from += size; - - } while (hasMore); - - return ret; - } - public void updateMeaningsAttributesInEntitiesOnTermUpdate(String currentTermName, String updatedTermName, String termQName, String updatedTermQName, String termGuid) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java index eb39ff3b1d..69b00449e8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java @@ -19,10 +19,12 @@ import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasEntityAccessRequest; import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; @@ -356,7 +358,7 @@ private void categoryExists(String categoryName, String glossaryQualifiedName) t Map dsl = mapOf("query", mapOf("bool", bool)); - List categories = indexSearchPaginated(dsl); + List categories = indexSearchPaginated(dsl, this.discovery); if (CollectionUtils.isNotEmpty(categories)) { for (AtlasEntityHeader category : categories) {