Skip to content

Commit

Permalink
DG-1697: Added linking and unlinking api for business-policy
Browse files Browse the repository at this point in the history
  • Loading branch information
arpit-at committed Jul 19, 2024
1 parent 8936e59 commit 72b591b
Show file tree
Hide file tree
Showing 11 changed files with 420 additions and 114 deletions.
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ on:
- development
- master
- lineageondemand
- policyendpointsmaster

jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public enum AtlasConfiguration {

INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300),

POLICY_OPERATIONS_NOTIFICATION_MAX_THREADS("atlas.policy.operations.max.threads", 5),
ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false);


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.apache.atlas.repository.store.graph.v2;

import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.graph.GraphHelper.*;


@Component
public class AlternateLinkingNotifierImpl implements IAtlasAlternateChangeNotifier {

private final Set<EntityChangeListenerV2> entityChangeListenersV2;

@Inject
public AlternateLinkingNotifierImpl(Set<EntityChangeListenerV2> entityChangeListenersV2) {
this.entityChangeListenersV2 = entityChangeListenersV2;

}

@Override
public void onEntitiesMutation(final EntityMutationResponse entityMutationResponse, final Map<String, AtlasVertex> entityByGuid) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("onEntitiesMutation");
final List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities();
final List<AtlasEntity> entities = updatedEntities.stream().map(entityHeader -> createAtlasEntity(entityHeader, entityByGuid.get(entityHeader.getGuid()))).collect(Collectors.toList());

for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onEntitiesUpdated(entities, false);
}

RequestContext.get().endMetricRecord(metricRecorder);
}

private AtlasEntity createAtlasEntity(AtlasEntityHeader entityHeader, AtlasVertex vertex) {
AtlasEntity atlasEntity = new AtlasEntity();
atlasEntity.setAttributes(entityHeader.getAttributes());
atlasEntity.setGuid(entityHeader.getGuid());
atlasEntity.setTypeName(entityHeader.getTypeName());
atlasEntity.setStatus(entityHeader.getStatus());
atlasEntity.setCreatedBy(entityHeader.getCreatedBy());
atlasEntity.setUpdatedBy(entityHeader.getUpdatedBy());
atlasEntity.setCreateTime(entityHeader.getCreateTime());
atlasEntity.setUpdateTime(entityHeader.getUpdateTime());
atlasEntity.setIsProxy(entityHeader.getIsIncomplete());
atlasEntity.setIsIncomplete(entityHeader.getIsIncomplete());
atlasEntity.setProvenanceType(getProvenanceType(vertex));
atlasEntity.setCustomAttributes(getCustomAttributes(vertex));
atlasEntity.setHomeId(getHomeId(vertex));
atlasEntity.setVersion(getVersion(vertex));

return atlasEntity;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@


import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.DeleteType;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.*;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.*;
import org.apache.atlas.authorize.AtlasEntityAccessRequest.AtlasEntityAccessRequestBuilder;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.featureflag.FeatureFlagStore;
Expand All @@ -55,40 +51,31 @@
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult;
import org.apache.atlas.repository.store.graph.v1.RestoreHandlerV1;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult;
import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataDomainPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.ReadmePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryCollectionPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryFolderPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTask;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBusinessMetadataType;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo;
import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics;
Expand All @@ -111,18 +98,15 @@
import static org.apache.atlas.bulkimport.BulkImportResponse.ImportStatus.FAILED;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.graph.GraphHelper.getStatus;
import static org.apache.atlas.repository.store.graph.v2.EntityGraphMapper.validateLabels;
import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_HARD_DELETE;
import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_SOFT_DELETE;
import static org.apache.atlas.repository.util.AccessControlUtils.REL_ATTR_POLICIES;
import static org.apache.atlas.type.Constants.HAS_LINEAGE;
import static org.apache.atlas.type.Constants.HAS_LINEAGE_VALID;
import static org.apache.atlas.type.Constants.MEANINGS_TEXT_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.MEANING_NAMES_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.*;



Expand Down Expand Up @@ -151,10 +135,12 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {

private final ESAliasStore esAliasStore;

private final IAtlasAlternateChangeNotifier atlasAlternateChangeNotifier;
@Inject
public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate, RestoreHandlerV1 restoreHandlerV1, AtlasTypeRegistry typeRegistry,
IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper, TaskManagement taskManagement,
AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore) {
AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore,
IAtlasAlternateChangeNotifier atlasAlternateChangeNotifier) {
this.graph = graph;
this.deleteDelegate = deleteDelegate;
this.restoreHandlerV1 = restoreHandlerV1;
Expand All @@ -168,7 +154,7 @@ public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate
this.atlasRelationshipStore = atlasRelationshipStore;
this.featureFlagStore = featureFlagStore;
this.esAliasStore = new ESAliasStore(graph, entityRetriever);

this.atlasAlternateChangeNotifier = atlasAlternateChangeNotifier;
try {
this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null);
} catch (AtlasException e) {
Expand Down Expand Up @@ -1550,8 +1536,8 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean


// Notify the change listeners
// entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
// atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap());
entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap());
if (LOG.isDebugEnabled()) {
LOG.debug("<== createOrUpdate()");
}
Expand Down Expand Up @@ -2739,21 +2725,51 @@ public void repairAccesscontrolAlias(String guid) throws AtlasBaseException {
}

@Override
@GraphTransaction
public void linkBusinessPolicy(String guid, List<String> linkGuids) {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy");
this.entityGraphMapper.linkBusinessPolicy(guid, linkGuids);
RequestContext.get().endMetricRecord(metric);
public void linkBusinessPolicy(String guid, List<String> linkGuids) throws AtlasBaseException {
processBusinessPolicy(guid, linkGuids, true);
}


@Override
@GraphTransaction
public void unlinkBusinessPolicy(String guid, List<String> unlinkGuids) {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy");
this.entityGraphMapper.unlinkBusinessPolicy(guid, unlinkGuids);
public void unlinkBusinessPolicy(String guid, List<String> unlinkGuids) throws AtlasBaseException {
processBusinessPolicy(guid, unlinkGuids, false);
}

private void processBusinessPolicy(String guid, List<String> guids, boolean isLink) throws AtlasBaseException {
// Start recording the performance metrics for the operation
String operation = isLink ? "linkBusinessPolicy" : "unlinkBusinessPolicy";
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord(operation);

// Link or unlink the business policy to/from entities and retrieve the affected vertices
List<AtlasVertex> vertices = isLink ? this.entityGraphMapper.linkBusinessPolicy(guid, guids)
: this.entityGraphMapper.unlinkBusinessPolicy(guid, guids);

// If no vertices are returned, exit the method early
if (CollectionUtils.isEmpty(vertices)) {
return;
}

// Prepare the response for the entity mutations
EntityMutationResponse entityMutationResponse = new EntityMutationResponse();
for (AtlasVertex vertex : vertices) {
// Convert each vertex to an AtlasEntityHeader and add it to the response
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(vertex);
entityMutationResponse.addEntity(UPDATE, entityHeader);
}

// Collect the vertices into a map for easier access by their GUID property
Map<String, AtlasVertex> vertexMap = vertices.stream()
.collect(Collectors.toMap(vertex -> vertex.getProperty("__guid", String.class), vertex -> vertex));

// Notify the policy change notifier about the entities that were mutated
this.atlasAlternateChangeNotifier.onEntitiesMutation(entityMutationResponse, vertexMap);

// End the performance metrics recording
RequestContext.get().endMetricRecord(metric);
}



}



Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -4514,27 +4515,58 @@ public void addHasLineage(Set<AtlasEdge> inputOutputEdges, boolean isRestoreEnti
}
RequestContext.get().endMetricRecord(metricRecorder);
}
public void linkBusinessPolicy(String policyId, List<String> linkGuids) {
for (String guid : linkGuids) {
AtlasVertex ev = AtlasGraphUtilsV2.findByGuid(graph, guid);
if (ev != null) {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
ev.setProperty("assetPolicyGUIDs", policyId);
ev.setProperty("assetPoliciesCount", existingValues.size() + 1);
updateModificationMetadata(ev);
}
}


@GraphTransaction
public List<AtlasVertex> linkBusinessPolicy(String policyId, List<String> linkGuids) {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy.GraphTransaction");
List<AtlasVertex> collect = linkGuids.stream().map(guid -> findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
return !existingValues.contains(policyId);
}).peek(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
existingValues.add(policyId);
ev.setProperty("assetPolicyGUIDs", policyId);
ev.setProperty("assetPoliciesCount", existingValues.size() + 1);

updateModificationMetadata(ev);

cacheDifferentialEntity(ev, existingValues, policyId);
}).collect(Collectors.toList());
RequestContext.get().endMetricRecord(metric);
return collect;
}

public void unlinkBusinessPolicy(String policyId, List<String> unlinkGuids) {
for (String guid : unlinkGuids) {
AtlasVertex ev = AtlasGraphUtilsV2.findByGuid(graph, guid);
if (ev != null) {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
ev.removePropertyValue("assetPolicyGUIDs", policyId);
ev.setProperty("assetPoliciesCount", existingValues.size() - 1);
updateModificationMetadata(ev);
}
}

@GraphTransaction
public List<AtlasVertex> unlinkBusinessPolicy(String policyId, List<String> unlinkGuids) {

return unlinkGuids.stream().map(guid -> AtlasGraphUtilsV2.findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
return existingValues.contains(policyId);
}).peek(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty("assetPolicyGUIDs", String.class);
existingValues.remove(policyId);
ev.removePropertyValue("assetPolicyGUIDs", policyId);
ev.setProperty("assetPoliciesCount", existingValues.size() - 1);

updateModificationMetadata(ev);

cacheDifferentialEntity(ev, existingValues, policyId);
}).collect(Collectors.toList());
}
}


private void cacheDifferentialEntity(AtlasVertex ev, Set<String> existingValues, String policyId) {
AtlasEntity diffEntity = new AtlasEntity(ev.getProperty("__typeName", String.class));
diffEntity.setGuid(ev.getProperty("__guid", String.class));
diffEntity.setAttribute("assetPolicyGUIDs", existingValues);
diffEntity.setAttribute("assetPoliciesCount", existingValues.size());
diffEntity.setUpdatedBy(ev.getProperty(MODIFIED_BY_KEY, String.class));
diffEntity.setUpdateTime(new Date(ev.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class)));

RequestContext requestContext = RequestContext.get();
requestContext.cacheDifferentialEntity(diffEntity);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.repository.graphdb.AtlasVertex;

import java.util.Map;

public interface IAtlasAlternateChangeNotifier {
void onEntitiesMutation(final EntityMutationResponse entityMutationResponse, final Map<String, AtlasVertex> entityByGuid) throws AtlasBaseException;
}
Loading

0 comments on commit 72b591b

Please sign in to comment.