Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cassandra marker logic #3892

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ on:
- development
- master
- lineageondemand
- janus0.6
- cassandraPhase2

jobs:
build:
Expand Down
5 changes: 0 additions & 5 deletions intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ public enum AtlasConfiguration {
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),
ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION("atlas.indexsearch.enable.janus.optimization", false),
/***
* This configuration is used to enable fetching non primitive attributes in index search
*/
ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES("atlas.indexsearch.enable.fetching.non.primitive.attributes", true),

ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false),

DELTA_BASED_REFRESH_ENABLED("atlas.authorizer.enable.delta_based_refresh", false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.repository.graphdb.janus.AtlasJanusEdge;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.TransactionInterceptHelper;
import org.apache.atlas.type.AtlasArrayType;
Expand Down Expand Up @@ -2056,4 +2057,26 @@ private static Set<String> parseLabelsString(String labels) {

return ret;
}

/**
* Get all the active edges
* @param vertex entity vertex
* @return Iterator of children edges
*/
public static Iterator<AtlasJanusEdge> getOnlyActiveEdges(AtlasVertex vertex, AtlasEdgeDirection direction) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("GraphHelper.getOnlyActiveEdges");

try {
return vertex.query()
.direction(direction)
.has(STATE_PROPERTY_KEY, ACTIVE_STATE_VALUE)
.edges()
.iterator();
} catch (Exception e) {
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
}
finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.janus.AtlasJanusEdge;
import org.apache.atlas.repository.graphdb.janus.AtlasJanusVertex;
import org.apache.atlas.repository.util.AccessControlUtils;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
Expand Down Expand Up @@ -82,24 +83,10 @@
import javax.inject.Inject;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES;
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CONFIDENCE;
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CREATED_BY;
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_DESCRIPTION;
Expand Down Expand Up @@ -312,16 +299,12 @@ public AtlasObjectId toAtlasObjectId(AtlasVertex entityVertex) throws AtlasBaseE

Map<String, Object> attributes = new HashMap<>();
Set<String> relationAttributes = RequestContext.get().getRelationAttrsForSearch();

// preloadProperties here
if (CollectionUtils.isNotEmpty(relationAttributes)) {
Map<String, Object> referenceVertexProperties = preloadProperties(entityVertex, entityType, Collections.emptySet());

for (String attributeName : relationAttributes) {
AtlasAttribute attribute = entityType.getAttribute(attributeName);
if (attribute != null
&& !uniqueAttributes.containsKey(attributeName)) {
Object attrValue = getVertexAttributePreFetchCache(entityVertex, attribute, referenceVertexProperties);
Object attrValue = getVertexAttribute(entityVertex, attribute);
if (attrValue != null) {
attributes.put(attribute.getName(), attrValue);
}
Expand Down Expand Up @@ -1019,18 +1002,20 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex)
return mapVertexToAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
}

private Map<String, Object> preloadProperties(AtlasVertex entityVertex, AtlasEntityType entityType, Set<String> attributes) {
private Map<String, Object> preloadProperties(AtlasVertex entityVertex, AtlasEntityType entityType, Set<String> attributes) throws AtlasBaseException {
Map<String, Object> propertiesMap = new HashMap<>();

// Execute the traversal to fetch properties
GraphTraversal<Vertex, VertexProperty<Object>> traversal = graph.V(entityVertex.getId()).properties();
Iterator<VertexProperty<Object>> traversal = ((AtlasJanusVertex)entityVertex).getWrappedElement().properties();
// Fetch edges in both directions
retrieveEdgeLabels(entityVertex, AtlasEdgeDirection.BOTH, attributes, propertiesMap);

// Iterate through the resulting VertexProperty objects
while (traversal.hasNext()) {
try {
VertexProperty<Object> property = traversal.next();

AtlasAttribute attribute = entityType.getAttribute(property.key());
AtlasAttribute attribute = entityType.getAttribute(property.key()) != null ? entityType.getAttribute(property.key()) : null;
TypeCategory typeCategory = attribute != null ? attribute.getAttributeType().getTypeCategory() : null;
TypeCategory elementTypeCategory = attribute != null && attribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY ? ((AtlasArrayType) attribute.getAttributeType()).getElementType().getTypeCategory() : null;

Expand All @@ -1049,13 +1034,33 @@ private Map<String, Object> preloadProperties(AtlasVertex entityVertex, AtlasEnt
}
}
} catch (RuntimeException e) {
LOG.error("Error preloading properties for entity vertex: {}", entityVertex, e);
LOG.error("Error preloading properties for entity vertex: {}", entityVertex.getId(), e);
throw e; // Re-throw the exception after logging it
}
}
return propertiesMap;
}

private void retrieveEdgeLabels(AtlasVertex entityVertex, AtlasEdgeDirection edgeDirection, Set<String> attributes, Map<String, Object> propertiesMap) throws AtlasBaseException {
Iterator<AtlasJanusEdge> edges = GraphHelper.getOnlyActiveEdges(entityVertex, edgeDirection);


List<String> edgeLabelsDebug = new ArrayList<>();
while (edges.hasNext()) {
AtlasJanusEdge edge = edges.next();
edgeLabelsDebug.add(edge.getLabel());
}

Set<String> edgeLabels =
edgeLabelsDebug.stream()
.map(edgeLabel -> {
Optional<String> matchingAttrOpt = attributes.stream().filter(ele -> (edgeLabel.contains(ele))).findFirst();
return matchingAttrOpt.orElse(null);
}).filter(Objects::nonNull)
.collect(Collectors.toSet());

edgeLabels.stream().forEach(e -> propertiesMap.put(e, StringUtils.SPACE));
}
private void updateAttrValue( Map<String, Object> propertiesMap, VertexProperty<Object> property){
Object value = propertiesMap.get(property.key());
if (value instanceof List) {
Expand Down Expand Up @@ -1906,7 +1911,6 @@ public Object getVertexAttributePreFetchCache(AtlasVertex vertex, AtlasAttribute
return null;
}


TypeCategory typeCategory = attribute.getAttributeType().getTypeCategory();
TypeCategory elementTypeCategory = typeCategory == TypeCategory.ARRAY ? ((AtlasArrayType) attribute.getAttributeType()).getElementType().getTypeCategory() : null;
boolean isArrayOfPrimitives = typeCategory.equals(TypeCategory.ARRAY) && elementTypeCategory.equals(TypeCategory.PRIMITIVE);
Expand All @@ -1918,8 +1922,8 @@ public Object getVertexAttributePreFetchCache(AtlasVertex vertex, AtlasAttribute
return properties.get(attribute.getName());
}

// if value is empty && element is array of primitives, return empty list
if (properties.get(attribute.getName()) == null && isArrayOfPrimitives) {
// if value is empty && element is array and not inward relation, return empty list
if (properties.get(attribute.getName()) == null && typeCategory.equals(TypeCategory.ARRAY) && !AtlasRelationshipEdgeDirection.IN.equals(attribute.getRelationshipEdgeDirection())) {
return new ArrayList<>();
}

Expand All @@ -1928,8 +1932,9 @@ public Object getVertexAttributePreFetchCache(AtlasVertex vertex, AtlasAttribute
return null;
}

// value is present as marker, fetch the value from the vertex
if (ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES.getBoolean()) {
// value is present as marker or is inward/outward relation, fetch the value from the vertex
if (properties.get(attribute.getName()) == StringUtils.SPACE || AtlasRelationshipEdgeDirection.IN.equals(attribute.getRelationshipEdgeDirection())
|| AtlasRelationshipEdgeDirection.OUT.equals(attribute.getRelationshipEdgeDirection())) {
return mapVertexToAttribute(vertex, attribute, null, false);
}

Expand Down
Loading