Skip to content

Commit

Permalink
logs added to debug policy issue
Browse files Browse the repository at this point in the history
  • Loading branch information
PRATHAM2002-DS committed Apr 22, 2024
1 parent 3115e48 commit 084ac61
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.AbstractGlossaryPreProcessor;
import org.apache.atlas.type.AtlasEntityType;
Expand All @@ -42,14 +43,12 @@
import java.util.*;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.PARENT_DOMAIN_QN;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.SUPER_DOMAIN_QN;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public abstract class AbstractDomainPreProcessor implements PreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(AbstractGlossaryPreProcessor.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractDomainPreProcessor.class);


protected final AtlasTypeRegistry typeRegistry;
Expand Down Expand Up @@ -129,6 +128,63 @@ protected void isAuthorized(AtlasEntityHeader sourceDomain, AtlasEntityHeader ta
"delete on target Domain: ", targetDomain.getAttribute(NAME));
}

protected void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy");
try {
LOG.info("Updating policy for entity {}", currentQualifiedName);
Map<String, Object> updatedpolicyResources = new HashMap<>();

String currentResource = "entity:"+ currentQualifiedName;
String updatedResource = "entity:"+ updatedQualifiedName;

updatedpolicyResources.put(currentResource, updatedResource);

List<AtlasEntityHeader> policies = getPolicy(currentResource);
if (CollectionUtils.isNotEmpty(policies)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE);
for (AtlasEntityHeader policy : policies) {
LOG.info("Updating policy {} for entity {}", policy.getGuid(), currentQualifiedName);
AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid());
List<String> policyResources = (List<String>) policyEntity.getAttribute(ATTR_POLICY_RESOURCES);
LOG.info("Policy resources {}", policyResources);
policyResources.remove(currentResource);
policyResources.add(updatedResource);
AtlasVertex policyVertex = context.getVertex(policy.getGuid());
LOG.info("Policy Vertex {}", policyVertex);
LOG.info("Context {}", context);
policyVertex.removeProperty(ATTR_POLICY_RESOURCES);
policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources);
context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex);
}
}

}finally {
RequestContext.get().endMetricRecord(metricRecorder);
}

}

protected List<AtlasEntityHeader> getPolicy(String resource) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy");
try {
List mustClauseList = new ArrayList();
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE)));
mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
mustClauseList.add(mapOf("terms", mapOf("policyResources", Arrays.asList(resource))));

Map<String, Object> bool = new HashMap<>();
bool.put("must", mustClauseList);

Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

List<AtlasEntityHeader> policies = indexSearchPaginated(dsl, POLICY_ENTITY_TYPE);

return policies;
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

/**
* Record the updated child entities, it will be used to send notification and store audit logs
* @param entityVertex Child entity vertex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class DataProductPreProcessor extends AbstractDomainPreProcessor {
Expand Down Expand Up @@ -54,38 +53,32 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
processCreateProduct(entity, vertex);
break;
case UPDATE:
processUpdateDomain(entity, vertex, context);
processUpdateDomain(entity, vertex);
break;
}
}

private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain");
String domainName = (String) entity.getAttribute(NAME);
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateProduct");
String productName = (String) entity.getAttribute(NAME);
String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN);

productExists(domainName, parentDomainQualifiedName);
productExists(productName, parentDomainQualifiedName);
String newQualifiedName = createQualifiedName(parentDomainQualifiedName);
if(!newQualifiedName.isEmpty()){
entity.setAttribute(QUALIFIED_NAME, newQualifiedName);
}
else{
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name is empty");
}

entity.setAttribute(QUALIFIED_NAME, newQualifiedName);

RequestContext.get().endMetricRecord(metricRecorder);
}

public static String createQualifiedName(String parentDomainQualifiedName) {
if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) {
return parentDomainQualifiedName + "/product/" + getUUID();
}
else{
return "";
private static String createQualifiedName(String parentDomainQualifiedName) throws AtlasBaseException {
if (StringUtils.isNotEmpty(parentDomainQualifiedName)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name cannot be empty or null");
}
return parentDomainQualifiedName + "/product/" + getUUID();
}

private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException {
private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain");
String productName = (String) entity.getAttribute(NAME);
String vertexQnName = vertex.getProperty(QUALIFIED_NAME, String.class);
Expand Down Expand Up @@ -116,7 +109,7 @@ private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex, EntityM
//Auth check
isAuthorized(currentParentDomainHeader, parentDomain);

processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName, context);
processMoveDataProductToAnotherDomain(entity, currentParentDomainQualifiedName, newParentDomainQualifiedName, vertexQnName, superDomainQualifiedName);
entity.setAttribute(PARENT_DOMAIN_QN, newParentDomainQualifiedName);

} else {
Expand All @@ -134,8 +127,7 @@ private void processMoveDataProductToAnotherDomain(AtlasEntity product,
String sourceDomainQualifiedName,
String targetDomainQualifiedName,
String currentDataProductQualifiedName,
String superDomainQualifiedName,
EntityMutationContext context) throws AtlasBaseException {
String superDomainQualifiedName) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveDataProductToAnotherDomain");

try {
Expand All @@ -161,59 +153,6 @@ private void processMoveDataProductToAnotherDomain(AtlasEntity product,
}
}

private void updatePolicy(String currentQualifiedName, String updatedQualifiedName, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateDomainPolicy");
try {
LOG.info("Updating policy for entity {}", currentQualifiedName);
Map<String, Object> updatedpolicyResources = new HashMap<>();

String currentResource = "entity:"+ currentQualifiedName;
String updatedResource = "entity:"+ updatedQualifiedName;

updatedpolicyResources.put(currentResource, updatedResource);

List<AtlasEntityHeader> policies = getPolicy(currentResource);
if (CollectionUtils.isNotEmpty(policies)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(POLICY_ENTITY_TYPE);
for (AtlasEntityHeader policy : policies) {
AtlasEntity policyEntity = entityRetriever.toAtlasEntity(policy.getGuid());
List<String> policyResources = (List<String>) policyEntity.getAttribute(ATTR_POLICY_RESOURCES);
policyResources.remove(currentResource);
policyResources.add(updatedResource);
AtlasVertex policyVertex = context.getVertex(policy.getGuid());
policyVertex.removeProperty(ATTR_POLICY_RESOURCES);
policyEntity.setAttribute(ATTR_POLICY_RESOURCES, policyResources);
context.addUpdated(policyEntity.getGuid(), policyEntity, entityType, policyVertex);
}
}

}finally {
RequestContext.get().endMetricRecord(metricRecorder);
}

}

private List<AtlasEntityHeader> getPolicy(String resource) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getPolicy");
try {
List mustClauseList = new ArrayList();
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", POLICY_ENTITY_TYPE)));
mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
mustClauseList.add(mapOf("term", mapOf("policyResources", Arrays.asList(resource))));

Map<String, Object> bool = new HashMap<>();
bool.put("must", mustClauseList);

Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

List<AtlasEntityHeader> policies = indexSearchPaginated(dsl, POLICY_ENTITY_TYPE);

return policies;
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent");
if (parentDomain == null) {
Expand Down
Loading

0 comments on commit 084ac61

Please sign in to comment.