From 72b591b9794f0ce02f0e4e218f9d9f24ce03405f Mon Sep 17 00:00:00 2001 From: arpit-at Date: Wed, 3 Jul 2024 20:02:07 +0530 Subject: [PATCH] DG-1697: Added linking and unlinking api for business-policy --- .github/workflows/maven.yml | 1 + .../org/apache/atlas/AtlasConfiguration.java | 1 + .../v2/AlternateLinkingNotifierImpl.java | 67 ++++++++++ .../store/graph/v2/AtlasEntityStoreV2.java | 106 +++++++++------- .../store/graph/v2/EntityGraphMapper.java | 74 +++++++---- .../v2/IAtlasAlternateChangeNotifier.java | 28 +++++ .../graph/v2/bulkimport/MigrationImport.java | 2 +- .../java/org/apache/atlas/RequestContext.java | 86 ++++++++++++- .../EntityNotificationSender.java | 11 +- .../apache/atlas/web/rest/AlternateREST.java | 116 ++++++++++++++++++ .../org/apache/atlas/web/rest/EntityREST.java | 42 ------- 11 files changed, 420 insertions(+), 114 deletions(-) create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AlternateLinkingNotifierImpl.java create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasAlternateChangeNotifier.java create mode 100644 webapp/src/main/java/org/apache/atlas/web/rest/AlternateREST.java diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 0977cb36a2..1070038c39 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -26,6 +26,7 @@ on: - development - master - lineageondemand + - policyendpointsmaster jobs: build: diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index a701510a4e..ed54b1bb3f 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -113,6 +113,7 @@ public enum AtlasConfiguration { INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), + POLICY_OPERATIONS_NOTIFICATION_MAX_THREADS("atlas.policy.operations.max.threads", 5), ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AlternateLinkingNotifierImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AlternateLinkingNotifierImpl.java new file mode 100644 index 0000000000..3ca158b902 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AlternateLinkingNotifierImpl.java @@ -0,0 +1,67 @@ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.EntityChangeListenerV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.atlas.repository.graph.GraphHelper.*; + + +@Component +public class AlternateLinkingNotifierImpl implements IAtlasAlternateChangeNotifier { + + private final Set entityChangeListenersV2; + + @Inject + public AlternateLinkingNotifierImpl(Set entityChangeListenersV2) { + this.entityChangeListenersV2 = entityChangeListenersV2; + + } + + @Override + public void onEntitiesMutation(final EntityMutationResponse entityMutationResponse, final Map entityByGuid) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("onEntitiesMutation"); + final List updatedEntities = entityMutationResponse.getUpdatedEntities(); + final List entities = updatedEntities.stream().map(entityHeader -> createAtlasEntity(entityHeader, entityByGuid.get(entityHeader.getGuid()))).collect(Collectors.toList()); + + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onEntitiesUpdated(entities, false); + } + + RequestContext.get().endMetricRecord(metricRecorder); + } + + private AtlasEntity createAtlasEntity(AtlasEntityHeader entityHeader, AtlasVertex vertex) { + AtlasEntity atlasEntity = new AtlasEntity(); + atlasEntity.setAttributes(entityHeader.getAttributes()); + atlasEntity.setGuid(entityHeader.getGuid()); + atlasEntity.setTypeName(entityHeader.getTypeName()); + atlasEntity.setStatus(entityHeader.getStatus()); + atlasEntity.setCreatedBy(entityHeader.getCreatedBy()); + atlasEntity.setUpdatedBy(entityHeader.getUpdatedBy()); + atlasEntity.setCreateTime(entityHeader.getCreateTime()); + atlasEntity.setUpdateTime(entityHeader.getUpdateTime()); + atlasEntity.setIsProxy(entityHeader.getIsIncomplete()); + atlasEntity.setIsIncomplete(entityHeader.getIsIncomplete()); + atlasEntity.setProvenanceType(getProvenanceType(vertex)); + atlasEntity.setCustomAttributes(getCustomAttributes(vertex)); + atlasEntity.setHomeId(getHomeId(vertex)); + atlasEntity.setVersion(getVersion(vertex)); + + return atlasEntity; + } + + +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index a227e5d36c..b5d8a40d08 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -20,16 +20,12 @@ import com.google.common.annotations.VisibleForTesting; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.DeleteType; -import org.apache.atlas.GraphTransactionInterceptor; -import org.apache.atlas.RequestContext; -import org.apache.atlas.AtlasException; -import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.*; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.authorize.*; import org.apache.atlas.authorize.AtlasEntityAccessRequest.AtlasEntityAccessRequestBuilder; -import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.bulkimport.BulkImportResponse; +import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo; import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.featureflag.FeatureFlagStore; @@ -55,40 +51,31 @@ import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; -import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult; import org.apache.atlas.repository.store.graph.v1.RestoreHandlerV1; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult; import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataDomainPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.ReadmePreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryCollectionPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryFolderPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryPreProcessor; import org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTask; import org.apache.atlas.tasks.TaskManagement; -import org.apache.atlas.type.AtlasArrayType; -import org.apache.atlas.type.AtlasBusinessMetadataType; +import org.apache.atlas.type.*; import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute; -import org.apache.atlas.type.AtlasClassificationType; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasEnumType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.type.AtlasTypeUtil; -import org.apache.atlas.bulkimport.BulkImportResponse; -import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo; import org.apache.atlas.util.FileUtils; import org.apache.atlas.utils.AtlasEntityUtil; import org.apache.atlas.utils.AtlasPerfMetrics; @@ -111,18 +98,15 @@ import static org.apache.atlas.bulkimport.BulkImportResponse.ImportStatus.FAILED; import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*; +import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.*; -import static org.apache.atlas.repository.graph.GraphHelper.getStatus; import static org.apache.atlas.repository.store.graph.v2.EntityGraphMapper.validateLabels; -import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.*; +import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_HARD_DELETE; +import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_SOFT_DELETE; import static org.apache.atlas.repository.util.AccessControlUtils.REL_ATTR_POLICIES; -import static org.apache.atlas.type.Constants.HAS_LINEAGE; -import static org.apache.atlas.type.Constants.HAS_LINEAGE_VALID; -import static org.apache.atlas.type.Constants.MEANINGS_TEXT_PROPERTY_KEY; -import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY; -import static org.apache.atlas.type.Constants.MEANING_NAMES_PROPERTY_KEY; -import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY; +import static org.apache.atlas.type.Constants.*; @@ -151,10 +135,12 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { private final ESAliasStore esAliasStore; + private final IAtlasAlternateChangeNotifier atlasAlternateChangeNotifier; @Inject public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate, RestoreHandlerV1 restoreHandlerV1, AtlasTypeRegistry typeRegistry, IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper, TaskManagement taskManagement, - AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore) { + AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore, + IAtlasAlternateChangeNotifier atlasAlternateChangeNotifier) { this.graph = graph; this.deleteDelegate = deleteDelegate; this.restoreHandlerV1 = restoreHandlerV1; @@ -168,7 +154,7 @@ public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate this.atlasRelationshipStore = atlasRelationshipStore; this.featureFlagStore = featureFlagStore; this.esAliasStore = new ESAliasStore(graph, entityRetriever); - + this.atlasAlternateChangeNotifier = atlasAlternateChangeNotifier; try { this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); } catch (AtlasException e) { @@ -1550,8 +1536,8 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean // Notify the change listeners - // entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress()); - // atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap()); + entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress()); + atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap()); if (LOG.isDebugEnabled()) { LOG.debug("<== createOrUpdate()"); } @@ -2739,21 +2725,51 @@ public void repairAccesscontrolAlias(String guid) throws AtlasBaseException { } @Override - @GraphTransaction - public void linkBusinessPolicy(String guid, List linkGuids) { - AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy"); - this.entityGraphMapper.linkBusinessPolicy(guid, linkGuids); - RequestContext.get().endMetricRecord(metric); + public void linkBusinessPolicy(String guid, List linkGuids) throws AtlasBaseException { + processBusinessPolicy(guid, linkGuids, true); } - @Override - @GraphTransaction - public void unlinkBusinessPolicy(String guid, List unlinkGuids) { - AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy"); - this.entityGraphMapper.unlinkBusinessPolicy(guid, unlinkGuids); + public void unlinkBusinessPolicy(String guid, List unlinkGuids) throws AtlasBaseException { + processBusinessPolicy(guid, unlinkGuids, false); + } + + private void processBusinessPolicy(String guid, List guids, boolean isLink) throws AtlasBaseException { + // Start recording the performance metrics for the operation + String operation = isLink ? "linkBusinessPolicy" : "unlinkBusinessPolicy"; + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord(operation); + + // Link or unlink the business policy to/from entities and retrieve the affected vertices + List vertices = isLink ? this.entityGraphMapper.linkBusinessPolicy(guid, guids) + : this.entityGraphMapper.unlinkBusinessPolicy(guid, guids); + + // If no vertices are returned, exit the method early + if (CollectionUtils.isEmpty(vertices)) { + return; + } + + // Prepare the response for the entity mutations + EntityMutationResponse entityMutationResponse = new EntityMutationResponse(); + for (AtlasVertex vertex : vertices) { + // Convert each vertex to an AtlasEntityHeader and add it to the response + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(vertex); + entityMutationResponse.addEntity(UPDATE, entityHeader); + } + + // Collect the vertices into a map for easier access by their GUID property + Map vertexMap = vertices.stream() + .collect(Collectors.toMap(vertex -> vertex.getProperty("__guid", String.class), vertex -> vertex)); + + // Notify the policy change notifier about the entities that were mutated + this.atlasAlternateChangeNotifier.onEntitiesMutation(entityMutationResponse, vertexMap); + + // End the performance metrics recording RequestContext.get().endMetricRecord(metric); } + + + } + diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 70835b74cb..2ca4519f52 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -98,6 +98,7 @@ import java.util.Set; import java.util.UUID; import java.util.Date; +import java.util.concurrent.CompletableFuture; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -4514,27 +4515,58 @@ public void addHasLineage(Set inputOutputEdges, boolean isRestoreEnti } RequestContext.get().endMetricRecord(metricRecorder); } - public void linkBusinessPolicy(String policyId, List linkGuids) { - for (String guid : linkGuids) { - AtlasVertex ev = AtlasGraphUtilsV2.findByGuid(graph, guid); - if (ev != null) { - Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); - ev.setProperty("assetPolicyGUIDs", policyId); - ev.setProperty("assetPoliciesCount", existingValues.size() + 1); - updateModificationMetadata(ev); - } - } + + + @GraphTransaction + public List linkBusinessPolicy(String policyId, List linkGuids) { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy.GraphTransaction"); + List collect = linkGuids.stream().map(guid -> findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> { + Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); + return !existingValues.contains(policyId); + }).peek(ev -> { + Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); + existingValues.add(policyId); + ev.setProperty("assetPolicyGUIDs", policyId); + ev.setProperty("assetPoliciesCount", existingValues.size() + 1); + + updateModificationMetadata(ev); + + cacheDifferentialEntity(ev, existingValues, policyId); + }).collect(Collectors.toList()); + RequestContext.get().endMetricRecord(metric); + return collect; } - public void unlinkBusinessPolicy(String policyId, List unlinkGuids) { - for (String guid : unlinkGuids) { - AtlasVertex ev = AtlasGraphUtilsV2.findByGuid(graph, guid); - if (ev != null) { - Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); - ev.removePropertyValue("assetPolicyGUIDs", policyId); - ev.setProperty("assetPoliciesCount", existingValues.size() - 1); - updateModificationMetadata(ev); - } - } + + @GraphTransaction + public List unlinkBusinessPolicy(String policyId, List unlinkGuids) { + + return unlinkGuids.stream().map(guid -> AtlasGraphUtilsV2.findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> { + Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); + return existingValues.contains(policyId); + }).peek(ev -> { + Set existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class); + existingValues.remove(policyId); + ev.removePropertyValue("assetPolicyGUIDs", policyId); + ev.setProperty("assetPoliciesCount", existingValues.size() - 1); + + updateModificationMetadata(ev); + + cacheDifferentialEntity(ev, existingValues, policyId); + }).collect(Collectors.toList()); } -} \ No newline at end of file + + + private void cacheDifferentialEntity(AtlasVertex ev, Set existingValues, String policyId) { + AtlasEntity diffEntity = new AtlasEntity(ev.getProperty("__typeName", String.class)); + diffEntity.setGuid(ev.getProperty("__guid", String.class)); + diffEntity.setAttribute("assetPolicyGUIDs", existingValues); + diffEntity.setAttribute("assetPoliciesCount", existingValues.size()); + diffEntity.setUpdatedBy(ev.getProperty(MODIFIED_BY_KEY, String.class)); + diffEntity.setUpdateTime(new Date(ev.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class))); + + RequestContext requestContext = RequestContext.get(); + requestContext.cacheDifferentialEntity(diffEntity); + } + +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasAlternateChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasAlternateChangeNotifier.java new file mode 100644 index 0000000000..3e83666144 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasAlternateChangeNotifier.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.*; +import org.apache.atlas.repository.graphdb.AtlasVertex; + +import java.util.Map; + +public interface IAtlasAlternateChangeNotifier { + void onEntitiesMutation(final EntityMutationResponse entityMutationResponse, final Map entityByGuid) throws AtlasBaseException; +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java index 0cc7c4a318..cdb485c7bf 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -129,7 +129,7 @@ private AtlasEntityStoreV2 createEntityStore(AtlasGraph graph, AtlasTypeRegistry graph, relationshipStore, entityChangeNotifier, getInstanceConverter(graph), fullTextMapperV2, null, null); AtlasRelationshipStoreV2 atlasRelationshipStoreV2 = new AtlasRelationshipStoreV2(graph, typeRegistry, deleteDelegate, entityChangeNotifier); - return new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, entityChangeNotifier, entityGraphMapper, null, atlasRelationshipStoreV2, null); + return new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, entityChangeNotifier, entityGraphMapper, null, atlasRelationshipStoreV2, null, null); } private void shutdownEntityCreationManager(EntityCreationManager creationManager) { diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index 565832b7bd..9ac463ecd1 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -93,7 +93,7 @@ public class RequestContext { private String currentTypePatchAction = ""; private AtlasTask currentTask; private String traceId; - private final Map relationshipEndToVertexIdMap = new HashMap<>(); + private Map relationshipEndToVertexIdMap = new HashMap<>(); private boolean allowDuplicateDisplayName; private MetricsRegistry metricsRegistry; private boolean skipAuthorizationCheck = false; @@ -105,10 +105,82 @@ public class RequestContext { private Map> deletedClassificationAndVertices = new HashMap<>(); private Map> addedClassificationAndVertices = new HashMap<>(); + private boolean isCloned = false; + private boolean alternatePath = false; + private RequestContext() { + + } + + private RequestContext(RequestContext other) { + this.user = other.user; + this.userGroups = other.userGroups != null ? new HashSet<>(other.userGroups) : null; + this.clientIPAddress = other.clientIPAddress; + this.forwardedAddresses = other.forwardedAddresses != null ? new ArrayList<>(other.forwardedAddresses) : null; + this.deleteType = other.deleteType; + this.isPurgeRequested = other.isPurgeRequested; + this.maxAttempts = other.maxAttempts; + this.attemptCount = other.attemptCount; + this.isImportInProgress = other.isImportInProgress; + this.isInNotificationProcessing = other.isInNotificationProcessing; + this.isInTypePatching = other.isInTypePatching; + this.createShellEntityForNonExistingReference = other.createShellEntityForNonExistingReference; + this.skipFailedEntities = other.skipFailedEntities; + this.allowDeletedRelationsIndexsearch = other.allowDeletedRelationsIndexsearch; + this.includeMeanings = other.includeMeanings; + this.includeClassifications = other.includeClassifications; + this.includeClassificationNames = other.includeClassificationNames; + this.currentTypePatchAction = other.currentTypePatchAction; + this.currentTask = other.currentTask; + this.traceId = other.traceId; + this.relationshipEndToVertexIdMap = new HashMap<>(other.relationshipEndToVertexIdMap); + this.allowDuplicateDisplayName = other.allowDuplicateDisplayName; + this.metricsRegistry = other.metricsRegistry; + this.skipAuthorizationCheck = other.skipAuthorizationCheck; + this.deletedEdgesIdsForResetHasLineage = new HashSet<>(other.deletedEdgesIdsForResetHasLineage); + this.requestUri = other.requestUri; + this.cacheEnabled = other.cacheEnabled; + this.delayTagNotifications = other.delayTagNotifications; + this.deletedClassificationAndVertices = new HashMap<>(other.deletedClassificationAndVertices); + this.addedClassificationAndVertices = new HashMap<>(other.addedClassificationAndVertices); + + this.updatedEntities.putAll(other.updatedEntities); + this.deletedEntities.putAll(other.deletedEntities); + this.restoreEntities.putAll(other.restoreEntities); + this.entityCache.putAll(other.entityCache); + this.entityHeaderCache.putAll(other.entityHeaderCache); + this.entityExtInfoCache.putAll(other.entityExtInfoCache); + this.diffEntityCache.putAll(other.diffEntityCache); + this.addedPropagations.putAll(other.addedPropagations); + this.removedPropagations.putAll(other.removedPropagations); + this.requestContextHeaders.putAll(other.requestContextHeaders); + this.deletedEdgesIds.addAll(other.deletedEdgesIds); + this.processGuidIds.addAll(other.processGuidIds); + this.entitiesToSkipUpdate.addAll(other.entitiesToSkipUpdate); + this.onlyCAUpdateEntities.addAll(other.onlyCAUpdateEntities); + this.onlyBAUpdateEntities.addAll(other.onlyBAUpdateEntities); + this.queuedTasks.addAll(other.queuedTasks); + this.relationAttrsForSearch.addAll(other.relationAttrsForSearch); + this.removedElementsMap.putAll(other.removedElementsMap); + this.newElementsCreatedMap.putAll(other.newElementsCreatedMap); + this.relationshipMutationMap.putAll(other.relationshipMutationMap); + } + + + public static RequestContext cloneCurrentContext() { + RequestContext currentContext = get(); + RequestContext requestContext = new RequestContext(currentContext); + requestContext.isCloned = true; + return requestContext; + } + + public static void set(RequestContext context) { + CURRENT_CONTEXT.set(context); } + + //To handle gets from background threads where createContext() is not called //createContext called for every request in the filter public static RequestContext get() { @@ -788,4 +860,16 @@ public void clearMutationContext(String event) { public Map> getRelationshipMutationMap() { return relationshipMutationMap; } + + public boolean isCloned() { + return isCloned; + } + + public boolean isAlternatePath() { + return alternatePath; + } + + public void setAlternatePath(boolean alternatePath) { + this.alternatePath = alternatePath; + } } \ No newline at end of file diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java index c039cc837a..7147d23215 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java @@ -18,6 +18,7 @@ package org.apache.atlas.notification; import org.apache.atlas.GraphTransactionInterceptor; +import org.apache.atlas.RequestContext; import org.apache.atlas.model.notification.EntityNotification; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; @@ -95,10 +96,12 @@ public void send(EntityNotification.EntityNotificationV2.OperationType operation notificationHook = new PostCommitNotificationHook(operationType, notifications); postCommitNotificationHooks.set(notificationHook); } else { - if (isRelationshipEvent(operationType)) - notificationHook.addRelationshipNotifications(notifications); - else - notificationHook.addNotifications(notifications); + if (isRelationshipEvent(operationType)) notificationHook.addRelationshipNotifications(notifications); + else notificationHook.addNotifications(notifications); + } + + if (RequestContext.get().isAlternatePath()) { + notificationHook.onComplete(true); } } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/AlternateREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/AlternateREST.java new file mode 100644 index 0000000000..2746bcbf8c --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/rest/AlternateREST.java @@ -0,0 +1,116 @@ +package org.apache.atlas.web.rest; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.annotation.Timed; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.LinkBusinessPolicyRequest; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.web.util.Servlets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; + +import static org.apache.atlas.repository.util.AccessControlUtils.ARGO_SERVICE_USER_NAME; + +@Path("alternate") +@Singleton +@Service +@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +public class AlternateREST { + + private static final Logger LOG = LoggerFactory.getLogger(AlternateREST.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.AlternateREST"); + + private final AtlasEntityStore entitiesStore; + + @Inject + public AlternateREST(AtlasEntityStore entitiesStore) { + this.entitiesStore = entitiesStore; + } + + /** + * Links a business policy to entities. + * + * @param policyId the ID of the policy to be linked + * @param request the request containing the GUIDs of the assets to link the policy to + * @throws AtlasBaseException if there is an error during the linking process + */ + @POST + @Path("/{policyId}/link-business-policy") + @Timed + public void linkBusinessPolicy(@PathParam("policyId") final String policyId, final LinkBusinessPolicyRequest request) throws AtlasBaseException { + // Ensure the current user is authorized to link policies + if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { + throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy linking"); + } + + // Set request context parameters + RequestContext.get().setAlternatePath(true); + RequestContext.get().setIncludeClassifications(false); + RequestContext.get().setIncludeMeanings(false); + + AtlasPerfTracer perf = null; + try { + // Start performance tracing if enabled + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AlternateREST.linkBusinessPolicy(" + policyId + ")"); + } + + // Link the business policy to the specified entities + entitiesStore.linkBusinessPolicy(policyId, request.getLinkGuids()); + } catch (AtlasBaseException abe) { + LOG.error("Error in policy linking: ", abe); + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "During Policy linking"); + } finally { + // Log performance metrics + AtlasPerfTracer.log(perf); + } + } + + /** + * Unlinks a business policy from entities. + * + * @param policyId the ID of the policy to be unlinked + * @param request the request containing the GUIDs of the assets to unlink the policy from + * @throws AtlasBaseException if there is an error during the unlinking process + */ + @POST + @Path("/{policyId}/unlink-business-policy") + @Timed + public void unlinkBusinessPolicy(@PathParam("policyId") final String policyId, final LinkBusinessPolicyRequest request) throws AtlasBaseException { + // Ensure the current user is authorized to unlink policies + if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { + throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy unlinking"); + } + + // Set request context parameters + RequestContext.get().setAlternatePath(true); + RequestContext.get().setIncludeClassifications(false); + RequestContext.get().setIncludeMeanings(false); + + AtlasPerfTracer perf = null; + try { + // Start performance tracing if enabled + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AlternateREST.unlinkBusinessPolicy(" + policyId + ")"); + } + + // Unlink the business policy from the specified entities + entitiesStore.unlinkBusinessPolicy(policyId, request.getUnlinkGuids()); + } catch (AtlasBaseException abe) { + LOG.error("Error in policy unlinking: ", abe); + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "During Policy unlinking"); + } finally { + // Log performance metrics + AtlasPerfTracer.log(perf); + } + } +} diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java index 62c77b142d..7d50b30db7 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java @@ -1955,46 +1955,4 @@ public void repairAccessControlAlias(@PathParam("guid") String guid) throws Atla AtlasPerfTracer.log(perf); } } - - - @POST - @Path("/{policyId}/link-business-policy") - @Produces(Servlets.JSON_MEDIA_TYPE) - @Consumes(Servlets.JSON_MEDIA_TYPE) - @Timed - public void linkBusinessPolicy(@PathParam("policyId") final String policyId, final LinkBusinessPolicyRequest request) throws AtlasBaseException { - if (ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { - throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy linking"); - } - AtlasPerfTracer perf = null; - try { - if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { - perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.linkBusinessPolicy(" + policyId + ")"); - } - entitiesStore.linkBusinessPolicy(policyId, request.getLinkGuids()); - } finally { - AtlasPerfTracer.log(perf); - } - } - - - @POST - @Path("/{policyId}/unlink-business-policy") - @Produces(Servlets.JSON_MEDIA_TYPE) - @Consumes(Servlets.JSON_MEDIA_TYPE) - @Timed - public void unlinkBusinessPolicy(@PathParam("policyId") final String policyId, final LinkBusinessPolicyRequest request) throws AtlasBaseException { - if (ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { - throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy unlinking"); - } - AtlasPerfTracer perf = null; - try { - if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { - perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.unlinkBusinessPolicy(" + policyId + ")"); - } - entitiesStore.unlinkBusinessPolicy(policyId, request.getUnlinkGuids()); - } finally { - AtlasPerfTracer.log(perf); - } - } }