diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index f2f7fd1e3e..44c6dadb27 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -370,6 +370,10 @@ public final class Constants { public static final String IMPALA_SOURCE = "impala"; public static final String STORM_SOURCE = "storm"; public static final String FILE_SPOOL_SOURCE = "file_spool"; + public static final String ASSET_POLICY_GUIDS = "assetPolicyGUIDs"; + public static final String ASSET_POLICIES_COUNT = "assetPoliciesCount"; + + /* * All supported file-format extensions for Bulk Imports through file upload diff --git a/intg/src/main/java/org/apache/atlas/model/instance/LinkBusinessPolicyRequest.java b/intg/src/main/java/org/apache/atlas/model/instance/LinkBusinessPolicyRequest.java new file mode 100644 index 0000000000..e42fd4ad55 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/instance/LinkBusinessPolicyRequest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.instance; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.List; +import java.util.Set; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +/** + * Request to link/unlink policies from asset. + */ +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class LinkBusinessPolicyRequest implements Serializable { + private static final long serialVersionUID = 1L; + + private Set linkGuids; + private Set unlinkGuids; + + public Set getLinkGuids() { + return linkGuids; + } + + public void setLinkGuids(Set linkGuids) { + this.linkGuids = linkGuids; + } + + public Set getUnlinkGuids() { + return unlinkGuids; + } + + public void setUnlinkGuids(Set unlinkGuids) { + this.unlinkGuids = unlinkGuids; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("LinkBusinessPolicyRequest{"); + sb.append("linkGuids=").append(linkGuids); + sb.append(", unlinkGuids=").append(unlinkGuids); + sb.append('}'); + return sb.toString(); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index 5fd7680281..4d7e10e2a2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -368,4 +368,10 @@ EntityMutationResponse deleteByUniqueAttributes(List objectIds) void repairAccesscontrolAlias(String guid) throws AtlasBaseException; + + void linkBusinessPolicy(String policyId, Set linkGuids) throws AtlasBaseException; + + + void unlinkBusinessPolicy(String policyId, Set unlinkGuids) throws AtlasBaseException; + } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 5e627dfa07..7b3cbb6fb3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -20,16 +20,12 @@ import com.google.common.annotations.VisibleForTesting; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.DeleteType; -import org.apache.atlas.GraphTransactionInterceptor; -import org.apache.atlas.RequestContext; -import org.apache.atlas.AtlasException; -import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.*; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.authorize.*; import org.apache.atlas.authorize.AtlasEntityAccessRequest.AtlasEntityAccessRequestBuilder; -import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.bulkimport.BulkImportResponse; +import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo; import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.featureflag.FeatureFlagStore; @@ -55,40 +51,31 @@ import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; -import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult; import org.apache.atlas.repository.store.graph.v1.RestoreHandlerV1; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult; import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor; -import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataDomainPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.ReadmePreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryCollectionPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryFolderPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryPreProcessor; import org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTask; import org.apache.atlas.tasks.TaskManagement; -import org.apache.atlas.type.AtlasArrayType; -import org.apache.atlas.type.AtlasBusinessMetadataType; +import org.apache.atlas.type.*; import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute; -import org.apache.atlas.type.AtlasClassificationType; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasEnumType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.type.AtlasTypeUtil; -import org.apache.atlas.bulkimport.BulkImportResponse; -import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo; import org.apache.atlas.util.FileUtils; import org.apache.atlas.utils.AtlasEntityUtil; import org.apache.atlas.utils.AtlasPerfMetrics; @@ -111,18 +98,15 @@ import static org.apache.atlas.bulkimport.BulkImportResponse.ImportStatus.FAILED; import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*; +import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.*; -import static org.apache.atlas.repository.graph.GraphHelper.getStatus; import static org.apache.atlas.repository.store.graph.v2.EntityGraphMapper.validateLabels; -import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.*; +import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_HARD_DELETE; +import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_SOFT_DELETE; import static org.apache.atlas.repository.util.AccessControlUtils.REL_ATTR_POLICIES; -import static org.apache.atlas.type.Constants.HAS_LINEAGE; -import static org.apache.atlas.type.Constants.HAS_LINEAGE_VALID; -import static org.apache.atlas.type.Constants.MEANINGS_TEXT_PROPERTY_KEY; -import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY; -import static org.apache.atlas.type.Constants.MEANING_NAMES_PROPERTY_KEY; -import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY; +import static org.apache.atlas.type.Constants.*; @@ -151,10 +135,12 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { private final ESAliasStore esAliasStore; + private final IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier; @Inject public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate, RestoreHandlerV1 restoreHandlerV1, AtlasTypeRegistry typeRegistry, IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper, TaskManagement taskManagement, - AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore) { + AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore, + IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier) { this.graph = graph; this.deleteDelegate = deleteDelegate; this.restoreHandlerV1 = restoreHandlerV1; @@ -168,7 +154,7 @@ public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate this.atlasRelationshipStore = atlasRelationshipStore; this.featureFlagStore = featureFlagStore; this.esAliasStore = new ESAliasStore(graph, entityRetriever); - + this.atlasAlternateChangeNotifier = atlasAlternateChangeNotifier; try { this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); } catch (AtlasException e) { @@ -2806,6 +2792,54 @@ public void repairAccesscontrolAlias(String guid) throws AtlasBaseException { RequestContext.get().endMetricRecord(metric); } + + @Override + @GraphTransaction + public void linkBusinessPolicy(String policyGuid, Set linkGuids) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy.GraphTransaction"); + + try { + List vertices = this.entityGraphMapper.linkBusinessPolicy(policyGuid, linkGuids); + if (CollectionUtils.isEmpty(vertices)) { + return; + } + + handleBusinessPolicyMutation(vertices); + } catch (Exception e) { + LOG.error("Error during linkBusinessPolicy for policyGuid: {}", policyGuid, e); + throw e; + } finally { + RequestContext.get().endMetricRecord(metric); + } + } + + @Override + @GraphTransaction + public void unlinkBusinessPolicy(String policyGuid, Set unlinkGuids) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("unlinkBusinessPolicy.GraphTransaction"); + try { + List vertices = this.entityGraphMapper.unlinkBusinessPolicy(policyGuid, unlinkGuids); + if (CollectionUtils.isEmpty(vertices)) { + return; + } + + handleBusinessPolicyMutation(vertices); + } catch (Exception e) { + LOG.error("Error during unlinkBusinessPolicy for policyGuid: {}", policyGuid, e); + throw e; + } finally { + RequestContext.get().endMetricRecord(metric); + } + } + + private void handleBusinessPolicyMutation(List vertices) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("handleBusinessPolicyMutation"); + this.atlasAlternateChangeNotifier.onEntitiesMutation(vertices); + RequestContext.get().endMetricRecord(metricRecorder); + } + + } + diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BusinessPolicyNotifierImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BusinessPolicyNotifierImpl.java new file mode 100644 index 0000000000..fab059cfbe --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BusinessPolicyNotifierImpl.java @@ -0,0 +1,64 @@ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.EntityChangeListenerV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.graph.GraphHelper.*; + + +@Component +public class BusinessPolicyNotifierImpl implements IAtlasMinimalChangeNotifier { + + private final Set entityChangeListenersV2; + + @Inject + public BusinessPolicyNotifierImpl(Set entityChangeListenersV2) { + this.entityChangeListenersV2 = entityChangeListenersV2; + + } + + @Override + public void onEntitiesMutation(final List vertices) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("onEntitiesMutation"); + final List entities = new ArrayList<>(0); + vertices.forEach(item -> entities.add(createAtlasEntity(item))); + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onEntitiesUpdated(entities, false); + } + + RequestContext.get().endMetricRecord(metricRecorder); + } + + private AtlasEntity createAtlasEntity(AtlasVertex vertex) { + AtlasEntity atlasEntity = new AtlasEntity(); + atlasEntity.setAttribute(QUALIFIED_NAME, vertex.getProperty(QUALIFIED_NAME, String.class)); + atlasEntity.setAttribute(NAME, vertex.getProperty(NAME, String.class)); + + atlasEntity.setGuid(vertex.getProperty(GUID_PROPERTY_KEY, String.class)); + atlasEntity.setTypeName(vertex.getProperty(TYPE_NAME_PROPERTY_KEY, String.class)); + atlasEntity.setCreatedBy(vertex.getProperty(CREATED_BY_KEY, String.class)); + atlasEntity.setUpdatedBy(vertex.getProperty(MODIFIED_BY_KEY, String.class)); + atlasEntity.setCreateTime(new Date(vertex.getProperty(TIMESTAMP_PROPERTY_KEY, Long.class))); + atlasEntity.setUpdateTime(new Date(vertex.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class))); + atlasEntity.setIsProxy(vertex.getProperty(IS_PROXY_KEY, Boolean.class)); + atlasEntity.setIsIncomplete(vertex.getProperty(IS_INCOMPLETE_PROPERTY_KEY, Boolean.class)); + atlasEntity.setStatus(getStatus(vertex)); + atlasEntity.setProvenanceType(getProvenanceType(vertex)); + atlasEntity.setHomeId(getHomeId(vertex)); + atlasEntity.setVersion(getVersion(vertex)); + + + return atlasEntity; + } + + +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 782b6847c8..d48d54d732 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -4762,4 +4762,51 @@ public void addHasLineage(Set inputOutputEdges, boolean isRestoreEnti RequestContext.get().endMetricRecord(metricRecorder); } -} \ No newline at end of file + + public List linkBusinessPolicy(String policyId, Set linkGuids) { + return linkGuids.stream().map(guid -> findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> { + Set existingValues = ev.getMultiValuedSetProperty(ASSET_POLICY_GUIDS, String.class); + return !existingValues.contains(policyId); + }).peek(ev -> { + Set existingValues = ev.getMultiValuedSetProperty(ASSET_POLICY_GUIDS, String.class); + existingValues.add(policyId); + ev.setProperty(ASSET_POLICY_GUIDS, policyId); + ev.setProperty(ASSET_POLICIES_COUNT, existingValues.size()); + + updateModificationMetadata(ev); + + cacheDifferentialEntity(ev, existingValues); + }).collect(Collectors.toList()); + } + + + public List unlinkBusinessPolicy(String policyId, Set unlinkGuids) { + return unlinkGuids.stream().map(guid -> AtlasGraphUtilsV2.findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> { + Set existingValues = ev.getMultiValuedSetProperty(ASSET_POLICY_GUIDS, String.class); + return existingValues.contains(policyId); + }).peek(ev -> { + Set existingValues = ev.getMultiValuedSetProperty(ASSET_POLICY_GUIDS, String.class); + existingValues.remove(policyId); + ev.removePropertyValue(ASSET_POLICY_GUIDS, policyId); + ev.setProperty(ASSET_POLICIES_COUNT, existingValues.size()); + + updateModificationMetadata(ev); + + cacheDifferentialEntity(ev, existingValues); + }).collect(Collectors.toList()); + } + + + private void cacheDifferentialEntity(AtlasVertex ev, Set existingValues) { + AtlasEntity diffEntity = new AtlasEntity(ev.getProperty(TYPE_NAME_PROPERTY_KEY, String.class)); + diffEntity.setGuid(ev.getProperty(GUID_PROPERTY_KEY, String.class)); + diffEntity.setAttribute(ASSET_POLICY_GUIDS, existingValues); + diffEntity.setAttribute(ASSET_POLICIES_COUNT, existingValues.size()); + diffEntity.setUpdatedBy(ev.getProperty(MODIFIED_BY_KEY, String.class)); + diffEntity.setUpdateTime(new Date(RequestContext.get().getRequestTime())); + + RequestContext requestContext = RequestContext.get(); + requestContext.cacheDifferentialEntity(diffEntity); + } + +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasMinimalChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasMinimalChangeNotifier.java new file mode 100644 index 0000000000..35c2d9e757 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasMinimalChangeNotifier.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.graphdb.AtlasVertex; + +import java.util.List; + +public interface IAtlasMinimalChangeNotifier { + void onEntitiesMutation(final List vertices) throws AtlasBaseException; +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java index 9edbfc1cc3..84e7948790 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -129,7 +129,7 @@ private AtlasEntityStoreV2 createEntityStore(AtlasGraph graph, AtlasTypeRegistry graph, relationshipStore, entityChangeNotifier, getInstanceConverter(graph), fullTextMapperV2, null, null); AtlasRelationshipStoreV2 atlasRelationshipStoreV2 = new AtlasRelationshipStoreV2(graph, typeRegistry, deleteDelegate, entityChangeNotifier); - return new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, entityChangeNotifier, entityGraphMapper, null, atlasRelationshipStoreV2, null); + return new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, entityChangeNotifier, entityGraphMapper, null, atlasRelationshipStoreV2, null, null); } private void shutdownEntityCreationManager(EntityCreationManager creationManager) { diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java index c039cc837a..ece56f294e 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationSender.java @@ -18,6 +18,7 @@ package org.apache.atlas.notification; import org.apache.atlas.GraphTransactionInterceptor; +import org.apache.atlas.RequestContext; import org.apache.atlas.model.notification.EntityNotification; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; @@ -95,10 +96,8 @@ public void send(EntityNotification.EntityNotificationV2.OperationType operation notificationHook = new PostCommitNotificationHook(operationType, notifications); postCommitNotificationHooks.set(notificationHook); } else { - if (isRelationshipEvent(operationType)) - notificationHook.addRelationshipNotifications(notifications); - else - notificationHook.addNotifications(notifications); + if (isRelationshipEvent(operationType)) notificationHook.addRelationshipNotifications(notifications); + else notificationHook.addNotifications(notifications); } } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/BusinessPolicyREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/BusinessPolicyREST.java new file mode 100644 index 0000000000..ef252293a0 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/rest/BusinessPolicyREST.java @@ -0,0 +1,113 @@ +package org.apache.atlas.web.rest; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.annotation.Timed; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.LinkBusinessPolicyRequest; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.web.util.Servlets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; + +import static org.apache.atlas.repository.util.AccessControlUtils.ARGO_SERVICE_USER_NAME; + +@Path("business-policy") +@Singleton +@Service +@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +public class BusinessPolicyREST { + + private static final Logger LOG = LoggerFactory.getLogger(BusinessPolicyREST.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.BusinessPolicyREST"); + + private final AtlasEntityStore entitiesStore; + + @Inject + public BusinessPolicyREST(AtlasEntityStore entitiesStore) { + this.entitiesStore = entitiesStore; + } + + /** + * Links a business policy to entities. + * + * @param policyGuid the ID of the policy to be linked + * @param request the request containing the GUIDs of the assets to link the policy to + * @throws AtlasBaseException if there is an error during the linking process + */ + @POST + @Path("/{policyId}/link-business-policy") + @Timed + public void linkBusinessPolicy(@PathParam("policyId") final String policyGuid, final LinkBusinessPolicyRequest request) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy"); + // Ensure the current user is authorized to link policies + if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { + throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy linking"); + } + + // Set request context parameters + RequestContext.get().setIncludeClassifications(false); + RequestContext.get().setIncludeMeanings(false); + + AtlasPerfTracer perf = null; + try { + // Start performance tracing if enabled + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "BusinessPolicyREST.linkBusinessPolicy(" + policyGuid + ")"); + } + + // Link the business policy to the specified entities + entitiesStore.linkBusinessPolicy(policyGuid, request.getLinkGuids()); + } finally { + // Log performance metrics + AtlasPerfTracer.log(perf); + RequestContext.get().endMetricRecord(metric); + } + } + + /** + * Unlinks a business policy from entities. + * + * @param policyGuid the ID of the policy to be unlinked + * @param request the request containing the GUIDs of the assets to unlink the policy from + * @throws AtlasBaseException if there is an error during the unlinking process + */ + @POST + @Path("/{policyId}/unlink-business-policy") + @Timed + public void unlinkBusinessPolicy(@PathParam("policyId") final String policyGuid, final LinkBusinessPolicyRequest request) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("unlinkBusinessPolicy"); + // Ensure the current user is authorized to unlink policies + if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { + throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy unlinking"); + } + + // Set request context parameters + RequestContext.get().setIncludeClassifications(false); + RequestContext.get().setIncludeMeanings(false); + + AtlasPerfTracer perf = null; + try { + // Start performance tracing if enabled + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "BusinessPolicyREST.unlinkBusinessPolicy(" + policyGuid + ")"); + } + + // Unlink the business policy from the specified entities + entitiesStore.unlinkBusinessPolicy(policyGuid, request.getUnlinkGuids()); + } finally { + // Log performance metrics + AtlasPerfTracer.log(perf); + RequestContext.get().endMetricRecord(metric); + } + } +}