Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[master]:- DG 1298 Generating unique QN for Product and Domains. #2932

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@

import java.util.*;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public abstract class AbstractDomainPreProcessor implements PreProcessor {
Expand All @@ -57,7 +54,7 @@ public abstract class AbstractDomainPreProcessor implements PreProcessor {
}
}

public List<AtlasEntityHeader> indexSearchPaginated(Map<String, Object> dsl, String entityType) throws AtlasBaseException {
public List<AtlasEntityHeader> indexSearchPaginated(Map<String, Object> dsl) throws AtlasBaseException {
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved
IndexSearchParams searchParams = new IndexSearchParams();
List<AtlasEntityHeader> ret = new ArrayList<>();

Expand All @@ -74,11 +71,6 @@ public List<AtlasEntityHeader> indexSearchPaginated(Map<String, Object> dsl, Str
dsl.put("size", size);
searchParams.setDsl(dsl);

if (entityType.equals(POLICY_ENTITY_TYPE)) {
Set<String> attributes = new HashSet<>(Arrays.asList(ATTR_POLICY_RESOURCES, ATTR_POLICY_CATEGORY));
searchParams.setAttributes(attributes);
}

List<AtlasEntityHeader> headers = discovery.directIndexSearch(searchParams).getEntities();

if (CollectionUtils.isNotEmpty(headers)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
AtlasEntity entity = (AtlasEntity) entityStruct;
AtlasVertex vertex = context.getVertex(entity.getGuid());

setParent(entity, context);

switch (operation) {
case CREATE:
processCreateProduct(entity, vertex);
Expand All @@ -73,6 +71,13 @@ private void processCreateProduct(AtlasEntity entity, AtlasVertex vertex) throws
private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateProduct");
String VertexQName = vertex.getProperty(QUALIFIED_NAME, String.class);
String productName = (String) entity.getAttribute(NAME);
String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN);
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved
String productVertexName = vertex.getProperty(NAME, String.class);

if (!productVertexName.equals(productName)) {
productExists(productName, parentDomainQualifiedName);
}

entity.setAttribute(QUALIFIED_NAME, VertexQName);
RequestContext.get().endMetricRecord(metricRecorder);
Expand All @@ -86,57 +91,6 @@ private static String createQualifiedName(String parentDomainQualifiedName) thro

}

private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DataProductPreProcessor.setParent");
if (parentDomain == null) {
Object relationshipAttribute = entity.getRelationshipAttribute(DATA_DOMAIN);
Set<String> attributes = new HashSet<>(Arrays.asList(QUALIFIED_NAME, SUPER_DOMAIN_QN, PARENT_DOMAIN_QN, "__typeName"));

if(relationshipAttribute instanceof AtlasObjectId){
AtlasObjectId objectId = (AtlasObjectId) relationshipAttribute;
if (objectId != null) {
if (StringUtils.isNotEmpty(objectId.getGuid())) {
AtlasVertex vertex = entityRetriever.getEntityVertex(objectId.getGuid());

if (vertex == null) {
parentDomain = entityRetriever.toAtlasEntityHeader(objectId.getGuid(), attributes);
} else {
parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes);
}

} else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) &&
StringUtils.isNotEmpty((String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) {
AtlasVertex parentDomainVertex = entityRetriever.getEntityVertex(objectId);
parentDomain = entityRetriever.toAtlasEntityHeader(parentDomainVertex, attributes);

}
}
}
else if(relationshipAttribute instanceof Map){
Map<String, Object> relationshipMap = (Map<String, Object>) relationshipAttribute;
if (StringUtils.isNotEmpty((String) relationshipMap.get("guid"))) {
AtlasVertex vertex = entityRetriever.getEntityVertex((String) relationshipMap.get("guid"));

if (vertex == null) {
parentDomain = entityRetriever.toAtlasEntityHeader((String) relationshipMap.get("guid"), attributes);
} else {
parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes);
}

}
else {
parentDomain = new AtlasEntityHeader((String) relationshipMap.get("typeName"), relationshipMap);

}
}
else{
LOG.warn("DataProductPreProcessor.setParent: Invalid relationshipAttribute {}", relationshipAttribute);
}

}
RequestContext.get().endMetricRecord(metricRecorder);
}

private void productExists(String productName, String parentDomainQualifiedName) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("domainExists");
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -161,7 +115,7 @@ private void productExists(String productName, String parentDomainQualifiedName)

Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

List<AtlasEntityHeader> products = indexSearchPaginated(dsl, DATA_PRODUCT_ENTITY_TYPE);
List<AtlasEntityHeader> products = indexSearchPaginated(dsl);

if (CollectionUtils.isNotEmpty(products)) {
for (AtlasEntityHeader product : products) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
AtlasEntity entity = (AtlasEntity) entityStruct;
AtlasVertex vertex = context.getVertex(entity.getGuid());

setParent(entity, context);

switch (operation) {
case CREATE:
processCreateDomain(entity, vertex);
Expand Down Expand Up @@ -107,34 +105,16 @@ private static String createQualifiedName(String parentDomainQualifiedName) {
private void processUpdateDomain(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDomain");
String vertexQName = vertex.getProperty(QUALIFIED_NAME, String.class);
String parentDomainQualifiedName = (String) entity.getAttribute(PARENT_DOMAIN_QN);
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved
String domainName = (String) entity.getAttribute(NAME);
String domainVertexName = vertex.getProperty(NAME, String.class);

entity.setAttribute(QUALIFIED_NAME, vertexQName);

RequestContext.get().endMetricRecord(metricRecorder);
}

private void setParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("DomainPreProcessor.setParent");
if (parentDomain == null) {
AtlasObjectId objectId = (AtlasObjectId) entity.getRelationshipAttribute(PARENT_DOMAIN);
Set<String> attributes = new HashSet<>(Arrays.asList(QUALIFIED_NAME, SUPER_DOMAIN_QN, PARENT_DOMAIN_QN, "__typeName"));
if (!domainVertexName.equals(domainName)) {
domainExists(domainName, parentDomainQualifiedName);
}

if (objectId != null) {
if (StringUtils.isNotEmpty(objectId.getGuid())) {
AtlasVertex vertex = entityRetriever.getEntityVertex(objectId.getGuid());
entity.setAttribute(QUALIFIED_NAME, vertexQName);

if (vertex == null) {
parentDomain = entityRetriever.toAtlasEntityHeader(objectId.getGuid(), attributes);
} else {
parentDomain = entityRetriever.toAtlasEntityHeader(vertex, attributes);
}
} else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) &&
StringUtils.isNotEmpty((String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) {
AtlasVertex parentDomainVertex = entityRetriever.getEntityVertex(objectId);
parentDomain = entityRetriever.toAtlasEntityHeader(parentDomainVertex, attributes);
}
}
}
RequestContext.get().endMetricRecord(metricRecorder);
}

Expand All @@ -150,7 +130,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t


Map<String, Object> bool = new HashMap<>();
if (parentDomain != null) {
if (StringUtils.isNotEmpty(parentDomainQualifiedName)) {
mustClauseList.add(mapOf("term", mapOf("parentDomainQualifiedName", parentDomainQualifiedName)));
} else {
List mustNotClauseList = new ArrayList();
PRATHAM2002-DS marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -162,7 +142,7 @@ private void domainExists(String domainName, String parentDomainQualifiedName) t

Map<String, Object> dsl = mapOf("query", mapOf("bool", bool));

List<AtlasEntityHeader> domains = indexSearchPaginated(dsl, DATA_DOMAIN_ENTITY_TYPE);
List<AtlasEntityHeader> domains = indexSearchPaginated(dsl);

if (CollectionUtils.isNotEmpty(domains)) {
for (AtlasEntityHeader domain : domains) {
Expand Down
Loading