Skip to content

Commit

Permalink
implemented changes on PR
Browse files Browse the repository at this point in the history
  • Loading branch information
PRATHAM2002-DS committed Apr 20, 2024
1 parent a1e11fe commit f9444b7
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,6 @@ public final class Constants {
*/
public static final String DATA_DOMAIN_ENTITY_TYPE = "DataDomain";
public static final String DATA_PRODUCT_ENTITY_TYPE = "DataProduct";
public static final String DATA_PRODUCT_EDGE_LABEL = "__DataDomain.dataProducts";
public static final String DOMAIN_PARENT_EDGE_LABEL = "__DataDomain.subDomains";



/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class PreProcessorUtils {
public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor";

//DataMesh models constants
public static final String DATA_PRODUCT_TYPE = "DataProduct";
public static final String PARENT_DOMAIN = "parentDomain";
public static final String PARENT_DOMAIN_QN = "parentDomainQualifiedName";
public static final String SUPER_DOMAIN_QN = "superDomainQualifiedName";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,27 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh;

import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.IndexSearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.AbstractGlossaryPreProcessor;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.PARENT_DOMAIN_QN;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.SUPER_DOMAIN_QN;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public abstract class AbstractDomainPreProcessor implements PreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(AbstractGlossaryPreProcessor.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractDomainPreProcessor.class);


protected final AtlasTypeRegistry typeRegistry;
Expand Down Expand Up @@ -104,28 +93,4 @@ public List<AtlasEntityHeader> indexSearchPaginated(Map<String, Object> dsl, Str

return ret;
}

protected void isAuthorized(AtlasEntityHeader sourceDomain, AtlasEntityHeader targetDomain) throws AtlasBaseException {

// source -> CREATE + UPDATE + DELETE
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, sourceDomain),
"create on source Domain: ", sourceDomain.getAttribute(NAME));

AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, sourceDomain),
"update on source Domain: ", sourceDomain.getAttribute(NAME));

AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, sourceDomain),
"delete on source Domain: ", sourceDomain.getAttribute(NAME));


// target -> CREATE + UPDATE + DELETE
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, targetDomain),
"create on target Domain: ", targetDomain.getAttribute(NAME));

AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, targetDomain),
"update on target Domain: ", targetDomain.getAttribute(NAME));

AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, targetDomain),
"delete on target Domain: ", targetDomain.getAttribute(NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -22,7 +21,6 @@

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class DataProductPreProcessor extends AbstractDomainPreProcessor {
Expand All @@ -49,35 +47,43 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co

setParent(entity, context);

if (operation == EntityMutations.EntityOperation.CREATE) {
processCreateProduct(entity, vertex);
switch (operation) {
case CREATE:
processCreateProduct(entity, vertex);
break;
case UPDATE:
processUpdateProduct(entity, vertex);
break;
}
}

private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain");
String domainName = (String) entity.getAttribute(NAME);
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateProduct");
String productName = (String) entity.getAttribute(NAME);
String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN);

productExists(domainName, parentDomainQualifiedName);
productExists(productName, parentDomainQualifiedName);
String newQualifiedName = createQualifiedName(parentDomainQualifiedName);
if(!newQualifiedName.isEmpty()){
entity.setAttribute(QUALIFIED_NAME, newQualifiedName);
}
else{
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name is empty");
}

entity.setAttribute(QUALIFIED_NAME, newQualifiedName);

RequestContext.get().endMetricRecord(metricRecorder);
}

public static String createQualifiedName(String parentDomainQualifiedName) {
if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) {
return parentDomainQualifiedName + "/product/" + getUUID();
}
else{
return "";
private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateProduct");
String VertexQName = vertex.getProperty(QUALIFIED_NAME, String.class);

entity.setAttribute(QUALIFIED_NAME, VertexQName);
RequestContext.get().endMetricRecord(metricRecorder);
}

private static String createQualifiedName(String parentDomainQualifiedName) throws AtlasBaseException {
if (StringUtils.isNotEmpty(parentDomainQualifiedName)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name cannot be empty or null");
}
return parentDomainQualifiedName + "/product/" + getUUID();

}

private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -45,9 +42,7 @@
import java.util.*;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getActiveChildrenVertices;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class DomainPreProcessor extends AbstractDomainPreProcessor {
Expand All @@ -66,7 +61,7 @@ public DomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever e
public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context,
EntityMutations.EntityOperation operation) throws AtlasBaseException {
//Handle name & qualifiedName
if (operation == EntityMutations.EntityOperation.CREATE && LOG.isDebugEnabled()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DomainPreProcessor.processAttributes: pre processing {}, {}",
entityStruct.getAttribute(QUALIFIED_NAME), operation);
}
Expand All @@ -78,8 +73,15 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co

setParent(entity, context);

if (operation == EntityMutations.EntityOperation.CREATE) {
processCreateDomain(entity, vertex);
switch (operation) {
case CREATE:
processCreateDomain(entity, vertex);
break;
case UPDATE:
processUpdateDomain(entity, vertex);
break;
default:
break;
}
}

Expand All @@ -94,15 +96,23 @@ private void processCreateDomain(AtlasEntity entity, AtlasVertex vertex) throws
RequestContext.get().endMetricRecord(metricRecorder);
}

public static String createQualifiedName(String parentDomainQualifiedName) {
if (StringUtils.isNotEmpty(parentDomainQualifiedName) && parentDomainQualifiedName !=null) {
private static String createQualifiedName(String parentDomainQualifiedName) {
if (StringUtils.isNotEmpty(parentDomainQualifiedName)) {
return parentDomainQualifiedName + "/domain/" + getUUID();
} else{
String prefixQN = "default/domain";
return prefixQN + "/" + getUUID();
return "default/domain" + "/" + getUUID();
}
}

private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain");
String vertexQName = vertex.getProperty(QUALIFIED_NAME, String.class);

entity.setAttribute(QUALIFIED_NAME, vertexQName);

RequestContext.get().endMetricRecord(metricRecorder);
}

private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent");
if (parentDomain == null) {
Expand Down

0 comments on commit f9444b7

Please sign in to comment.