Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PLT-2782 | Janus0.6 #3852

Merged
merged 20 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public enum AtlasConfiguration {
ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000),
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),
ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION("atlas.indexsearch.enable.janus.optimization", false),
/***
* This configuration is used to enable fetching non primitive attributes in index search
*/
ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES("atlas.indexsearch.enable.fetching.non.primitive.attributes", true),

ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.janus.AtlasJanusEdge;
import org.apache.atlas.repository.util.AccessControlUtils;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
import org.apache.atlas.type.AtlasEntityType;
Expand All @@ -65,8 +67,12 @@
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
Expand All @@ -88,12 +94,10 @@
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES;
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CONFIDENCE;
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CREATED_BY;
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_DESCRIPTION;
Expand Down Expand Up @@ -306,12 +310,16 @@ public AtlasObjectId toAtlasObjectId(AtlasVertex entityVertex) throws AtlasBaseE

Map<String, Object> attributes = new HashMap<>();
Set<String> relationAttributes = RequestContext.get().getRelationAttrsForSearch();

// preloadProperties here
if (CollectionUtils.isNotEmpty(relationAttributes)) {
Map<String, Object> referenceVertexProperties = preloadProperties(entityVertex, entityType, Collections.emptySet());

for (String attributeName : relationAttributes) {
AtlasAttribute attribute = entityType.getAttribute(attributeName);
if (attribute != null
&& !uniqueAttributes.containsKey(attributeName)) {
Object attrValue = getVertexAttribute(entityVertex, attribute);
Object attrValue = getVertexAttributePreFetchCache(entityVertex, attribute, referenceVertexProperties);
if (attrValue != null) {
attributes.put(attribute.getName(), attrValue);
}
Expand Down Expand Up @@ -1009,7 +1017,83 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex)
return mapVertexToAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
}

private Map<String, Object> preloadProperties(AtlasVertex entityVertex, AtlasEntityType entityType, Set<String> attributes) {
Map<String, Object> propertiesMap = new HashMap<>();

// Execute the traversal to fetch properties
GraphTraversal<Vertex, VertexProperty<Object>> traversal = graph.V(entityVertex.getId()).properties();

// Iterate through the resulting VertexProperty objects
while (traversal.hasNext()) {
try {
VertexProperty<Object> property = traversal.next();

AtlasAttribute attribute = entityType.getAttribute(property.key());
TypeCategory typeCategory = attribute != null ? attribute.getAttributeType().getTypeCategory() : null;
TypeCategory elementTypeCategory = attribute != null && attribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY ? ((AtlasArrayType) attribute.getAttributeType()).getElementType().getTypeCategory() : null;
boolean isBusinessAttribute = attribute == null;

if (property.isPresent()) {
if (typeCategory == TypeCategory.ARRAY && elementTypeCategory == TypeCategory.PRIMITIVE) {
Object value = propertiesMap.get(property.key());
if (value instanceof List) {
((List) value).add(property.value());
} else {
List<Object> values = new ArrayList<>();
values.add(property.value());
propertiesMap.put(property.key(), values);
}
} else {
if (propertiesMap.get(property.key()) == null) {
propertiesMap.put(property.key(), property.value());
} else if (isBusinessAttribute) { // If it is a business attribute, and is a multi-valued attribute
LOG.warn("Duplicate property key {} found for entity vertex: {}", property.key(), entityVertex);
List<Object> values = new ArrayList<>();
values.add(propertiesMap.get(property.key()));
values.add(property.value());
propertiesMap.put(property.key(), values);
}
}
}
} catch (RuntimeException e) {
LOG.error("Error preloading properties for entity vertex: {}", entityVertex, e);
throw e; // Re-throw the exception after logging it
}
}
return propertiesMap;
}


private boolean isPolicyAttribute(Set<String> attributes) {
Set<String> exclusionSet = new HashSet<>(Arrays.asList(AccessControlUtils.ATTR_POLICY_TYPE,
AccessControlUtils.ATTR_POLICY_USERS,
AccessControlUtils.ATTR_POLICY_GROUPS,
AccessControlUtils.ATTR_POLICY_ROLES,
AccessControlUtils.ATTR_POLICY_ACTIONS,
AccessControlUtils.ATTR_POLICY_CATEGORY,
AccessControlUtils.ATTR_POLICY_SUB_CATEGORY,
AccessControlUtils.ATTR_POLICY_RESOURCES,
AccessControlUtils.ATTR_POLICY_IS_ENABLED,
AccessControlUtils.ATTR_POLICY_RESOURCES_CATEGORY,
AccessControlUtils.ATTR_POLICY_SERVICE_NAME,
AccessControlUtils.ATTR_POLICY_PRIORITY,
AccessControlUtils.REL_ATTR_POLICIES));

return exclusionSet.stream().anyMatch(attributes::contains);
}

private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set<String> attributes) throws AtlasBaseException {
boolean shouldPrefetch = !isPolicyAttribute(attributes)
&& AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION.getBoolean();

if (shouldPrefetch) {
return mapVertexToAtlasEntityHeaderWithPrefetch(entityVertex, attributes);
} else {
return mapVertexToAtlasEntityHeaderWithoutPrefetch(entityVertex, attributes);
}
}

private AtlasEntityHeader mapVertexToAtlasEntityHeaderWithoutPrefetch(AtlasVertex entityVertex, Set<String> attributes) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVertexToAtlasEntityHeader");
AtlasEntityHeader ret = new AtlasEntityHeader();
try {
Expand Down Expand Up @@ -1078,6 +1162,7 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex,
}
}


Object attrValue = getVertexAttribute(entityVertex, attribute);

if (attrValue != null) {
Expand All @@ -1093,6 +1178,105 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex,
return ret;
}

private AtlasEntityHeader mapVertexToAtlasEntityHeaderWithPrefetch(AtlasVertex entityVertex, Set<String> attributes) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVertexToAtlasEntityHeaderWithPrefetch");
AtlasEntityHeader ret = new AtlasEntityHeader();
try {
//pre-fetching the properties
String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); //properties.get returns null
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); // this is not costly
Map<String, Object> properties = preloadProperties(entityVertex, entityType, attributes);

String guid = (String) properties.get(Constants.GUID_PROPERTY_KEY);

Integer value = (Integer)properties.get(Constants.IS_INCOMPLETE_PROPERTY_KEY);
Boolean isIncomplete = value != null && value.equals(INCOMPLETE_ENTITY_VALUE) ? Boolean.TRUE : Boolean.FALSE;

ret.setTypeName(typeName);
ret.setGuid(guid);

String state = (String)properties.get(Constants.STATE_PROPERTY_KEY);
Id.EntityState entityState = state == null ? null : Id.EntityState.valueOf(state);
ret.setStatus((entityState == Id.EntityState.DELETED) ? AtlasEntity.Status.DELETED : AtlasEntity.Status.ACTIVE);

RequestContext context = RequestContext.get();
boolean includeClassifications = context.includeClassifications();
boolean includeClassificationNames = context.isIncludeClassificationNames();
if(includeClassifications){
ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex));
} else if (!includeClassifications && includeClassificationNames) {
ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex));
}
ret.setIsIncomplete(isIncomplete);
ret.setLabels(getLabels(entityVertex));

ret.setCreatedBy(properties.get(CREATED_BY_KEY) != null ? (String) properties.get(CREATED_BY_KEY) : null);
ret.setUpdatedBy(properties.get(MODIFIED_BY_KEY) != null ? (String) properties.get(MODIFIED_BY_KEY) : null);
ret.setCreateTime(properties.get(TIMESTAMP_PROPERTY_KEY) != null ? new Date((Long)properties.get(TIMESTAMP_PROPERTY_KEY)) : null);
ret.setUpdateTime(properties.get(MODIFICATION_TIMESTAMP_PROPERTY_KEY) != null ? new Date((Long)properties.get(MODIFICATION_TIMESTAMP_PROPERTY_KEY)) : null);

if(RequestContext.get().includeMeanings()) {
List<AtlasTermAssignmentHeader> termAssignmentHeaders = mapAssignedTerms(entityVertex);
ret.setMeanings(termAssignmentHeaders);
ret.setMeaningNames(
termAssignmentHeaders.stream().map(AtlasTermAssignmentHeader::getDisplayText)
.collect(Collectors.toList()));
}

if (entityType != null) {
for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) {
Object attrValue = getVertexAttributePreFetchCache(entityVertex, headerAttribute, properties);

if (attrValue != null) {
ret.setAttribute(headerAttribute.getName(), attrValue);
}
}

if(properties.get(NAME) != null){
ret.setDisplayText(properties.get(NAME).toString());
} else if(properties.get(DISPLAY_NAME) != null) {
ret.setDisplayText(properties.get(DISPLAY_NAME).toString());
} else if(properties.get(QUALIFIED_NAME) != null) {
ret.setDisplayText(properties.get(QUALIFIED_NAME).toString());
}



//attributes = only the attributes of entityType
if (CollectionUtils.isNotEmpty(attributes)) {
for (String attrName : attributes) {
AtlasAttribute attribute = entityType.getAttribute(attrName);

if (attribute == null) {
attrName = toNonQualifiedName(attrName);

if (ret.hasAttribute(attrName)) {
continue;
}

attribute = entityType.getAttribute(attrName);

if (attribute == null) {
// dataContractLatest, meanings, links
attribute = entityType.getRelationshipAttribute(attrName, null);
}
}

//this is a call to cassandra
Object attrValue = getVertexAttributePreFetchCache(entityVertex, attribute, properties); //use prefetch cache

if (attrValue != null) {
ret.setAttribute(attrName, attrValue);
}
}
}
}
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
return ret;
}

private String toNonQualifiedName(String attrName) {
String ret;
if (attrName.contains(".")) {
Expand Down Expand Up @@ -1709,6 +1893,44 @@ public Object getVertexAttribute(AtlasVertex vertex, AtlasAttribute attribute) t
return vertex != null && attribute != null ? mapVertexToAttribute(vertex, attribute, null, false) : null;
}

public Object getVertexAttributePreFetchCache(AtlasVertex vertex, AtlasAttribute attribute, Map<String, Object> properties) throws AtlasBaseException {
if (vertex == null || attribute == null) {
return null;
}


TypeCategory typeCategory = attribute.getAttributeType().getTypeCategory();
TypeCategory elementTypeCategory = typeCategory == TypeCategory.ARRAY ? ((AtlasArrayType) attribute.getAttributeType()).getElementType().getTypeCategory() : null;
boolean isArrayOfPrimitives = typeCategory.equals(TypeCategory.ARRAY) && elementTypeCategory.equals(TypeCategory.PRIMITIVE);
boolean isPrefetchValueFinal = (typeCategory.equals(TypeCategory.PRIMITIVE) || typeCategory.equals(TypeCategory.ENUM) || typeCategory.equals(TypeCategory.MAP) || isArrayOfPrimitives);

// value is present and value is not marker (SPACE for further lookup) and type is primitive or array of primitives
if (properties.get(attribute.getName()) != null && properties.get(attribute.getName()) != StringUtils.SPACE && isPrefetchValueFinal) {
return properties.get(attribute.getName());
}

//when value is not present and type is primitive, return null
if(properties.get(attribute.getName()) == null && isPrefetchValueFinal) {
return null;
}

// if value is empty && element is array of primitives, return empty list
if (properties.get(attribute.getName()) == null && isArrayOfPrimitives) {
return new ArrayList<>();
}

// value is present as marker, fetch the value from the vertex
if (ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES.getBoolean()) {
//AtlasPerfMetrics.MetricRecorder nonPrimitiveAttributes = RequestContext.get().startMetricRecord("processNonPrimitiveAttributes");
return mapVertexToAttribute(vertex, attribute, null, false);
//LOG.debug("capturing excluded property set category and value, mapVertexValue - {}: {} : {} : {}", attribute.getName(), attribute.getAttributeType().getTypeCategory(), properties.get(attribute.getName()), mappedVertex);
//RequestContext.get().endMetricRecord(nonPrimitiveAttributes);
//return mappedVertex;
}

return null;
}

private Object getVertexAttributeIgnoreInactive(AtlasVertex vertex, AtlasAttribute attribute) throws AtlasBaseException {
return vertex != null && attribute != null ? mapVertexToAttribute(vertex, attribute, null, false, true, true) : null;
}
Expand Down
Loading