From 1c6216af413218838ce0d393847b44df2e48161e Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Tue, 16 Apr 2024 14:33:42 +0530 Subject: [PATCH 1/2] added updatePolicy logic --- .../datamesh/DataProductPreProcessor.java | 75 ++++++++++++++-- .../datamesh/DomainPreProcessor.java | 88 ++++++++++++++++--- 2 files changed, 143 insertions(+), 20 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index 1018afb2c3..057de37aca 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -9,6 +9,7 @@ import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -21,6 +22,7 @@ import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class DataProductPreProcessor extends AbstractDomainPreProcessor { @@ -52,7 +54,7 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co processCreateProduct(entity, vertex); break; case UPDATE: - processUpdateDomain(entity, vertex); + processUpdateDomain(entity, vertex, context); break; } } @@ -83,7 +85,7 @@ public static String createQualifiedName(String parentDomainQualifiedName) { } } - private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain"); String productName = (String) entity.getAttribute(NAME); String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class); @@ -114,7 +116,7 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws //Auth check isAuthorized(currentParentDomainHeader, parentDomain); - processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName); + processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName, context); entity.setAttribute(PARENT_DOMAIN_QN, newParentDomainQualifiedName); } else { @@ -129,10 +131,11 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws } private void processMoveDataProductToAnotherDomain(AtlasEntity product, - String sourceDomainQualifiedName, - String targetDomainQualifiedName, - String currentDataProductQualifiedName, - String superDomainQualifiedName) throws AtlasBaseException { + String sourceDomainQualifiedName, + String targetDomainQualifiedName, + String currentDataProductQualifiedName, + String superDomainQualifiedName, + EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveDataProductToAnotherDomain"); try { @@ -148,6 +151,9 @@ private void processMoveDataProductToAnotherDomain(AtlasEntity product, product.setAttribute(PARENT_DOMAIN_QN, targetDomainQualifiedName); product.setAttribute(SUPER_DOMAIN_QN, superDomainQualifiedName); + //Update policy + updatePolicy(currentDataProductQualifiedName, updatedQualifiedName, context); + LOG.info("Moved dataProduct {} to Domain {}", domainName, targetDomainQualifiedName); } finally { @@ -155,6 +161,59 @@ private void processMoveDataProductToAnotherDomain(AtlasEntity product, } } + private void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy"); + try { + LOG.info("Updating policy for entity {}", currentQualifiedName); + Map updatedpolicyResources = new HashMap<>(); + + String currentResource = "entity:"+ currentQualifiedName; + String updatedResource = "entity:"+ updatedQualifiedName; + + updatedpolicyResources.put(currentResource, updatedResource); + + List policies = getPolicy(currentResource); + if (CollectionUtils.isNotEmpty(policies)) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE); + for (AtlasEntityHeader policy : policies) { + AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid()); + List policyResources = (List) policyEntity.getAttribute(ATTR_POLICY_RESOURCES); + policyResources.remove(currentResource); + policyResources.add(updatedResource); + AtlasVertex policyVertex = context.getVertex(policy.getGuid()); + policyVertex.removeProperty(ATTR_POLICY_RESOURCES); + policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources); + context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex); + } + } + + }finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + } + + private List getPolicy(String resource) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy"); + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("policyResources", Arrays.asList(resource)))); + + Map bool = new HashMap<>(); + bool.put("must", mustClauseList); + + Map dsl = mapOf("query", mapOf("bool", bool)); + + List policies = indexSearchPaginated(dsl); + + return policies; + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } + private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent"); if (parentDomain == null) { @@ -246,7 +305,7 @@ private void productExists(String productName, String parentDomainQualifiedName) } if (exists) { - throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName); + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName+" already exists in the domain"); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index 0c2b98aa36..a2c1618d0d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -33,6 +33,7 @@ import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -46,6 +47,7 @@ import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.getActiveChildrenVertices; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; +import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class DomainPreProcessor extends AbstractDomainPreProcessor { @@ -81,7 +83,7 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co processCreateDomain(entity, vertex); break; case UPDATE: - processUpdateDomain(entity, vertex); + processUpdateDomain(entity, vertex, context); break; } } @@ -106,7 +108,7 @@ public static String createQualifiedName(String parentDomainQualifiedName) { } } - private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { + private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain"); String domainName = (String) entity.getAttribute(NAME); String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class); @@ -143,7 +145,7 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws isAuthorized(currentParentDomainHeader, parentDomain); } - processMoveSubDomainToAnotherDomain(entity, vertex, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName); + processMoveSubDomainToAnotherDomain(entity, vertex, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName, context); } else { String vertexName = vertex.getProperty(NAME, String.class); @@ -162,7 +164,8 @@ private void processMoveSubDomainToAnotherDomain(AtlasEntity domain, String sourceDomainQualifiedName, String targetDomainQualifiedName, String currentSubDomainQualifiedName, - String superDomainQualifiedName) throws AtlasBaseException { + String superDomainQualifiedName, + EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveSubDomainToAnotherGlossary"); try { @@ -190,7 +193,7 @@ private void processMoveSubDomainToAnotherDomain(AtlasEntity domain, domain.setAttribute(SUPER_DOMAIN_QN, superDomainQualifiedName); } - moveChildrenToAnotherDomain(domainVertex, superDomainQualifiedName, null, sourceDomainQualifiedName, targetDomainQualifiedName); + moveChildrenToAnotherDomain(domainVertex, superDomainQualifiedName, null, sourceDomainQualifiedName, targetDomainQualifiedName, context); LOG.info("Moved subDomain {} to Domain {}", domainName, targetDomainQualifiedName); @@ -203,7 +206,8 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex, String superDomainQualifiedName, String parentDomainQualifiedName, String sourceDomainQualifiedName, - String targetDomainQualifiedName) throws AtlasBaseException { + String targetDomainQualifiedName, + EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildrenToAnotherDomain"); @@ -222,6 +226,9 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex, childDomainVertex.setProperty(SUPER_DOMAIN_QN, superDomainQualifiedName); childDomainVertex.setProperty(PARENT_DOMAIN_QN, parentDomainQualifiedName); + //update policy + updatePolicy(currentDomainQualifiedName, updatedDomainQualifiedName, context); + //update system properties GraphHelper.setModifiedByAsString(childDomainVertex, RequestContext.get().getUser()); GraphHelper.setModifiedTime(childDomainVertex, System.currentTimeMillis()); @@ -231,7 +238,7 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex, while (products.hasNext()) { AtlasVertex productVertex = products.next(); - moveChildDataProductToAnotherDomain(productVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName); + moveChildDataProductToAnotherDomain(productVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName, context); } // Get all children domains of current domain @@ -239,7 +246,7 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex, while (childDomains.hasNext()) { AtlasVertex childVertex = childDomains.next(); - moveChildrenToAnotherDomain(childVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName); + moveChildrenToAnotherDomain(childVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName, context); } recordUpdatedChildEntities(childDomainVertex, updatedAttributes); @@ -254,7 +261,8 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex, String superDomainQualifiedName, String parentDomainQualifiedName, String sourceDomainQualifiedName, - String targetDomainQualifiedName) throws AtlasBaseException { + String targetDomainQualifiedName, + EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildDataProductToAnotherDomain"); try { @@ -262,8 +270,8 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex, LOG.info("Moving dataProduct {} to Domain {}", productName, targetDomainQualifiedName); Map updatedAttributes = new HashMap<>(); - String currentDomainQualifiedName = productVertex.getProperty(PARENT_DOMAIN_QN, String.class); - String updatedQualifiedName = currentDomainQualifiedName.replace(sourceDomainQualifiedName, targetDomainQualifiedName); + String currentQualifiedName = productVertex.getProperty(PARENT_DOMAIN_QN, String.class); + String updatedQualifiedName = currentQualifiedName.replace(sourceDomainQualifiedName, targetDomainQualifiedName); productVertex.setProperty(QUALIFIED_NAME, updatedQualifiedName); updatedAttributes.put(QUALIFIED_NAME, updatedQualifiedName); @@ -271,6 +279,9 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex, productVertex.setProperty(PARENT_DOMAIN_QN, parentDomainQualifiedName); productVertex.setProperty(SUPER_DOMAIN_QN, superDomainQualifiedName); + //update policy + updatePolicy(currentQualifiedName, updatedQualifiedName, context); + //update system properties GraphHelper.setModifiedByAsString(productVertex, RequestContext.get().getUser()); GraphHelper.setModifiedTime(productVertex, System.currentTimeMillis()); @@ -283,6 +294,59 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex, } } + private void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy"); + try { + LOG.info("Updating policy for entity {}", currentQualifiedName); + Map updatedpolicyResources = new HashMap<>(); + + String currentResource = "entity:"+ currentQualifiedName; + String updatedResource = "entity:"+ updatedQualifiedName; + + updatedpolicyResources.put(currentResource, updatedResource); + + List policies = getPolicy(currentResource); + if (CollectionUtils.isNotEmpty(policies)) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE); + for (AtlasEntityHeader policy : policies) { + AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid()); + List policyResources = (List) policyEntity.getAttribute(ATTR_POLICY_RESOURCES); + policyResources.remove(currentResource); + policyResources.add(updatedResource); + AtlasVertex policyVertex = context.getVertex(policy.getGuid()); + policyVertex.removeProperty(ATTR_POLICY_RESOURCES); + policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources); + context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex); + } + } + + }finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + } + + private List getPolicy(String resource) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy"); + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("policyResources", Arrays.asList(resource)))); + + Map bool = new HashMap<>(); + bool.put("must", mustClauseList); + + Map dsl = mapOf("query", mapOf("bool", bool)); + + List policies = indexSearchPaginated(dsl); + + return policies; + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } + private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent"); if (parentDomain == null) { @@ -348,7 +412,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t } if (exists) { - throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, domainName); + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, domainName+" already exists"); } } } From b09487525b566bd4bfa35acadeec68dc140fa2bd Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Tue, 16 Apr 2024 17:41:00 +0530 Subject: [PATCH 2/2] indexSearch query issue resolved --- .../graph/v2/preprocessor/datamesh/DomainPreProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java index a2c1618d0d..a33150e607 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DomainPreProcessor.java @@ -332,7 +332,7 @@ private List getPolicy(String resource) throws AtlasBaseExcep List mustClauseList = new ArrayList(); mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE))); mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); - mustClauseList.add(mapOf("term", mapOf("policyResources", Arrays.asList(resource)))); + mustClauseList.add(mapOf("terms", mapOf("policyResources", Arrays.asList(resource)))); Map bool = new HashMap<>(); bool.put("must", mustClauseList);