diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index 61b7148bc2..04a7cd2ba2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -369,4 +369,6 @@ EntityMutationResponse deleteByUniqueAttributes(List objectIds) void unlinkProductFromAsset(String productId, Set unlinkGuids) throws AtlasBaseException; + void linkProductWithNotification(String productId, Set unlinkGuids) throws AtlasBaseException; + } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 2f82b58cc8..33f185e696 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -150,11 +150,12 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { private final FeatureFlagStore featureFlagStore; private final ESAliasStore esAliasStore; + private final IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier; @Inject public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate, RestoreHandlerV1 restoreHandlerV1, AtlasTypeRegistry typeRegistry, IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper, TaskManagement taskManagement, - AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore) { + AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore, IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier) { this.graph = graph; this.deleteDelegate = deleteDelegate; this.restoreHandlerV1 = restoreHandlerV1; @@ -168,6 +169,7 @@ public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate this.atlasRelationshipStore = atlasRelationshipStore; this.featureFlagStore = featureFlagStore; this.esAliasStore = new ESAliasStore(graph, entityRetriever); + this.atlasAlternateChangeNotifier = atlasAlternateChangeNotifier; try { this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); @@ -2749,9 +2751,31 @@ public void linkProductToAsset(String productGuid, Set linkGuids) throws return; } -// handleProductMutation(vertices); + LOG.info("linkProductToAsset: productGuid={}, linkGuids={}", productGuid, linkGuids); + + //handleProductMutation(vertices); } catch (Exception e) { - LOG.error("Error during linkBusinessPolicy for policyGuid: {}", productGuid, e); + LOG.error("Error during linkProduct for productGuid: {}", productGuid, e); + throw e; + } finally { + RequestContext.get().endMetricRecord(metric); + } + } + + @Override + @GraphTransaction + public void linkProductWithNotification(String productGuid, Set linkGuids) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkProductToAsset.GraphTransaction"); + + try { + List vertices = this.entityGraphMapper.linkProductWithNotification(productGuid, linkGuids); + if (CollectionUtils.isEmpty(vertices)) { + return; + } + + handleProductMutation(vertices); + } catch (Exception e) { + LOG.error("Error during linkProduct for productGuid: {}", productGuid, e); throw e; } finally { RequestContext.get().endMetricRecord(metric); @@ -2768,7 +2792,7 @@ public void unlinkProductFromAsset(String productGuid, Set unlinkGuids) return; } -// handleProductMutation(vertices); + handleProductMutation(vertices); } catch (Exception e) { LOG.error("Error during unlinkProduct for productGuid: {}", productGuid, e); throw e; @@ -2777,11 +2801,11 @@ public void unlinkProductFromAsset(String productGuid, Set unlinkGuids) } } -// private void handleProductMutation(List vertices) throws AtlasBaseException { -// AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("handleBusinessPolicyMutation"); -// this.atlasAlternateChangeNotifier.onEntitiesMutation(vertices); -// RequestContext.get().endMetricRecord(metricRecorder); -// } + private void handleProductMutation(List vertices) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("handleBusinessPolicyMutation"); + this.atlasAlternateChangeNotifier.onEntitiesMutation(vertices); + RequestContext.get().endMetricRecord(metricRecorder); + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/DataProductNotifierImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/DataProductNotifierImpl.java new file mode 100644 index 0000000000..4c9115dec8 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/DataProductNotifierImpl.java @@ -0,0 +1,64 @@ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.EntityChangeListenerV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.graph.GraphHelper.*; + + +@Component +public class DataProductNotifierImpl implements IAtlasMinimalChangeNotifier { + + private final Set entityChangeListenersV2; + + @Inject + public DataProductNotifierImpl(Set entityChangeListenersV2) { + this.entityChangeListenersV2 = entityChangeListenersV2; + + } + + @Override + public void onEntitiesMutation(final List vertices) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("onEntitiesMutation"); + final List entities = new ArrayList<>(0); + vertices.forEach(item -> entities.add(createAtlasEntity(item))); + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onEntitiesUpdated(entities, false); + } + + RequestContext.get().endMetricRecord(metricRecorder); + } + + private AtlasEntity createAtlasEntity(AtlasVertex vertex) { + AtlasEntity atlasEntity = new AtlasEntity(); + atlasEntity.setAttribute(QUALIFIED_NAME, vertex.getProperty(QUALIFIED_NAME, String.class)); + atlasEntity.setAttribute(NAME, vertex.getProperty(NAME, String.class)); + + atlasEntity.setGuid(vertex.getProperty(GUID_PROPERTY_KEY, String.class)); + atlasEntity.setTypeName(vertex.getProperty(TYPE_NAME_PROPERTY_KEY, String.class)); + atlasEntity.setCreatedBy(vertex.getProperty(CREATED_BY_KEY, String.class)); + atlasEntity.setUpdatedBy(vertex.getProperty(MODIFIED_BY_KEY, String.class)); + atlasEntity.setCreateTime(new Date(vertex.getProperty(TIMESTAMP_PROPERTY_KEY, Long.class))); + atlasEntity.setUpdateTime(new Date(vertex.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class))); + atlasEntity.setIsProxy(vertex.getProperty(IS_PROXY_KEY, Boolean.class)); + atlasEntity.setIsIncomplete(vertex.getProperty(IS_INCOMPLETE_PROPERTY_KEY, Boolean.class)); + atlasEntity.setStatus(getStatus(vertex)); + atlasEntity.setProvenanceType(getProvenanceType(vertex)); + atlasEntity.setHomeId(getHomeId(vertex)); + atlasEntity.setVersion(getVersion(vertex)); + + + return atlasEntity; + } + + +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 18f3f3c180..fc46ca0b15 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -4568,6 +4568,21 @@ public void addHasLineage(Set inputOutputEdges, boolean isRestoreEnti } public List linkProductToAsset(String productId, Set linkGuids) { + return linkGuids.stream().map(guid -> findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> { + Set existingValues = ev.getMultiValuedSetProperty(ASSET_PRODUCT_GUIDS, String.class); + return !existingValues.contains(productId); + }).peek(ev -> { + Set existingValues = ev.getMultiValuedSetProperty(ASSET_PRODUCT_GUIDS, String.class); + existingValues.add(productId); + ev.setProperty(ASSET_PRODUCT_GUIDS, productId); + + updateModificationMetadata(ev); + + //cacheDifferentialEntity(ev, existingValues); + }).collect(Collectors.toList()); + } + + public List linkProductWithNotification(String productId, Set linkGuids) { return linkGuids.stream().map(guid -> findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> { Set existingValues = ev.getMultiValuedSetProperty(ASSET_PRODUCT_GUIDS, String.class); return !existingValues.contains(productId); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasMinimalChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasMinimalChangeNotifier.java new file mode 100644 index 0000000000..35c2d9e757 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasMinimalChangeNotifier.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.graphdb.AtlasVertex; + +import java.util.List; + +public interface IAtlasMinimalChangeNotifier { + void onEntitiesMutation(final List vertices) throws AtlasBaseException; +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java index 9edbfc1cc3..84e7948790 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -129,7 +129,7 @@ private AtlasEntityStoreV2 createEntityStore(AtlasGraph graph, AtlasTypeRegistry graph, relationshipStore, entityChangeNotifier, getInstanceConverter(graph), fullTextMapperV2, null, null); AtlasRelationshipStoreV2 atlasRelationshipStoreV2 = new AtlasRelationshipStoreV2(graph, typeRegistry, deleteDelegate, entityChangeNotifier); - return new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, entityChangeNotifier, entityGraphMapper, null, atlasRelationshipStoreV2, null); + return new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, entityChangeNotifier, entityGraphMapper, null, atlasRelationshipStoreV2, null, null); } private void shutdownEntityCreationManager(EntityCreationManager creationManager) { diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java index de9fb0718a..86c292a40b 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java @@ -84,7 +84,7 @@ public void setup() throws IOException, AtlasBaseException { loadHiveModel(typeDefStore, typeRegistry); RequestContext.get().setImportInProgress(true); - entityStore = new AtlasEntityStoreV2(atlasGraph, deleteDelegate, restoreHandlerV1, typeRegistry, mockChangeNotifier, graphMapper, null, atlasRelationshipStore, null); + entityStore = new AtlasEntityStoreV2(atlasGraph, deleteDelegate, restoreHandlerV1, typeRegistry, mockChangeNotifier, graphMapper, null, atlasRelationshipStore, null, null); createEntities(entityStore, ENTITIES_SUB_DIR, new String[]{"db", "table-columns", "table-view", "table-table-lineage"}); final String[] entityGuids = {DB_GUID, TABLE_GUID, TABLE_TABLE_GUID, TABLE_VIEW_GUID}; verifyCreatedEntities(entityStore, entityGuids, 4); diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java index cda9d5658b..d9c8fa28be 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java @@ -128,7 +128,7 @@ public void setUp() throws Exception { } @BeforeTest public void init() throws Exception { - entityStore = new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, mockChangeNotifier, graphMapper, null, atlasRelationshipStore, null); + entityStore = new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, mockChangeNotifier, graphMapper, null, atlasRelationshipStore, null, null); RequestContext.clear(); RequestContext.get().setUser(TestUtilsV2.TEST_USER, null); diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java index 60973b308b..c6ffef6797 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java @@ -115,7 +115,7 @@ public void clear() throws Exception { @BeforeTest public void init() throws Exception { - entityStore = new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, mockChangeNotifier, graphMapper, null, atlasRelationshipStore, null); + entityStore = new AtlasEntityStoreV2(graph, deleteDelegate, restoreHandlerV1, typeRegistry, mockChangeNotifier, graphMapper, null, atlasRelationshipStore, null, null); RequestContext.clear(); RequestContext.get().setUser(TestUtilsV2.TEST_USER, null); diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java index 796da31b65..a554a08245 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java @@ -135,7 +135,7 @@ public void setUp() throws Exception { @BeforeTest public void init() throws Exception { relationshipStore = new AtlasRelationshipStoreV2(atlasGraph, typeRegistry, deleteDelegate, entityNotifier); - entityStore = new AtlasEntityStoreV2(atlasGraph, deleteDelegate, restoreHandlerV1, typeRegistry, mockChangeNotifier, graphMapper, null, relationshipStore, null); + entityStore = new AtlasEntityStoreV2(atlasGraph, deleteDelegate, restoreHandlerV1, typeRegistry, mockChangeNotifier, graphMapper, null, relationshipStore, null, null); RequestContext.clear(); RequestContext.get().setUser(TestUtilsV2.TEST_USER, null); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/ProductAssetLinkREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/ProductAssetLinkREST.java index 5b72b0ea3f..ae90ece72c 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/ProductAssetLinkREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/ProductAssetLinkREST.java @@ -76,6 +76,37 @@ public void linkProductToAsset(@PathParam("productId") final String productGuid, } } + @POST + @Path("/{productId}/link-product-with-notification") + @Timed + public void linkProductWithNotification(@PathParam("productId") final String productGuid, final LinkDataProductRequest request) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkDataProductToAsset"); + // Ensure the current user is authorized to link policies +// if (!ARGO_SERVICE_USER_NAME.equals(RequestContext.getCurrentUser())) { +// throw new AtlasBaseException(AtlasErrorCode.UNAUTHORIZED_ACCESS, RequestContext.getCurrentUser(), "Policy linking"); +// } + + // Set request context parameters + RequestContext.get().setIncludeClassifications(false); + RequestContext.get().setIncludeMeanings(false); + RequestContext.get().getRequestContextHeaders().put("route", "product-asset-link"); + + AtlasPerfTracer perf = null; + try { + // Start performance tracing if enabled + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "ProductAssetLinkREST.linkProductWithNotification(" + productGuid + ")"); + } + + // Link the product to the specified entities + entitiesStore.linkProductWithNotification(productGuid, request.getLinkGuids()); + } finally { + // Log performance metrics + AtlasPerfTracer.log(perf); + RequestContext.get().endMetricRecord(metric); + } + } + /** * Unlinks a product from entities. *