From 4675750b2e531fa647c4255197710a5d9ff3f1e2 Mon Sep 17 00:00:00 2001 From: aarshi Date: Fri, 13 Dec 2024 15:28:22 +0530 Subject: [PATCH] cassandra marker logic --- .github/workflows/maven.yml | 2 +- .../org/apache/atlas/AtlasConfiguration.java | 5 -- .../atlas/repository/graph/GraphHelper.java | 23 +++++++ .../store/graph/v2/EntityGraphRetriever.java | 63 ++++++++++--------- 4 files changed, 58 insertions(+), 35 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index b8481ac283..6a191455af 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -27,7 +27,7 @@ on: - development - master - lineageondemand - - janus0.6 + - cassandraPhase2 jobs: build: diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index aabd9d3a61..b83e1e4f8f 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -118,11 +118,6 @@ public enum AtlasConfiguration { ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION("atlas.indexsearch.enable.janus.optimization", false), - /*** - * This configuration is used to enable fetching non primitive attributes in index search - */ - ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES("atlas.indexsearch.enable.fetching.non.primitive.attributes", true), - ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false), DELTA_BASED_REFRESH_ENABLED("atlas.authorizer.enable.delta_based_refresh", false), diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 6e3f5c90ed..335e2accc8 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -33,6 +33,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelationship; +import org.apache.atlas.repository.graphdb.janus.AtlasJanusEdge; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.TransactionInterceptHelper; import org.apache.atlas.type.AtlasArrayType; @@ -2056,4 +2057,26 @@ private static Set parseLabelsString(String labels) { return ret; } + + /** + * Get all the active edges + * @param vertex entity vertex + * @return Iterator of children edges + */ + public static Iterator getOnlyActiveEdges(AtlasVertex vertex, AtlasEdgeDirection direction) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("GraphHelper.getOnlyActiveEdges"); + + try { + return vertex.query() + .direction(direction) + .has(STATE_PROPERTY_KEY, ACTIVE_STATE_VALUE) + .edges() + .iterator(); + } catch (Exception e) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 59125c492c..f2f417be26 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -50,6 +50,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.janus.AtlasJanusEdge; +import org.apache.atlas.repository.graphdb.janus.AtlasJanusVertex; import org.apache.atlas.repository.util.AccessControlUtils; import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; @@ -82,24 +83,10 @@ import javax.inject.Inject; import java.math.BigDecimal; import java.math.BigInteger; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; -import java.util.Set; +import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; -import static org.apache.atlas.AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES; import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CONFIDENCE; import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CREATED_BY; import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_DESCRIPTION; @@ -312,16 +299,12 @@ public AtlasObjectId toAtlasObjectId(AtlasVertex entityVertex) throws AtlasBaseE Map attributes = new HashMap<>(); Set relationAttributes = RequestContext.get().getRelationAttrsForSearch(); - - // preloadProperties here if (CollectionUtils.isNotEmpty(relationAttributes)) { - Map referenceVertexProperties = preloadProperties(entityVertex, entityType, Collections.emptySet()); - for (String attributeName : relationAttributes) { AtlasAttribute attribute = entityType.getAttribute(attributeName); if (attribute != null && !uniqueAttributes.containsKey(attributeName)) { - Object attrValue = getVertexAttributePreFetchCache(entityVertex, attribute, referenceVertexProperties); + Object attrValue = getVertexAttribute(entityVertex, attribute); if (attrValue != null) { attributes.put(attribute.getName(), attrValue); } @@ -1019,18 +1002,20 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex) return mapVertexToAtlasEntityHeader(entityVertex, Collections.emptySet()); } - private Map preloadProperties(AtlasVertex entityVertex, AtlasEntityType entityType, Set attributes) { + private Map preloadProperties(AtlasVertex entityVertex, AtlasEntityType entityType, Set attributes) throws AtlasBaseException { Map propertiesMap = new HashMap<>(); // Execute the traversal to fetch properties - GraphTraversal> traversal = graph.V(entityVertex.getId()).properties(); + Iterator> traversal = ((AtlasJanusVertex)entityVertex).getWrappedElement().properties(); + // Fetch edges in both directions + retrieveEdgeLabels(entityVertex, AtlasEdgeDirection.BOTH, attributes, propertiesMap); // Iterate through the resulting VertexProperty objects while (traversal.hasNext()) { try { VertexProperty property = traversal.next(); - AtlasAttribute attribute = entityType.getAttribute(property.key()); + AtlasAttribute attribute = entityType.getAttribute(property.key()) != null ? entityType.getAttribute(property.key()) : null; TypeCategory typeCategory = attribute != null ? attribute.getAttributeType().getTypeCategory() : null; TypeCategory elementTypeCategory = attribute != null && attribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY ? ((AtlasArrayType) attribute.getAttributeType()).getElementType().getTypeCategory() : null; @@ -1049,13 +1034,33 @@ private Map preloadProperties(AtlasVertex entityVertex, AtlasEnt } } } catch (RuntimeException e) { - LOG.error("Error preloading properties for entity vertex: {}", entityVertex, e); + LOG.error("Error preloading properties for entity vertex: {}", entityVertex.getId(), e); throw e; // Re-throw the exception after logging it } } return propertiesMap; } + private void retrieveEdgeLabels(AtlasVertex entityVertex, AtlasEdgeDirection edgeDirection, Set attributes, Map propertiesMap) throws AtlasBaseException { + Iterator edges = GraphHelper.getOnlyActiveEdges(entityVertex, edgeDirection); + + + List edgeLabelsDebug = new ArrayList<>(); + while (edges.hasNext()) { + AtlasJanusEdge edge = edges.next(); + edgeLabelsDebug.add(edge.getLabel()); + } + + Set edgeLabels = + edgeLabelsDebug.stream() + .map(edgeLabel -> { + Optional matchingAttrOpt = attributes.stream().filter(ele -> (edgeLabel.contains(ele))).findFirst(); + return matchingAttrOpt.orElse(null); + }).filter(Objects::nonNull) + .collect(Collectors.toSet()); + + edgeLabels.stream().forEach(e -> propertiesMap.put(e, StringUtils.SPACE)); + } private void updateAttrValue( Map propertiesMap, VertexProperty property){ Object value = propertiesMap.get(property.key()); if (value instanceof List) { @@ -1906,7 +1911,6 @@ public Object getVertexAttributePreFetchCache(AtlasVertex vertex, AtlasAttribute return null; } - TypeCategory typeCategory = attribute.getAttributeType().getTypeCategory(); TypeCategory elementTypeCategory = typeCategory == TypeCategory.ARRAY ? ((AtlasArrayType) attribute.getAttributeType()).getElementType().getTypeCategory() : null; boolean isArrayOfPrimitives = typeCategory.equals(TypeCategory.ARRAY) && elementTypeCategory.equals(TypeCategory.PRIMITIVE); @@ -1918,8 +1922,8 @@ public Object getVertexAttributePreFetchCache(AtlasVertex vertex, AtlasAttribute return properties.get(attribute.getName()); } - // if value is empty && element is array of primitives, return empty list - if (properties.get(attribute.getName()) == null && isArrayOfPrimitives) { + // if value is empty && element is array and not inward relation, return empty list + if (properties.get(attribute.getName()) == null && typeCategory.equals(TypeCategory.ARRAY) && !AtlasRelationshipEdgeDirection.IN.equals(attribute.getRelationshipEdgeDirection())) { return new ArrayList<>(); } @@ -1928,8 +1932,9 @@ public Object getVertexAttributePreFetchCache(AtlasVertex vertex, AtlasAttribute return null; } - // value is present as marker, fetch the value from the vertex - if (ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES.getBoolean()) { + // value is present as marker or is inward/outward relation, fetch the value from the vertex + if (properties.get(attribute.getName()) == StringUtils.SPACE || AtlasRelationshipEdgeDirection.IN.equals(attribute.getRelationshipEdgeDirection()) + || AtlasRelationshipEdgeDirection.OUT.equals(attribute.getRelationshipEdgeDirection())) { return mapVertexToAttribute(vertex, attribute, null, false); }