Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[beta]- DG 996 Move Domains #2930

Merged
merged 2 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -21,6 +22,7 @@

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class DataProductPreProcessor extends AbstractDomainPreProcessor {
Expand Down Expand Up @@ -52,7 +54,7 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
processCreateProduct(entity, vertex);
break;
case UPDATE:
processUpdateDomain(entity, vertex);
processUpdateDomain(entity, vertex, context);
break;
}
}
Expand Down Expand Up @@ -83,7 +85,7 @@ public static String createQualifiedName(String parentDomainQualifiedName) {
}
}

private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain");
String productName = (String) entity.getAttribute(NAME);
String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class);
Expand Down Expand Up @@ -114,7 +116,7 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws
//Auth check
isAuthorized(currentParentDomainHeader, parentDomain);

processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName);
processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName, context);
entity.setAttribute(PARENT_DOMAIN_QN, newParentDomainQualifiedName);

} else {
Expand All @@ -129,10 +131,11 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws
}

private void processMoveDataProductToAnotherDomain(AtlasEntity product,
String sourceDomainQualifiedName,
String targetDomainQualifiedName,
String currentDataProductQualifiedName,
String superDomainQualifiedName) throws AtlasBaseException {
String sourceDomainQualifiedName,
String targetDomainQualifiedName,
String currentDataProductQualifiedName,
String superDomainQualifiedName,
EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveDataProductToAnotherDomain");

try {
Expand All @@ -148,13 +151,69 @@ private void processMoveDataProductToAnotherDomain(AtlasEntity product,
product.setAttribute(PARENT_DOMAIN_QN, targetDomainQualifiedName);
product.setAttribute(SUPER_DOMAIN_QN, superDomainQualifiedName);

//Update policy
updatePolicy(currentDataProductQualifiedName, updatedQualifiedName, context);

LOG.info("Moved dataProduct {} to Domain {}", domainName, targetDomainQualifiedName);

} finally {
RequestContext.get().endMetricRecord(recorder);
}
}

private void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy");
try {
LOG.info("Updating policy for entity {}", currentQualifiedName);
Map<String, Object> updatedpolicyResources = new HashMap<>();

String currentResource = "entity:"+ currentQualifiedName;
String updatedResource = "entity:"+ updatedQualifiedName;

updatedpolicyResources.put(currentResource, updatedResource);

List<AtlasEntityHeader> policies = getPolicy(currentResource);
if (CollectionUtils.isNotEmpty(policies)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE);
for (AtlasEntityHeader policy : policies) {
AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid());
List<String> policyResources = (List<String>) policyEntity.getAttribute(ATTR_POLICY_RESOURCES);
policyResources.remove(currentResource);
policyResources.add(updatedResource);
AtlasVertex policyVertex = context.getVertex(policy.getGuid());
policyVertex.removeProperty(ATTR_POLICY_RESOURCES);
policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources);
context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex);
}
}

}finally {
RequestContext.get().endMetricRecord(metricRecorder);
}

}

private List<AtlasEntityHeader> getPolicy(String resource) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy");
try {
List mustClauseList = new ArrayList();
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE)));
mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
mustClauseList.add(mapOf("term", mapOf("policyResources", Arrays.asList(resource))));

Map<String, Object> bool = new HashMap<>();
bool.put("must", mustClauseList);

Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

List<AtlasEntityHeader> policies = indexSearchPaginated(dsl);

return policies;
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent");
if (parentDomain == null) {
Expand Down Expand Up @@ -246,7 +305,7 @@ private void productExists(String productName, String parentDomainQualifiedName)
}

if (exists) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName);
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName+" already exists in the domain");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -46,6 +47,7 @@
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getActiveChildrenVertices;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class DomainPreProcessor extends AbstractDomainPreProcessor {
Expand Down Expand Up @@ -81,7 +83,7 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
processCreateDomain(entity, vertex);
break;
case UPDATE:
processUpdateDomain(entity, vertex);
processUpdateDomain(entity, vertex, context);
break;
}
}
Expand All @@ -106,7 +108,7 @@ public static String createQualifiedName(String parentDomainQualifiedName) {
}
}

private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain");
String domainName = (String) entity.getAttribute(NAME);
String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class);
Expand Down Expand Up @@ -143,7 +145,7 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws
isAuthorized(currentParentDomainHeader, parentDomain);
}

processMoveSubDomainToAnotherDomain(entity, vertex, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName);
processMoveSubDomainToAnotherDomain(entity, vertex, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName, context);

} else {
String vertexName = vertex.getProperty(NAME, String.class);
Expand All @@ -162,7 +164,8 @@ private void processMoveSubDomainToAnotherDomain(AtlasEntity domain,
String sourceDomainQualifiedName,
String targetDomainQualifiedName,
String currentSubDomainQualifiedName,
String superDomainQualifiedName) throws AtlasBaseException {
String superDomainQualifiedName,
EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveSubDomainToAnotherGlossary");

try {
Expand Down Expand Up @@ -190,7 +193,7 @@ private void processMoveSubDomainToAnotherDomain(AtlasEntity domain,
domain.setAttribute(SUPER_DOMAIN_QN, superDomainQualifiedName);
}

moveChildrenToAnotherDomain(domainVertex, superDomainQualifiedName, null, sourceDomainQualifiedName, targetDomainQualifiedName);
moveChildrenToAnotherDomain(domainVertex, superDomainQualifiedName, null, sourceDomainQualifiedName, targetDomainQualifiedName, context);

LOG.info("Moved subDomain {} to Domain {}", domainName, targetDomainQualifiedName);

Expand All @@ -203,7 +206,8 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex,
String superDomainQualifiedName,
String parentDomainQualifiedName,
String sourceDomainQualifiedName,
String targetDomainQualifiedName) throws AtlasBaseException {
String targetDomainQualifiedName,
EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildrenToAnotherDomain");


Expand All @@ -222,6 +226,9 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex,
childDomainVertex.setProperty(SUPER_DOMAIN_QN, superDomainQualifiedName);
childDomainVertex.setProperty(PARENT_DOMAIN_QN, parentDomainQualifiedName);

//update policy
updatePolicy(currentDomainQualifiedName, updatedDomainQualifiedName, context);

//update system properties
GraphHelper.setModifiedByAsString(childDomainVertex, RequestContext.get().getUser());
GraphHelper.setModifiedTime(childDomainVertex, System.currentTimeMillis());
Expand All @@ -231,15 +238,15 @@ private void moveChildrenToAnotherDomain(AtlasVertex childDomainVertex,

while (products.hasNext()) {
AtlasVertex productVertex = products.next();
moveChildDataProductToAnotherDomain(productVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName);
moveChildDataProductToAnotherDomain(productVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName, context);
}

// Get all children domains of current domain
Iterator<AtlasVertex> childDomains = getActiveChildrenVertices(childDomainVertex, DOMAIN_PARENT_EDGE_LABEL);

while (childDomains.hasNext()) {
AtlasVertex childVertex = childDomains.next();
moveChildrenToAnotherDomain(childVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName);
moveChildrenToAnotherDomain(childVertex, superDomainQualifiedName, updatedDomainQualifiedName, sourceDomainQualifiedName, targetDomainQualifiedName, context);
}

recordUpdatedChildEntities(childDomainVertex, updatedAttributes);
Expand All @@ -254,23 +261,27 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex,
String superDomainQualifiedName,
String parentDomainQualifiedName,
String sourceDomainQualifiedName,
String targetDomainQualifiedName) throws AtlasBaseException {
String targetDomainQualifiedName,
EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildDataProductToAnotherDomain");

try {
String productName = productVertex.getProperty(NAME, String.class);
LOG.info("Moving dataProduct {} to Domain {}", productName, targetDomainQualifiedName);
Map<String, Object> updatedAttributes = new HashMap<>();

String currentDomainQualifiedName = productVertex.getProperty(PARENT_DOMAIN_QN, String.class);
String updatedQualifiedName = currentDomainQualifiedName.replace(sourceDomainQualifiedName, targetDomainQualifiedName);
String currentQualifiedName = productVertex.getProperty(PARENT_DOMAIN_QN, String.class);
String updatedQualifiedName = currentQualifiedName.replace(sourceDomainQualifiedName, targetDomainQualifiedName);

productVertex.setProperty(QUALIFIED_NAME, updatedQualifiedName);
updatedAttributes.put(QUALIFIED_NAME, updatedQualifiedName);

productVertex.setProperty(PARENT_DOMAIN_QN, parentDomainQualifiedName);
productVertex.setProperty(SUPER_DOMAIN_QN, superDomainQualifiedName);

//update policy
updatePolicy(currentQualifiedName, updatedQualifiedName, context);

//update system properties
GraphHelper.setModifiedByAsString(productVertex, RequestContext.get().getUser());
GraphHelper.setModifiedTime(productVertex, System.currentTimeMillis());
Expand All @@ -283,6 +294,59 @@ private void moveChildDataProductToAnotherDomain(AtlasVertex productVertex,
}
}

private void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy");
try {
LOG.info("Updating policy for entity {}", currentQualifiedName);
Map<String, Object> updatedpolicyResources = new HashMap<>();

String currentResource = "entity:"+ currentQualifiedName;
String updatedResource = "entity:"+ updatedQualifiedName;

updatedpolicyResources.put(currentResource, updatedResource);

List<AtlasEntityHeader> policies = getPolicy(currentResource);
if (CollectionUtils.isNotEmpty(policies)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE);
for (AtlasEntityHeader policy : policies) {
AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid());
List<String> policyResources = (List<String>) policyEntity.getAttribute(ATTR_POLICY_RESOURCES);
policyResources.remove(currentResource);
policyResources.add(updatedResource);
AtlasVertex policyVertex = context.getVertex(policy.getGuid());
policyVertex.removeProperty(ATTR_POLICY_RESOURCES);
policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources);
context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex);
}
}

}finally {
RequestContext.get().endMetricRecord(metricRecorder);
}

}

private List<AtlasEntityHeader> getPolicy(String resource) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy");
try {
List mustClauseList = new ArrayList();
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE)));
mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
mustClauseList.add(mapOf("terms", mapOf("policyResources", Arrays.asList(resource))));

Map<String, Object> bool = new HashMap<>();
bool.put("must", mustClauseList);

Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

List<AtlasEntityHeader> policies = indexSearchPaginated(dsl);

return policies;
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent");
if (parentDomain == null) {
Expand Down Expand Up @@ -348,7 +412,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t
}

if (exists) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, domainName);
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, domainName+" already exists");
}
}
}
Expand Down
Loading