Skip to content

Commit

Permalink
Create Preprocessor for Updating Product/Domain QN
Browse files Browse the repository at this point in the history
  • Loading branch information
PRATHAM2002-DS committed Apr 17, 2024
1 parent 2930528 commit a1e11fe
Show file tree
Hide file tree
Showing 6 changed files with 510 additions and 0 deletions.
10 changes: 10 additions & 0 deletions common/src/main/java/org/apache/atlas/repository/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public final class Constants {
public static final String GLOSSARY_CATEGORY_EDGE_LABEL = "r:AtlasGlossaryCategoryAnchor";


/**
* Data mesh constants: entity type names and graph edge labels for
* DataDomain / DataProduct entities.
*/
// Entity type name; getPreProcessor dispatches on it to DomainPreProcessor.
public static final String DATA_DOMAIN_ENTITY_TYPE = "DataDomain";
// Entity type name; getPreProcessor dispatches on it to DataProductPreProcessor.
public static final String DATA_PRODUCT_ENTITY_TYPE = "DataProduct";
// NOTE(review): presumably the edge label from a DataDomain to its DataProducts - confirm against the typedef.
public static final String DATA_PRODUCT_EDGE_LABEL = "__DataDomain.dataProducts";
// NOTE(review): presumably the edge label from a DataDomain to its sub-domains - confirm against the typedef.
public static final String DOMAIN_PARENT_EDGE_LABEL = "__DataDomain.subDomains";



/**
* SQL property keys.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DomainPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor;
Expand Down Expand Up @@ -1800,6 +1802,14 @@ public PreProcessor getPreProcessor(String typeName) {
preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper);
break;

case DATA_DOMAIN_ENTITY_TYPE:
preProcessor = new DomainPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper);
break;

case DATA_PRODUCT_ENTITY_TYPE:
preProcessor = new DataProductPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper);
break;

case QUERY_ENTITY_TYPE:
preProcessor = new QueryPreProcessor(typeRegistry, entityRetriever);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ public class PreProcessorUtils {
public static final String GLOSSARY_TERM_REL_TYPE = "AtlasGlossaryTermAnchor";
public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor";

// Data mesh model constants: attribute / relationship names used by the datamesh preprocessors.
// NOTE(review): same value as Constants.DATA_PRODUCT_ENTITY_TYPE - consider keeping a single definition.
public static final String DATA_PRODUCT_TYPE = "DataProduct";
public static final String PARENT_DOMAIN = "parentDomain";
// Entity attribute holding the parent domain's qualifiedName; read when creating a DataProduct.
public static final String PARENT_DOMAIN_QN = "parentDomainQualifiedName";
// Requested as a header attribute when resolving a product's parent domain.
public static final String SUPER_DOMAIN_QN = "superDomainQualifiedName";
// Relationship attribute on a DataProduct that points at its owning domain.
public static final String DATA_DOMAIN = "dataDomain";

//Query models constants
public static final String PREFIX_QUERY_QN = "default/collection/";
public static final String COLLECTION_QUALIFIED_NAME = "collectionQualifiedName";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh;

import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.IndexSearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.AbstractGlossaryPreProcessor;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.PARENT_DOMAIN_QN;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.SUPER_DOMAIN_QN;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

/**
 * Shared base for the data mesh (DataDomain / DataProduct) preprocessors.
 *
 * <p>Provides a paginated index search helper and an authorization check that
 * verifies CREATE, UPDATE and DELETE privileges on a pair of domains.
 */
public abstract class AbstractDomainPreProcessor implements PreProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDomainPreProcessor.class);

    /** Number of hits requested per page by {@link #indexSearchPaginated}. */
    private static final int SEARCH_PAGE_SIZE = 100;

    protected final AtlasTypeRegistry typeRegistry;
    protected final EntityGraphRetriever entityRetriever;

    /** May be {@code null} if the discovery service could not be constructed. */
    protected EntityDiscoveryService discovery;

    /**
     * @param typeRegistry    type registry used for authorization requests and discovery
     * @param entityRetriever retriever used by subclasses to load vertices / headers
     * @param graph           graph handed to the internally created discovery service
     */
    AbstractDomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph) {
        this.entityRetriever = entityRetriever;
        this.typeRegistry = typeRegistry;

        try {
            this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null);
        } catch (AtlasException e) {
            // Construction stays best-effort (as before), but the failure is now logged
            // through the logger instead of being dumped to stderr.
            LOG.error("AbstractDomainPreProcessor: failed to create EntityDiscoveryService, index search will be unavailable", e);
        }
    }

    /**
     * Runs the given index-search DSL page by page and collects every returned header.
     *
     * <p>The passed {@code dsl} map is mutated: {@code sort}, {@code from} and
     * {@code size} entries are written into it.
     *
     * @param dsl        query DSL; must be a mutable map
     * @param entityType type being searched; for the policy type the policy
     *                   resources and category attributes are requested as well
     * @return all headers returned across pages, never {@code null}
     * @throws AtlasBaseException    if the underlying index search fails
     * @throws IllegalStateException if the discovery service was not initialised
     */
    public List<AtlasEntityHeader> indexSearchPaginated(Map<String, Object> dsl, String entityType) throws AtlasBaseException {
        if (discovery == null) {
            // Previously surfaced as a bare NullPointerException with no context.
            throw new IllegalStateException("EntityDiscoveryService is not initialised, cannot run index search");
        }

        IndexSearchParams searchParams = new IndexSearchParams();
        List<AtlasEntityHeader> ret = new ArrayList<>();

        // Stable ordering so that from/size paging does not skip or repeat hits.
        List<Map<String, Object>> sortList = new ArrayList<>(2);
        sortList.add(mapOf("__timestamp", mapOf("order", "asc")));
        sortList.add(mapOf("__guid", mapOf("order", "asc")));
        dsl.put("sort", sortList);

        // Loop-invariant, and null-safe for entityType.
        if (POLICY_ENTITY_TYPE.equals(entityType)) {
            Set<String> attributes = new HashSet<>(Arrays.asList(ATTR_POLICY_RESOURCES, ATTR_POLICY_CATEGORY));
            searchParams.setAttributes(attributes);
        }

        int from = 0;
        while (true) {
            dsl.put("from", from);
            dsl.put("size", SEARCH_PAGE_SIZE);
            // Re-applied on every page after from/size change.
            // NOTE(review): presumably IndexSearchParams snapshots the DSL on set - confirm before hoisting.
            searchParams.setDsl(dsl);

            List<AtlasEntityHeader> headers = discovery.directIndexSearch(searchParams).getEntities();
            if (CollectionUtils.isEmpty(headers)) {
                break;
            }

            ret.addAll(headers);
            from += SEARCH_PAGE_SIZE;
        }

        return ret;
    }

    /**
     * Verifies that the current user holds CREATE, UPDATE and DELETE privileges on
     * both domains; the source domain is checked first, then the target.
     *
     * @param sourceDomain domain on the source side of the operation
     * @param targetDomain domain on the target side of the operation
     * @throws AtlasBaseException if any of the access checks fails
     */
    protected void isAuthorized(AtlasEntityHeader sourceDomain, AtlasEntityHeader targetDomain) throws AtlasBaseException {
        verifyDomainAccess(sourceDomain, "source");
        verifyDomainAccess(targetDomain, "target");
    }

    /** Checks CREATE, UPDATE and DELETE (in that order) on a single domain. */
    private void verifyDomainAccess(AtlasEntityHeader domain, String role) throws AtlasBaseException {
        AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, domain),
                "create on " + role + " Domain: ", domain.getAttribute(NAME));

        AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, domain),
                "update on " + role + " Domain: ", domain.getAttribute(NAME));

        AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, domain),
                "delete on " + role + " Domain: ", domain.getAttribute(NAME));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

/**
 * Preprocessor for DataProduct entities.
 *
 * <p>On create it resolves the owning domain from the {@code dataDomain}
 * relationship attribute, rejects a product whose name already exists under the
 * same parent domain, and generates the product's qualifiedName as
 * {@code <parentDomainQualifiedName>/product/<uuid>}.
 */
public class DataProductPreProcessor extends AbstractDomainPreProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(DataProductPreProcessor.class);

    /** Header of the owning domain; resolved once per instance by {@link #setParent}. */
    private AtlasEntityHeader parentDomain;
    /** Mutation context of the most recent processAttributes call (currently only stored). */
    private EntityMutationContext context;

    public DataProductPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever,
                                   AtlasGraph graph, EntityGraphMapper entityGraphMapper) {
        super(typeRegistry, entityRetriever, graph);
    }

    /**
     * Resolves the parent domain and, for CREATE operations, validates and
     * assigns the product's qualifiedName.
     *
     * @throws AtlasBaseException if the parent domain qualifiedName is empty, a
     *                            product with the same name already exists, or a lookup fails
     */
    @Override
    public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context,
                                  EntityMutations.EntityOperation operation) throws AtlasBaseException {
        boolean isCreate = operation == EntityMutations.EntityOperation.CREATE;

        if (isCreate && LOG.isDebugEnabled()) {
            LOG.debug("DataProductPreProcessor.processAttributes: pre processing {}, {}",
                    entityStruct.getAttribute(QUALIFIED_NAME), operation);
        }
        this.context = context;

        AtlasEntity entity = (AtlasEntity) entityStruct;

        setParent(entity);

        if (isCreate) {
            processCreateProduct(entity);
        }
    }

    /**
     * Validates a product being created and sets its generated qualifiedName.
     * The parent qualifiedName is validated before the duplicate search so an
     * empty value never ends up inside the search query.
     */
    private void processCreateProduct(AtlasEntity entity) throws AtlasBaseException {
        // NOTE(review): metric label looks copy-pasted from the domain preprocessor; kept to avoid renaming a published metric.
        AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDomain");
        try {
            String productName = (String) entity.getAttribute(NAME);
            String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN);

            if (StringUtils.isEmpty(parentDomainQualifiedName)) {
                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Parent Domain Qualified Name is empty");
            }

            productExists(productName, parentDomainQualifiedName);
            entity.setAttribute(QUALIFIED_NAME, createQualifiedName(parentDomainQualifiedName));
        } finally {
            // Always close the recorder, including on validation failures.
            RequestContext.get().endMetricRecord(metricRecorder);
        }
    }

    /**
     * Builds a product qualifiedName under the given parent domain.
     *
     * @param parentDomainQualifiedName qualifiedName of the owning domain
     * @return {@code parentDomainQualifiedName + "/product/" + <uuid>}, or an
     *         empty string when the parent qualifiedName is null or empty
     */
    public static String createQualifiedName(String parentDomainQualifiedName) {
        // isNotEmpty already covers null.
        if (StringUtils.isNotEmpty(parentDomainQualifiedName)) {
            return parentDomainQualifiedName + "/product/" + getUUID();
        }
        return "";
    }

    /**
     * Resolves {@link #parentDomain} from the entity's {@code dataDomain}
     * relationship attribute, which may arrive as an {@link AtlasObjectId} or as
     * a plain map. Does nothing when the parent has already been resolved.
     */
    private void setParent(AtlasEntity entity) throws AtlasBaseException {
        AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent");
        try {
            if (parentDomain != null) {
                return;
            }

            Object relationshipAttribute = entity.getRelationshipAttribute(DATA_DOMAIN);
            Set<String> attributes = new HashSet<>(Arrays.asList(QUALIFIED_NAME, SUPER_DOMAIN_QN, PARENT_DOMAIN_QN, "__typeName"));

            if (relationshipAttribute instanceof AtlasObjectId) {
                AtlasObjectId objectId = (AtlasObjectId) relationshipAttribute;

                if (StringUtils.isNotEmpty(objectId.getGuid())) {
                    parentDomain = headerForGuid(objectId.getGuid(), attributes);
                } else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) &&
                        StringUtils.isNotEmpty((String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) {
                    AtlasVertex parentDomainVertex = entityRetriever.getEntityVertex(objectId);
                    parentDomain = entityRetriever.toAtlasEntityHeader(parentDomainVertex, attributes);
                }
            } else if (relationshipAttribute instanceof Map) {
                @SuppressWarnings("unchecked") // relationship attributes sent as JSON objects arrive as string-keyed maps
                Map<String, Object> relationshipMap = (Map<String, Object>) relationshipAttribute;
                String guid = (String) relationshipMap.get("guid");

                if (StringUtils.isNotEmpty(guid)) {
                    parentDomain = headerForGuid(guid, attributes);
                } else {
                    // No guid to look up: build the header straight from the supplied map.
                    parentDomain = new AtlasEntityHeader((String) relationshipMap.get("typeName"), relationshipMap);
                }
            } else {
                LOG.warn("DataProductPreProcessor.setParent: Invalid relationshipAttribute {}", relationshipAttribute);
            }
        } finally {
            RequestContext.get().endMetricRecord(metricRecorder);
        }
    }

    /** Loads an entity header by guid, going through the vertex when one is found. */
    private AtlasEntityHeader headerForGuid(String guid, Set<String> attributes) throws AtlasBaseException {
        AtlasVertex vertex = entityRetriever.getEntityVertex(guid);

        return vertex == null
                ? entityRetriever.toAtlasEntityHeader(guid, attributes)
                : entityRetriever.toAtlasEntityHeader(vertex, attributes);
    }

    /**
     * Throws if an ACTIVE DataProduct with the given name already exists under
     * the same parent domain (or, when no parent was resolved, among products
     * that have no parentDomainQualifiedName).
     *
     * @throws AtlasBaseException BAD_REQUEST when a duplicate is found, or if the search fails
     */
    private void productExists(String productName, String parentDomainQualifiedName) throws AtlasBaseException {
        // NOTE(review): metric label looks copy-pasted from the domain preprocessor; kept to avoid renaming a published metric.
        AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("domainExists");

        boolean exists = false;
        try {
            List<Map<String, Object>> mustClauseList = new ArrayList<>();
            mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", DATA_PRODUCT_ENTITY_TYPE)));
            mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
            mustClauseList.add(mapOf("term", mapOf("name.keyword", productName)));

            Map<String, Object> bool = new HashMap<>();
            if (parentDomain != null) {
                mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName)));
            } else {
                List<Map<String, Object>> mustNotClauseList = new ArrayList<>();
                mustNotClauseList.add(mapOf("exists", mapOf("field", "parentDomainQualifiedName")));
                bool.put("must_not", mustNotClauseList);
            }

            bool.put("must", mustClauseList);

            Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

            List<AtlasEntityHeader> products = indexSearchPaginated(dsl, DATA_PRODUCT_ENTITY_TYPE);

            if (CollectionUtils.isNotEmpty(products)) {
                // Re-check the name on each hit rather than trusting the index match alone.
                for (AtlasEntityHeader product : products) {
                    String name = (String) product.getAttribute(NAME);
                    if (productName.equals(name)) {
                        exists = true;
                        break;
                    }
                }
            }
        } finally {
            RequestContext.get().endMetricRecord(metricRecorder);
        }

        if (exists) {
            throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, productName+" already exists in the domain");
        }
    }

}
Loading

0 comments on commit a1e11fe

Please sign in to comment.