diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java index d92bd688f5..9ff2ed5d07 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/AbstractDomainPreProcessor.java @@ -30,6 +30,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.AbstractGlossaryPreProcessor; import org.apache.atlas.type.AtlasEntityType; @@ -42,14 +43,12 @@ import java.util.*; import static org.apache.atlas.repository.Constants.*; -import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.PARENT_DOMAIN_QN; -import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.SUPER_DOMAIN_QN; import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY; import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public abstract class AbstractDomainPreProcessor implements PreProcessor { - private static final Logger LOG = LoggerFactory.getLogger(AbstractGlossaryPreProcessor.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractDomainPreProcessor.class); protected final AtlasTypeRegistry typeRegistry; @@ -129,6 +128,63 @@ protected void isAuthorized(AtlasEntityHeader sourceDomain, AtlasEntityHeader ta "delete on target Domain: ", targetDomain.getAttribute(NAME)); } + protected void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy"); + try { + LOG.info("Updating policy for entity {}", currentQualifiedName); + Map updatedpolicyResources = new HashMap<>(); + + String currentResource = "entity:"+ currentQualifiedName; + String updatedResource = "entity:"+ updatedQualifiedName; + + updatedpolicyResources.put(currentResource, updatedResource); + + List policies = getPolicy(currentResource); + if (CollectionUtils.isNotEmpty(policies)) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE); + for (AtlasEntityHeader policy : policies) { + LOG.info("Updating policy {} for entity {}", policy.getGuid(), currentQualifiedName); + AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid()); + List policyResources = (List) policyEntity.getAttribute(ATTR_POLICY_RESOURCES); + LOG.info("Policy resources {}", policyResources); + policyResources.remove(currentResource); + policyResources.add(updatedResource); + AtlasVertex policyVertex = context.getVertex(policy.getGuid()); + LOG.info("Policy Vertex {}", policyVertex); + LOG.info("Context {}", context); + policyVertex.removeProperty(ATTR_POLICY_RESOURCES); + policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources); + context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex); + } + } + + }finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + } + + protected List getPolicy(String resource) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy"); + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("terms", mapOf("policyResources", Arrays.asList(resource)))); + + Map bool = new HashMap<>(); + bool.put("must", mustClauseList); + + Map dsl = mapOf("query", mapOf("bool", bool)); + + List policies = indexSearchPaginated(dsl, POLICY_ENTITY_TYPE); + + return policies; + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } + /** * Record the updated child entities, it will be used to send notification and store audit logs * @param entityVertex Child entity vertex diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index ea7905ae8e..14b0b6d0fa 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -22,7 +22,6 @@ import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; -import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class DataProductPreProcessor extends AbstractDomainPreProcessor { @@ -54,38 +53,32 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co processCreateProduct(entity, vertex); break; case UPDATE: - processUpdateDomain(entity, vertex, context); + processUpdateDomain(entity, vertex); break; } } private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain"); - String domainName = (String) entity.getAttribute(NAME); + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateProduct"); + String productName = (String) entity.getAttribute(NAME); String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN); - productExists(domainName, parentDomainQualifiedName); + productExists(productName, parentDomainQualifiedName); String newQualifiedName = createQualifiedName(parentDomainQualifiedName); - if(!newQualifiedName.isEmpty()){ - entity.setAttribute(QUALIFIED_NAME, newQualifiedName); - } - else{ - throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name is empty"); - } + + entity.setAttribute(QUALIFIED_NAME, newQualifiedName); RequestContext.get().endMetricRecord(metricRecorder); } - public static String createQualifiedName(String parentDomainQualifiedName) { - if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) { - return parentDomainQualifiedName + "/product/" + getUUID(); - } - else{ - return ""; + private static String createQualifiedName(String parentDomainQualifiedName) throws AtlasBaseException { + if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name cannot be empty or null"); } + return parentDomainQualifiedName + "/product/" + getUUID(); } - private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain"); String productName = (String) entity.getAttribute(NAME); String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class); @@ -116,7 +109,7 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityM //Auth check isAuthorized(currentParentDomainHeader, parentDomain); - processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName, context); + processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName); entity.setAttribute(PARENT_DOMAIN_QN, newParentDomainQualifiedName); } else { @@ -134,8 +127,7 @@ private void processMoveDataProductToAnotherDomain(AtlasEntity product, String sourceDomainQualifiedName, String targetDomainQualifiedName, String currentDataProductQualifiedName, - String superDomainQualifiedName, - EntityMutationContext context) throws AtlasBaseException { + String superDomainQualifiedName) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveDataProductToAnotherDomain"); try { @@ -161,59 +153,6 @@ private void processMoveDataProductToAnotherDomain(AtlasEntity product, } } - private void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy"); - try { - LOG.info("Updating policy for entity {}", currentQualifiedName); - Map updatedpolicyResources = new HashMap<>(); - - String currentResource = "entity:"+ currentQualifiedName; - String updatedResource = "entity:"+ updatedQualifiedName; - - updatedpolicyResources.put(currentResource, updatedResource); - - List policies = getPolicy(currentResource); - if (CollectionUtils.isNotEmpty(policies)) { - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE); - for (AtlasEntityHeader policy : policies) { - AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid()); - List policyResources = (List) policyEntity.getAttribute(ATTR_POLICY_RESOURCES); - policyResources.remove(currentResource); - policyResources.add(updatedResource); - AtlasVertex policyVertex = context.getVertex(policy.getGuid()); - policyVertex.removeProperty(ATTR_POLICY_RESOURCES); - policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources); - context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex); - } - } - - }finally { - RequestContext.get().endMetricRecord(metricRecorder); - } - - } - - private List getPolicy(String resource) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy"); - try { - List mustClauseList = new ArrayList(); - mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE))); - mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); - mustClauseList.add(mapOf("term", mapOf("policyResources", Arrays.asList(resource)))); - - Map bool = new HashMap<>(); - bool.put("must", mustClauseList); - - Map dsl = mapOf("query", mapOf("bool", bool)); - - List policies = indexSearchPaginated(dsl, POLICY_ENTITY_TYPE); - - return policies; - } finally { - RequestContext.get().endMetricRecord(metricRecorder); - } - } - private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent"); if (parentDomain == null) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index e944a63880..abe27701e0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -47,7 +47,6 @@ import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.getActiveChildrenVertices; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; -import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class DomainPreProcessor extends AbstractDomainPreProcessor { @@ -83,7 +82,7 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co processCreateDomain(entity, vertex); break; case UPDATE: - processUpdateDomain(entity, vertex, context); + processUpdateDomain(entity, vertex); break; } } @@ -99,16 +98,15 @@ private void processCreateDomain(AtlasEntity entity, AtlasVertex vertex) throws RequestContext.get().endMetricRecord(metricRecorder); } - public static String createQualifiedName(String parentDomainQualifiedName) { - if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) { + private static String createQualifiedName(String parentDomainQualifiedName) { + if (StringUtils.isNotEmpty(parentDomainQualifiedName)) { return parentDomainQualifiedName + "/domain/" + getUUID(); } else{ - String prefixQN = "default/domain"; - return prefixQN + "/" + getUUID(); + return "default/domain" + "/" + getUUID(); } } - private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain"); String domainName = (String) entity.getAttribute(NAME); String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class); @@ -145,7 +143,7 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityM isAuthorized(currentParentDomainHeader, parentDomain); } - processMoveSubDomainToAnotherDomain(entity, vertex, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName, context); + processMoveSubDomainToAnotherDomain(entity, vertex, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName); } else { String vertexName = vertex.getProperty(NAME, String.class); @@ -164,8 +162,7 @@ private void processMoveSubDomainToAnotherDomain(AtlasEntity domain, String sourceDomainQualifiedName, String targetDomainQualifiedName, String currentSubDomainQualifiedName, - String superDomainQualifiedName, - EntityMutationContext context) throws AtlasBaseException { + String superDomainQualifiedName) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveSubDomainToAnotherGlossary"); try { @@ -193,7 +190,7 @@ private void processMoveSubDomainToAnotherDomain(AtlasEntity domain, domain.setAttribute(SUPER_DOMAIN_QN, superDomainQualifiedName); } - moveChildrenToAnotherDomain(domainVertex, superDomainQualifiedName, null, sourceDomainQualifiedName, targetDomainQualifiedName, context); + moveChildrenToAnotherDomain(domainVertex, superDomainQualifiedName, null, sourceDomainQualifiedName, targetDomainQualifiedName); LOG.info("Moved subDomain {} to Domain {}", domainName, targetDomainQualifiedName); @@ -206,8 +203,7 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex, String superDomainQualifiedName, String parentDomainQualifiedName, String sourceDomainQualifiedName, - String targetDomainQualifiedName, - EntityMutationContext context) throws AtlasBaseException { + String targetDomainQualifiedName) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildrenToAnotherDomain"); @@ -238,7 +234,7 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex, while (products.hasNext()) { AtlasVertex productVertex = products.next(); - moveChildDataProductToAnotherDomain(productVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName, context); + moveChildDataProductToAnotherDomain(productVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName); } // Get all children domains of current domain @@ -246,7 +242,7 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex, while (childDomains.hasNext()) { AtlasVertex childVertex = childDomains.next(); - moveChildrenToAnotherDomain(childVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName, context); + moveChildrenToAnotherDomain(childVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName); } recordUpdatedChildEntities(childDomainVertex, updatedAttributes); @@ -261,8 +257,7 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex, String superDomainQualifiedName, String parentDomainQualifiedName, String sourceDomainQualifiedName, - String targetDomainQualifiedName, - EntityMutationContext context) throws AtlasBaseException { + String targetDomainQualifiedName) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildDataProductToAnotherDomain"); try { @@ -294,59 +289,6 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex, } } - private void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy"); - try { - LOG.info("Updating policy for entity {}", currentQualifiedName); - Map updatedpolicyResources = new HashMap<>(); - - String currentResource = "entity:"+ currentQualifiedName; - String updatedResource = "entity:"+ updatedQualifiedName; - - updatedpolicyResources.put(currentResource, updatedResource); - - List policies = getPolicy(currentResource); - if (CollectionUtils.isNotEmpty(policies)) { - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE); - for (AtlasEntityHeader policy : policies) { - AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid()); - List policyResources = (List) policyEntity.getAttribute(ATTR_POLICY_RESOURCES); - policyResources.remove(currentResource); - policyResources.add(updatedResource); - AtlasVertex policyVertex = context.getVertex(policy.getGuid()); - policyVertex.removeProperty(ATTR_POLICY_RESOURCES); - policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources); - context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex); - } - } - - }finally { - RequestContext.get().endMetricRecord(metricRecorder); - } - - } - - private List getPolicy(String resource) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy"); - try { - List mustClauseList = new ArrayList(); - mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE))); - mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); - mustClauseList.add(mapOf("terms", mapOf("policyResources", Arrays.asList(resource)))); - - Map bool = new HashMap<>(); - bool.put("must", mustClauseList); - - Map dsl = mapOf("query", mapOf("bool", bool)); - - List policies = indexSearchPaginated(dsl, POLICY_ENTITY_TYPE); - - return policies; - } finally { - RequestContext.get().endMetricRecord(metricRecorder); - } - } - private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent"); if (parentDomain == null) {