Skip to content

Commit

Permalink
added updatePolicy logic
Browse files Browse the repository at this point in the history
  • Loading branch information
PRATHAM2002-DS committed Apr 16, 2024
1 parent 7e54560 commit 1c6216a
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -21,6 +22,7 @@

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class DataProductPreProcessor extends AbstractDomainPreProcessor {
Expand Down Expand Up @@ -52,7 +54,7 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
processCreateProduct(entity, vertex);
break;
case UPDATE:
processUpdateDomain(entity, vertex);
processUpdateDomain(entity, vertex, context);
break;
}
}
Expand Down Expand Up @@ -83,7 +85,7 @@ public static String createQualifiedName(String parentDomainQualifiedName) {
}
}

private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain");
String productName = (String) entity.getAttribute(NAME);
String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class);
Expand Down Expand Up @@ -114,7 +116,7 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws
//Auth check
isAuthorized(currentParentDomainHeader, parentDomain);

processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName);
processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName, context);
entity.setAttribute(PARENT_DOMAIN_QN, newParentDomainQualifiedName);

} else {
Expand All @@ -129,10 +131,11 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws
}

private void processMoveDataProductToAnotherDomain(AtlasEntity product,
String sourceDomainQualifiedName,
String targetDomainQualifiedName,
String currentDataProductQualifiedName,
String superDomainQualifiedName) throws AtlasBaseException {
String sourceDomainQualifiedName,
String targetDomainQualifiedName,
String currentDataProductQualifiedName,
String superDomainQualifiedName,
EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveDataProductToAnotherDomain");

try {
Expand All @@ -148,13 +151,69 @@ private void processMoveDataProductToAnotherDomain(AtlasEntity product,
product.setAttribute(PARENT_DOMAIN_QN, targetDomainQualifiedName);
product.setAttribute(SUPER_DOMAIN_QN, superDomainQualifiedName);

//Update policy
updatePolicy(currentDataProductQualifiedName, updatedQualifiedName, context);

LOG.info("Moved dataProduct {} to Domain {}", domainName, targetDomainQualifiedName);

} finally {
RequestContext.get().endMetricRecord(recorder);
}
}

private void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy");
try {
LOG.info("Updating policy for entity {}", currentQualifiedName);
Map<String, Object> updatedpolicyResources = new HashMap<>();

String currentResource = "entity:"+ currentQualifiedName;
String updatedResource = "entity:"+ updatedQualifiedName;

updatedpolicyResources.put(currentResource, updatedResource);

List<AtlasEntityHeader> policies = getPolicy(currentResource);
if (CollectionUtils.isNotEmpty(policies)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE);
for (AtlasEntityHeader policy : policies) {
AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid());
List<String> policyResources = (List<String>) policyEntity.getAttribute(ATTR_POLICY_RESOURCES);
policyResources.remove(currentResource);
policyResources.add(updatedResource);
AtlasVertex policyVertex = context.getVertex(policy.getGuid());
policyVertex.removeProperty(ATTR_POLICY_RESOURCES);
policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources);
context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex);
}
}

}finally {
RequestContext.get().endMetricRecord(metricRecorder);
}

}

private List<AtlasEntityHeader> getPolicy(String resource) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy");
try {
List mustClauseList = new ArrayList();
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE)));
mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
mustClauseList.add(mapOf("term", mapOf("policyResources", Arrays.asList(resource))));

Map<String, Object> bool = new HashMap<>();
bool.put("must", mustClauseList);

Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

List<AtlasEntityHeader> policies = indexSearchPaginated(dsl);

return policies;
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent");
if (parentDomain == null) {
Expand Down Expand Up @@ -246,7 +305,7 @@ private void productExists(String productName, String parentDomainQualifiedName)
}

if (exists) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName);
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName+" already exists in the domain");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -46,6 +47,7 @@
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getActiveChildrenVertices;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class DomainPreProcessor extends AbstractDomainPreProcessor {
Expand Down Expand Up @@ -81,7 +83,7 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
processCreateDomain(entity, vertex);
break;
case UPDATE:
processUpdateDomain(entity, vertex);
processUpdateDomain(entity, vertex, context);
break;
}
}
Expand All @@ -106,7 +108,7 @@ public static String createQualifiedName(String parentDomainQualifiedName) {
}
}

private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain");
String domainName = (String) entity.getAttribute(NAME);
String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class);
Expand Down Expand Up @@ -143,7 +145,7 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws
isAuthorized(currentParentDomainHeader, parentDomain);
}

processMoveSubDomainToAnotherDomain(entity, vertex, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName);
processMoveSubDomainToAnotherDomain(entity, vertex, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName, context);

} else {
String vertexName = vertex.getProperty(NAME, String.class);
Expand All @@ -162,7 +164,8 @@ private void processMoveSubDomainToAnotherDomain(AtlasEntity domain,
String sourceDomainQualifiedName,
String targetDomainQualifiedName,
String currentSubDomainQualifiedName,
String superDomainQualifiedName) throws AtlasBaseException {
String superDomainQualifiedName,
EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveSubDomainToAnotherGlossary");

try {
Expand Down Expand Up @@ -190,7 +193,7 @@ private void processMoveSubDomainToAnotherDomain(AtlasEntity domain,
domain.setAttribute(SUPER_DOMAIN_QN, superDomainQualifiedName);
}

moveChildrenToAnotherDomain(domainVertex, superDomainQualifiedName, null, sourceDomainQualifiedName, targetDomainQualifiedName);
moveChildrenToAnotherDomain(domainVertex, superDomainQualifiedName, null, sourceDomainQualifiedName, targetDomainQualifiedName, context);

LOG.info("Moved subDomain {} to Domain {}", domainName, targetDomainQualifiedName);

Expand All @@ -203,7 +206,8 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex,
String superDomainQualifiedName,
String parentDomainQualifiedName,
String sourceDomainQualifiedName,
String targetDomainQualifiedName) throws AtlasBaseException {
String targetDomainQualifiedName,
EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildrenToAnotherDomain");


Expand All @@ -222,6 +226,9 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex,
childDomainVertex.setProperty(SUPER_DOMAIN_QN, superDomainQualifiedName);
childDomainVertex.setProperty(PARENT_DOMAIN_QN, parentDomainQualifiedName);

//update policy
updatePolicy(currentDomainQualifiedName, updatedDomainQualifiedName, context);

//update system properties
GraphHelper.setModifiedByAsString(childDomainVertex, RequestContext.get().getUser());
GraphHelper.setModifiedTime(childDomainVertex, System.currentTimeMillis());
Expand All @@ -231,15 +238,15 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex,

while (products.hasNext()) {
AtlasVertex productVertex = products.next();
moveChildDataProductToAnotherDomain(productVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName);
moveChildDataProductToAnotherDomain(productVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName, context);
}

// Get all children domains of current domain
Iterator<AtlasVertex> childDomains = getActiveChildrenVertices(childDomainVertex, DOMAIN_PARENT_EDGE_LABEL);

while (childDomains.hasNext()) {
AtlasVertex childVertex = childDomains.next();
moveChildrenToAnotherDomain(childVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName);
moveChildrenToAnotherDomain(childVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName, context);
}

recordUpdatedChildEntities(childDomainVertex, updatedAttributes);
Expand All @@ -254,23 +261,27 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex,
String superDomainQualifiedName,
String parentDomainQualifiedName,
String sourceDomainQualifiedName,
String targetDomainQualifiedName) throws AtlasBaseException {
String targetDomainQualifiedName,
EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildDataProductToAnotherDomain");

try {
String productName = productVertex.getProperty(NAME, String.class);
LOG.info("Moving dataProduct {} to Domain {}", productName, targetDomainQualifiedName);
Map<String, Object> updatedAttributes = new HashMap<>();

String currentDomainQualifiedName = productVertex.getProperty(PARENT_DOMAIN_QN, String.class);
String updatedQualifiedName = currentDomainQualifiedName.replace(sourceDomainQualifiedName, targetDomainQualifiedName);
String currentQualifiedName = productVertex.getProperty(PARENT_DOMAIN_QN, String.class);
String updatedQualifiedName = currentQualifiedName.replace(sourceDomainQualifiedName, targetDomainQualifiedName);

productVertex.setProperty(QUALIFIED_NAME, updatedQualifiedName);
updatedAttributes.put(QUALIFIED_NAME, updatedQualifiedName);

productVertex.setProperty(PARENT_DOMAIN_QN, parentDomainQualifiedName);
productVertex.setProperty(SUPER_DOMAIN_QN, superDomainQualifiedName);

//update policy
updatePolicy(currentQualifiedName, updatedQualifiedName, context);

//update system properties
GraphHelper.setModifiedByAsString(productVertex, RequestContext.get().getUser());
GraphHelper.setModifiedTime(productVertex, System.currentTimeMillis());
Expand All @@ -283,6 +294,59 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex,
}
}

private void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy");
try {
LOG.info("Updating policy for entity {}", currentQualifiedName);
Map<String, Object> updatedpolicyResources = new HashMap<>();

String currentResource = "entity:"+ currentQualifiedName;
String updatedResource = "entity:"+ updatedQualifiedName;

updatedpolicyResources.put(currentResource, updatedResource);

List<AtlasEntityHeader> policies = getPolicy(currentResource);
if (CollectionUtils.isNotEmpty(policies)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE);
for (AtlasEntityHeader policy : policies) {
AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid());
List<String> policyResources = (List<String>) policyEntity.getAttribute(ATTR_POLICY_RESOURCES);
policyResources.remove(currentResource);
policyResources.add(updatedResource);
AtlasVertex policyVertex = context.getVertex(policy.getGuid());
policyVertex.removeProperty(ATTR_POLICY_RESOURCES);
policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources);
context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex);
}
}

}finally {
RequestContext.get().endMetricRecord(metricRecorder);
}

}

private List<AtlasEntityHeader> getPolicy(String resource) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy");
try {
List mustClauseList = new ArrayList();
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE)));
mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
mustClauseList.add(mapOf("term", mapOf("policyResources", Arrays.asList(resource))));

Map<String, Object> bool = new HashMap<>();
bool.put("must", mustClauseList);

Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

List<AtlasEntityHeader> policies = indexSearchPaginated(dsl);

return policies;
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent");
if (parentDomain == null) {
Expand Down Expand Up @@ -348,7 +412,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t
}

if (exists) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, domainName);
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, domainName+" already exists");
}
}
}
Expand Down

0 comments on commit 1c6216a

Please sign in to comment.