diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f8a09b5589..b8481ac283 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -1,3 +1,4 @@ + # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -26,6 +27,7 @@ on: - development - master - lineageondemand + - janus0.6 jobs: build: @@ -34,12 +36,12 @@ jobs: steps: - uses: actions/checkout@v2 - + - name: Set up JDK 1.8 uses: actions/setup-java@v1 with: java-version: 1.8 - + - name: Cache Maven packages uses: actions/cache@v2 with: @@ -48,8 +50,9 @@ jobs: restore-keys: ${{ runner.os }}-m2 - name: Get branch name - run: echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/})" - id: get_branch + run: | + echo "BRANCH_NAME=${GITHUB_REF#refs/heads/}" >> "$GITHUB_ENV" + echo "BRANCH_NAME=${GITHUB_REF#refs/heads/}" - name: Create Maven Settings uses: s4u/maven-settings-action@v2.8.0 @@ -63,7 +66,7 @@ jobs: - name: Build with Maven run: | - branch_name=${{ steps.get_branch.outputs.branch }} + branch_name="${BRANCH_NAME}" if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]] then echo "build without dashboard" @@ -73,19 +76,26 @@ jobs: chmod +x ./build.sh && ./build.sh fi - - run: echo "REPOSITORY_NAME=`echo "$GITHUB_REPOSITORY" | awk -F / '{print $2}' | sed -e "s/:refs//"`" >> $GITHUB_ENV + - name: Get Repository Name + run: echo "REPOSITORY_NAME=`echo "$GITHUB_REPOSITORY" | awk -F / '{print $2}' | sed -e "s/:refs//"`" >> $GITHUB_ENV shell: bash - name: Get version tag - run: echo "##[set-output name=version;]$(echo `git ls-remote https://${{ secrets.ORG_PAT_GITHUB }}@github.com/atlanhq/${REPOSITORY_NAME}.git ${{ steps.get_branch.outputs.branch }} | awk '{ print $1}' | cut -c1-7`)abcd" - id: get_version + # run: echo "##[set-output name=version;]$(echo `git ls-remote https://${{ secrets.ORG_PAT_GITHUB }}@github.com/atlanhq/${REPOSITORY_NAME}.git 
${{ env.BRANCH_NAME }} | awk '{ print $1}' | cut -c1-7`)abcd" + run: | + echo "VERSION=$(git ls-remote https://${{ secrets.ORG_PAT_GITHUB }}@github.com/atlanhq/${REPOSITORY_NAME}.git ${{ env.BRANCH_NAME }} | awk '{ print $1}' | cut -c1-7 | head -n 1)abcd" + echo "VERSION=$(git ls-remote https://${{ secrets.ORG_PAT_GITHUB }}@github.com/atlanhq/${REPOSITORY_NAME}.git ${{ env.BRANCH_NAME }} | awk '{ print $1}' | cut -c1-7 | tr -d '[:space:]')abcd" + echo "VERSION=$(git ls-remote https://${{ secrets.ORG_PAT_GITHUB }}@github.com/atlanhq/${REPOSITORY_NAME}.git ${{ env.BRANCH_NAME }} | awk '{ print $1}' | cut -c1-7 | tr -d '[:space:]')abcd" >> $GITHUB_ENV + + - name: Get commit ID + run: echo "COMMIT_ID=$(echo ${GITHUB_SHA} | cut -c1-7)abcd" >> $GITHUB_ENV - - name: Set up Buildx + - name: Set up Buildx id: buildx uses: docker/setup-buildx-action@v1 - name: Login to GitHub Registry - uses: docker/login-action@v1 + uses: docker/login-action@v1 with: registry: ghcr.io username: $GITHUB_ACTOR @@ -102,8 +112,8 @@ jobs: provenance: true push: true tags: | - ghcr.io/atlanhq/${{ github.event.repository.name }}-${{ steps.get_branch.outputs.branch }}:latest - ghcr.io/atlanhq/${{ github.event.repository.name }}-${{ steps.get_branch.outputs.branch }}:${{ steps.get_version.outputs.version }} + ghcr.io/atlanhq/${{ github.event.repository.name }}-${{ env.BRANCH_NAME }}:latest + ghcr.io/atlanhq/${{ github.event.repository.name }}-${{ env.BRANCH_NAME }}:${{ env.COMMIT_ID }} - name: Scan Image uses: aquasecurity/trivy-action@master @@ -116,4 +126,4 @@ jobs: - name: Upload Trivy scan results to GitHub Security tab uses: github/codeql-action/upload-sarif@v2.1.33 with: - sarif_file: 'trivy-image-results.sarif' + sarif_file: 'trivy-image-results.sarif' \ No newline at end of file diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index df2bca7860..b9c3c338c2 100644 --- 
a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -117,6 +117,11 @@ public enum AtlasConfiguration { ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), + ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION("atlas.indexsearch.enable.janus.optimization", false), + /*** + * This configuration is used to enable fetching non primitive attributes in index search + */ + ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES("atlas.indexsearch.enable.fetching.non.primitive.attributes", true), ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false), diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index a8841eb1aa..73446f49c3 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -40,6 +40,7 @@ import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation; import org.apache.atlas.model.lineage.AtlasLineageOnDemandInfo.LineageInfoOnDemand; import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasGraph; @@ -375,7 +376,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i visitedVertices.add(getId(datasetVertex)); AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? 
PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + Iterator incomingEdges = GraphHelper.getActiveEdges(datasetVertex, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE, IN); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); while (incomingEdges.hasNext()) { @@ -403,7 +404,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + Iterator outgoingEdges = GraphHelper.getActiveEdges(processVertex, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE, OUT); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { @@ -533,15 +534,15 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, AtlasLineageListContext lineageListContext, Queue traversalQueue, Set visitedVertices, Set skippedVertices, - Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap) { + Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; if (isDataset) - edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + edges = GraphHelper.getActiveEdges(currentVertex, isInputDirection(lineageListContext) ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE, IN); else - edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? 
PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); - RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); + edges = GraphHelper.getActiveEdges(currentVertex, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE, OUT); + RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); while (edges.hasNext()) { AtlasEdge currentEdge = edges.next(); if (!lineageListContext.evaluateTraversalFilter(currentEdge)) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 34d3036042..59125c492c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -49,6 +49,8 @@ import org.apache.atlas.repository.graphdb.AtlasElement; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.graphdb.janus.AtlasJanusEdge; +import org.apache.atlas.repository.util.AccessControlUtils; import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; import org.apache.atlas.type.AtlasEntityType; @@ -65,8 +67,14 @@ import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.atlas.v1.model.instance.Id; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.IteratorUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.VertexProperty; +import org.janusgraph.core.Cardinality; +import org.janusgraph.graphdb.relations.CacheVertexProperty; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -88,12 +96,10 @@ import java.util.Objects; import java.util.Queue; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; import java.util.stream.Collectors; +import static org.apache.atlas.AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES; import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CONFIDENCE; import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CREATED_BY; import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_DESCRIPTION; @@ -306,12 +312,16 @@ public AtlasObjectId toAtlasObjectId(AtlasVertex entityVertex) throws AtlasBaseE Map attributes = new HashMap<>(); Set relationAttributes = RequestContext.get().getRelationAttrsForSearch(); + + // preloadProperties here if (CollectionUtils.isNotEmpty(relationAttributes)) { + Map referenceVertexProperties = preloadProperties(entityVertex, entityType, Collections.emptySet()); + for (String attributeName : relationAttributes) { AtlasAttribute attribute = entityType.getAttribute(attributeName); if (attribute != null && !uniqueAttributes.containsKey(attributeName)) { - Object attrValue = getVertexAttribute(entityVertex, attribute); + Object attrValue = getVertexAttributePreFetchCache(entityVertex, attribute, referenceVertexProperties); if (attrValue != null) { attributes.put(attribute.getName(), attrValue); } @@ -1009,7 +1019,89 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex) return mapVertexToAtlasEntityHeader(entityVertex, Collections.emptySet()); } + private Map preloadProperties(AtlasVertex entityVertex, AtlasEntityType entityType, Set attributes) { + Map propertiesMap = new HashMap<>(); + + // Execute the 
traversal to fetch properties + GraphTraversal> traversal = graph.V(entityVertex.getId()).properties(); + + // Iterate through the resulting VertexProperty objects + while (traversal.hasNext()) { + try { + VertexProperty property = traversal.next(); + + AtlasAttribute attribute = entityType != null ? entityType.getAttribute(property.key()) : null; // callers may pass a null entityType (unknown type name); treat every property as an unknown attribute then + TypeCategory typeCategory = attribute != null ? attribute.getAttributeType().getTypeCategory() : null; + TypeCategory elementTypeCategory = attribute != null && attribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY ? ((AtlasArrayType) attribute.getAttributeType()).getElementType().getTypeCategory() : null; + + if (property.isPresent()) { + + // If the attribute is not known (null) + // validate if prefetched property is multi-valued + boolean isMultiValuedProperty = (property instanceof CacheVertexProperty && ((CacheVertexProperty) property).propertyKey().cardinality().equals(Cardinality.SET)); + + if (typeCategory == TypeCategory.ARRAY && elementTypeCategory == TypeCategory.PRIMITIVE) { + updateAttrValue(propertiesMap, property); + } else if (attribute == null && isMultiValuedProperty) { + updateAttrValue(propertiesMap, property); + } else if (propertiesMap.get(property.key()) == null) { + propertiesMap.put(property.key(), property.value()); + } + } + } catch (RuntimeException e) { + LOG.error("Error preloading properties for entity vertex: {}", entityVertex, e); + throw e; // Re-throw the exception after logging it + } + } + return propertiesMap; + } + + private void updateAttrValue( Map propertiesMap, VertexProperty property){ + Object value = propertiesMap.get(property.key()); + if (value instanceof List) { + ((List) value).add(property.value()); + } else { + List values = new ArrayList<>(); + values.add(property.value()); + propertiesMap.put(property.key(), values); + } + } + + private boolean isPolicyAttribute(Set attributes) { + Set exclusionSet = new HashSet<>(Arrays.asList(AccessControlUtils.ATTR_POLICY_TYPE, + 
AccessControlUtils.ATTR_POLICY_USERS, + AccessControlUtils.ATTR_POLICY_GROUPS, + AccessControlUtils.ATTR_POLICY_ROLES, + AccessControlUtils.ATTR_POLICY_ACTIONS, + AccessControlUtils.ATTR_POLICY_CATEGORY, + AccessControlUtils.ATTR_POLICY_SUB_CATEGORY, + AccessControlUtils.ATTR_POLICY_RESOURCES, + AccessControlUtils.ATTR_POLICY_IS_ENABLED, + AccessControlUtils.ATTR_POLICY_RESOURCES_CATEGORY, + AccessControlUtils.ATTR_POLICY_SERVICE_NAME, + AccessControlUtils.ATTR_POLICY_PRIORITY, + AccessControlUtils.REL_ATTR_POLICIES, + AccessControlUtils.ATTR_SERVICE_SERVICE_TYPE, + AccessControlUtils.ATTR_SERVICE_TAG_SERVICE, + AccessControlUtils.ATTR_SERVICE_IS_ENABLED, + AccessControlUtils.ATTR_SERVICE_LAST_SYNC) + ); + + return exclusionSet.stream().anyMatch(attributes::contains); + } + private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set attributes) throws AtlasBaseException { + boolean shouldPrefetch = !isPolicyAttribute(attributes) + && AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION.getBoolean(); + + if (shouldPrefetch) { + return mapVertexToAtlasEntityHeaderWithPrefetch(entityVertex, attributes); + } else { + return mapVertexToAtlasEntityHeaderWithoutPrefetch(entityVertex, attributes); + } + } + + private AtlasEntityHeader mapVertexToAtlasEntityHeaderWithoutPrefetch(AtlasVertex entityVertex, Set attributes) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVertexToAtlasEntityHeader"); AtlasEntityHeader ret = new AtlasEntityHeader(); try { @@ -1078,6 +1170,7 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, } } + Object attrValue = getVertexAttribute(entityVertex, attribute); if (attrValue != null) { @@ -1093,6 +1186,105 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, return ret; } + private AtlasEntityHeader mapVertexToAtlasEntityHeaderWithPrefetch(AtlasVertex entityVertex, Set attributes) 
throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVertexToAtlasEntityHeaderWithPrefetch"); + AtlasEntityHeader ret = new AtlasEntityHeader(); + try { + //pre-fetching the properties + String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); //properties.get returns null + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); // this is not costly + Map properties = preloadProperties(entityVertex, entityType, attributes); + + String guid = (String) properties.get(Constants.GUID_PROPERTY_KEY); + + Integer value = (Integer)properties.get(Constants.IS_INCOMPLETE_PROPERTY_KEY); + Boolean isIncomplete = value != null && value.equals(INCOMPLETE_ENTITY_VALUE) ? Boolean.TRUE : Boolean.FALSE; + + ret.setTypeName(typeName); + ret.setGuid(guid); + + String state = (String)properties.get(Constants.STATE_PROPERTY_KEY); + Id.EntityState entityState = state == null ? null : Id.EntityState.valueOf(state); + ret.setStatus((entityState == Id.EntityState.DELETED) ? AtlasEntity.Status.DELETED : AtlasEntity.Status.ACTIVE); + + RequestContext context = RequestContext.get(); + boolean includeClassifications = context.includeClassifications(); + boolean includeClassificationNames = context.isIncludeClassificationNames(); + if(includeClassifications){ + ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); + } else if (!includeClassifications && includeClassificationNames) { + ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); + } + ret.setIsIncomplete(isIncomplete); + ret.setLabels(getLabels(entityVertex)); + + ret.setCreatedBy(properties.get(CREATED_BY_KEY) != null ? (String) properties.get(CREATED_BY_KEY) : null); + ret.setUpdatedBy(properties.get(MODIFIED_BY_KEY) != null ? (String) properties.get(MODIFIED_BY_KEY) : null); + ret.setCreateTime(properties.get(TIMESTAMP_PROPERTY_KEY) != null ? 
new Date((Long)properties.get(TIMESTAMP_PROPERTY_KEY)) : null); + ret.setUpdateTime(properties.get(MODIFICATION_TIMESTAMP_PROPERTY_KEY) != null ? new Date((Long)properties.get(MODIFICATION_TIMESTAMP_PROPERTY_KEY)) : null); + + if(RequestContext.get().includeMeanings()) { + List termAssignmentHeaders = mapAssignedTerms(entityVertex); + ret.setMeanings(termAssignmentHeaders); + ret.setMeaningNames( + termAssignmentHeaders.stream().map(AtlasTermAssignmentHeader::getDisplayText) + .collect(Collectors.toList())); + } + + if (entityType != null) { + for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) { + Object attrValue = getVertexAttributePreFetchCache(entityVertex, headerAttribute, properties); + + if (attrValue != null) { + ret.setAttribute(headerAttribute.getName(), attrValue); + } + } + + if(properties.get(NAME) != null){ + ret.setDisplayText(properties.get(NAME).toString()); + } else if(properties.get(DISPLAY_NAME) != null) { + ret.setDisplayText(properties.get(DISPLAY_NAME).toString()); + } else if(properties.get(QUALIFIED_NAME) != null) { + ret.setDisplayText(properties.get(QUALIFIED_NAME).toString()); + } + + + + //attributes = only the attributes of entityType + if (CollectionUtils.isNotEmpty(attributes)) { + for (String attrName : attributes) { + AtlasAttribute attribute = entityType.getAttribute(attrName); + + if (attribute == null) { + attrName = toNonQualifiedName(attrName); + + if (ret.hasAttribute(attrName)) { + continue; + } + + attribute = entityType.getAttribute(attrName); + + if (attribute == null) { + // dataContractLatest, meanings, links + attribute = entityType.getRelationshipAttribute(attrName, null); + } + } + + //this is a call to cassandra + Object attrValue = getVertexAttributePreFetchCache(entityVertex, attribute, properties); //use prefetch cache + + if (attrValue != null) { + ret.setAttribute(attrName, attrValue); + } + } + } + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + 
return ret; + } + private String toNonQualifiedName(String attrName) { String ret; if (attrName.contains(".")) { @@ -1709,6 +1901,41 @@ public Object getVertexAttribute(AtlasVertex vertex, AtlasAttribute attribute) t return vertex != null && attribute != null ? mapVertexToAttribute(vertex, attribute, null, false) : null; } + public Object getVertexAttributePreFetchCache(AtlasVertex vertex, AtlasAttribute attribute, Map properties) throws AtlasBaseException { + if (vertex == null || attribute == null) { + return null; + } + + + TypeCategory typeCategory = attribute.getAttributeType().getTypeCategory(); + TypeCategory elementTypeCategory = typeCategory == TypeCategory.ARRAY ? ((AtlasArrayType) attribute.getAttributeType()).getElementType().getTypeCategory() : null; + boolean isArrayOfPrimitives = typeCategory.equals(TypeCategory.ARRAY) && elementTypeCategory.equals(TypeCategory.PRIMITIVE); + boolean isPrefetchValueFinal = (typeCategory.equals(TypeCategory.PRIMITIVE) || typeCategory.equals(TypeCategory.ENUM) || typeCategory.equals(TypeCategory.MAP) || isArrayOfPrimitives); + boolean isMultiValueBusinessAttribute = attribute.getDefinedInType() != null && attribute.getDefinedInType().getTypeCategory() == TypeCategory.BUSINESS_METADATA && isArrayOfPrimitives; + + // value is present and value is not marker (SPACE for further lookup) and type is primitive or array of primitives; the marker is matched with equals() because a prefetched value is not guaranteed to be the same String instance as the constant + if (properties.get(attribute.getName()) != null && !StringUtils.SPACE.equals(properties.get(attribute.getName())) && (isMultiValueBusinessAttribute || isPrefetchValueFinal)) { + return properties.get(attribute.getName()); + } + + // if value is empty && element is array of primitives, return empty list + if (properties.get(attribute.getName()) == null && isArrayOfPrimitives) { + return new ArrayList<>(); + } + + //when value is not present and type is primitive, return null + if(properties.get(attribute.getName()) == null && isPrefetchValueFinal) { + return null; + } + + // value is present as marker, 
fetch the value from the vertex + if (ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES.getBoolean()) { + return mapVertexToAttribute(vertex, attribute, null, false); + } + + return null; + } + private Object getVertexAttributeIgnoreInactive(AtlasVertex vertex, AtlasAttribute attribute) throws AtlasBaseException { return vertex != null && attribute != null ? mapVertexToAttribute(vertex, attribute, null, false, true, true) : null; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqueQNAttributeMigrationService.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqueQNAttributeMigrationService.java new file mode 100644 index 0000000000..a7e0fc46e7 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqueQNAttributeMigrationService.java @@ -0,0 +1,99 @@ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class UniqueQNAttributeMigrationService { + + private static final Logger LOG = LoggerFactory.getLogger(UniqueQNAttributeMigrationService.class); + + private final EntityGraphRetriever entityRetriever; + + + private Set entityGuids; + private final TransactionInterceptHelper transactionInterceptHelper; + + private final String QUALIFIED_NAME_ATTR = "qualifiedName"; + private final String UNIQUE_QUALIFIED_NAME_ATTR = "__u_qualifiedName"; + + public UniqueQNAttributeMigrationService(EntityGraphRetriever entityRetriever, Set entityGuids, TransactionInterceptHelper transactionInterceptHelper) { + this.entityRetriever = entityRetriever; + this.transactionInterceptHelper = transactionInterceptHelper; + this.entityGuids = entityGuids; + } + + public void 
migrateQN() throws Exception { + try { + int count = 0; + int totalUpdatedCount = 0; + for (String entityGuid : entityGuids) { + AtlasVertex entityVertex = entityRetriever.getEntityVertex(entityGuid); + + if (entityVertex == null) { + LOG.error("Entity vertex not found for guid: {}", entityGuid); + continue; + } + + boolean isCommitRequired = migrateuniqueQnAttr(entityVertex); + if (isCommitRequired){ + count++; + totalUpdatedCount++; + } + else { + LOG.info("No changes to commit for entity: {} as no migration needed", entityGuid); + } + + if (count == 20) { + LOG.info("Committing batch of 20 entities..."); + commitChanges(); + count = 0; + } + } + + if (count > 0) { + LOG.info("Committing remaining {} entities...", count); + commitChanges(); + } + + LOG.info("Total Vertex updated: {}", totalUpdatedCount); + + } catch (Exception e) { + LOG.error("Error while migration unique qualifiedName attribute for entities: {}", entityGuids, e); + throw e; + } + } + + private boolean migrateuniqueQnAttr(AtlasVertex vertex) throws AtlasBaseException { + try{ + boolean isCommitRequired = false; + + String qualifiedName = vertex.getProperty(QUALIFIED_NAME_ATTR, String.class); + String uniqueQualifiedName = vertex.getProperty(UNIQUE_QUALIFIED_NAME_ATTR, String.class); + + if(qualifiedName != null && !qualifiedName.equals(uniqueQualifiedName)) { // a vertex without qualifiedName has nothing to copy; skip it instead of failing the whole batch with an NPE + vertex.setProperty(UNIQUE_QUALIFIED_NAME_ATTR, qualifiedName); + isCommitRequired = true; + } + return isCommitRequired; + }catch (Exception e) { + LOG.error("Failed to migrate unique qualifiedName attribute for entity: ", e); + throw e; + } + } + + public void commitChanges() throws AtlasBaseException { + try { + transactionInterceptHelper.intercept(); + LOG.info("Committed a entity to the graph"); + } catch (Exception e){ + LOG.error("Failed to commit asset: ", e); + throw e; + } + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/util/AccessControlUtils.java 
b/repository/src/main/java/org/apache/atlas/repository/util/AccessControlUtils.java index 7101868e30..1f89e4d65c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/util/AccessControlUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/util/AccessControlUtils.java @@ -61,6 +61,10 @@ public final class AccessControlUtils { public static final String ATTR_PURPOSE_CLASSIFICATIONS = "purposeClassifications"; + public static final String ATTR_SERVICE_SERVICE_TYPE = "authServiceType"; + public static final String ATTR_SERVICE_TAG_SERVICE = "tagService"; + public static final String ATTR_SERVICE_IS_ENABLED = "authServiceIsEnabled"; + public static final String ATTR_SERVICE_LAST_SYNC = "authServicePolicyLastSync"; public static final String ATTR_POLICY_TYPE = "policyType"; public static final String ATTR_POLICY_USERS = "policyUsers"; public static final String ATTR_POLICY_GROUPS = "policyGroups"; diff --git a/webapp/src/main/java/org/apache/atlas/web/errors/ExceptionMapperUtil.java b/webapp/src/main/java/org/apache/atlas/web/errors/ExceptionMapperUtil.java index e2511e5d50..f10029266a 100644 --- a/webapp/src/main/java/org/apache/atlas/web/errors/ExceptionMapperUtil.java +++ b/webapp/src/main/java/org/apache/atlas/web/errors/ExceptionMapperUtil.java @@ -17,15 +17,80 @@ */ package org.apache.atlas.web.errors; +import org.apache.atlas.type.AtlasType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; public class ExceptionMapperUtil { protected static final Logger LOGGER = LoggerFactory.getLogger(ExceptionMapperUtil.class); @SuppressWarnings("UnusedParameters") protected static String formatErrorMessage(long id, Exception exception) { - return String.format("There was an error processing your request. 
It has been logged (ID %016x).", id); + if (exception == null) { + // If the exception is null, return a minimal error message + Map errorDetails = new HashMap<>(); + errorDetails.put("errorId", String.format("%016x", id)); + errorDetails.put("message", "No exception provided."); + errorDetails.put("causes", new ArrayList<>()); + return AtlasType.toJson(errorDetails); + } + + // Prepare data for error message + Map errorDetails = new HashMap<>(); + errorDetails.put("errorId", String.format("%016x", id)); + errorDetails.put("message", "There was an error processing your request."); + + // Create a list of causes + List> causes = new ArrayList<>(); + List visited = new ArrayList<>(); + Throwable currentException = exception; + + while (currentException != null) { + if (visited.contains(currentException)) { + // If circular reference detected, add special entry + Map circularCause = new HashMap<>(); + circularCause.put("errorType", "CircularReferenceDetected"); + circularCause.put("errorMessage", "A circular reference was detected in the exception chain."); + circularCause.put("location", "Unavailable"); + causes.add(circularCause); + break; + } + visited.add(currentException); + causes.add(formatCause(currentException)); + currentException = currentException.getCause(); + } + + errorDetails.put("causes", causes); + + return AtlasType.toJson(errorDetails); + } + + // Helper method to format a single exception cause + private static Map formatCause(Throwable exception) { + Map cause = new HashMap<>(); + + // Extract location details from the first stack trace element + StackTraceElement[] stackTrace = exception.getStackTrace(); + String location = "Unavailable"; + if (stackTrace != null && stackTrace.length > 0) { + StackTraceElement element = stackTrace[0]; + location = String.format("%s.%s (%s:%d)", + element.getClassName(), + element.getMethodName(), + element.getFileName(), + element.getLineNumber()); + } + + // Populate the cause map + cause.put("errorType", 
exception.getClass().getName()); + cause.put("errorMessage", exception.getMessage() != null ? exception.getMessage() : "No additional information provided"); + cause.put("location", location); + + return cause; } protected static void logException(long id, Exception exception) { @@ -36,5 +101,4 @@ protected static void logException(long id, Exception exception) { protected static String formatLogMessage(long id, Throwable exception) { return String.format("Error handling a request: %016x", id); } - } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java index 551d0f4aa2..c50e472d84 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java @@ -325,6 +325,31 @@ public List searchForType(@PathParam("typeName") String typeName, @ } } + @POST + @Path("repair-unique-qualified-name") + @Timed + public Boolean updateUniqueQualifiedName(final Set assetGuids) throws Exception { + AtlasPerfTracer perf = null; + try { + if (CollectionUtils.isEmpty(assetGuids)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Asset GUIDs are required for which updating unique qualified name is required"); + } + + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "MigrationREST.updateUniqueQualifiedName(" + assetGuids + ")"); + } + + UniqueQNAttributeMigrationService migrationService = new UniqueQNAttributeMigrationService(entityRetriever, assetGuids, transactionInterceptHelper); + migrationService.migrateQN(); + } catch (Exception e) { + LOG.error("Error while updating unique qualified name for guids: {}", assetGuids, e); + throw e; + } finally { + AtlasPerfTracer.log(perf); + } + return Boolean.TRUE; + } + private List getEntitiesByIndexSearch(IndexSearchParams indexSearchParams, Boolean minExtInfo, boolean ignoreRelationships) throws 
AtlasBaseException { List entities = new ArrayList<>(); String indexName = "janusgraph_vertex_index";