Skip to content

Commit

Permalink
Merge pull request #3348 from atlanhq/policyendpointsmasterss
Browse files Browse the repository at this point in the history
DG-1697: Adding endpoint for linking/unlink policy
  • Loading branch information
arpit-at authored Jul 29, 2024
2 parents 165587f + 93e2c13 commit e154fe8
Show file tree
Hide file tree
Showing 10 changed files with 407 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ public final class Constants {
public static final String IMPALA_SOURCE = "impala";
public static final String STORM_SOURCE = "storm";
public static final String FILE_SPOOL_SOURCE = "file_spool";
public static final String ASSET_POLICY_GUIDS = "assetPolicyGUIDs";
public static final String ASSET_POLICIES_COUNT = "assetPoliciesCount";



/*
* All supported file-format extensions for Bulk Imports through file upload
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.instance;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.List;
import java.util.Set;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;

/**
* Request to link/unlink policies from asset.
*/
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class LinkBusinessPolicyRequest implements Serializable {
private static final long serialVersionUID = 1L;

private Set<String> linkGuids;
private Set<String> unlinkGuids;

public Set<String> getLinkGuids() {
return linkGuids;
}

public void setLinkGuids(Set<String> linkGuids) {
this.linkGuids = linkGuids;
}

public Set<String> getUnlinkGuids() {
return unlinkGuids;
}

public void setUnlinkGuids(Set<String> unlinkGuids) {
this.unlinkGuids = unlinkGuids;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("LinkBusinessPolicyRequest{");
sb.append("linkGuids=").append(linkGuids);
sb.append(", unlinkGuids=").append(unlinkGuids);
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,10 @@ EntityMutationResponse deleteByUniqueAttributes(List<AtlasObjectId> objectIds)

void repairAccesscontrolAlias(String guid) throws AtlasBaseException;


void linkBusinessPolicy(String policyId, Set<String> linkGuids) throws AtlasBaseException;


void unlinkBusinessPolicy(String policyId, Set<String> unlinkGuids) throws AtlasBaseException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@


import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.DeleteType;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.*;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.*;
import org.apache.atlas.authorize.AtlasEntityAccessRequest.AtlasEntityAccessRequestBuilder;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.featureflag.FeatureFlagStore;
Expand All @@ -55,40 +51,31 @@
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult;
import org.apache.atlas.repository.store.graph.v1.RestoreHandlerV1;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult;
import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataDomainPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.ReadmePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryCollectionPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryFolderPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTask;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBusinessMetadataType;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo;
import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics;
Expand All @@ -111,18 +98,15 @@
import static org.apache.atlas.bulkimport.BulkImportResponse.ImportStatus.FAILED;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.graph.GraphHelper.getStatus;
import static org.apache.atlas.repository.store.graph.v2.EntityGraphMapper.validateLabels;
import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_HARD_DELETE;
import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_SOFT_DELETE;
import static org.apache.atlas.repository.util.AccessControlUtils.REL_ATTR_POLICIES;
import static org.apache.atlas.type.Constants.HAS_LINEAGE;
import static org.apache.atlas.type.Constants.HAS_LINEAGE_VALID;
import static org.apache.atlas.type.Constants.MEANINGS_TEXT_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.MEANING_NAMES_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.*;



Expand Down Expand Up @@ -151,10 +135,12 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {

private final ESAliasStore esAliasStore;

private final IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier;
@Inject
public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate, RestoreHandlerV1 restoreHandlerV1, AtlasTypeRegistry typeRegistry,
IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper, TaskManagement taskManagement,
AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore) {
AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore,
IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier) {
this.graph = graph;
this.deleteDelegate = deleteDelegate;
this.restoreHandlerV1 = restoreHandlerV1;
Expand All @@ -168,7 +154,7 @@ public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate
this.atlasRelationshipStore = atlasRelationshipStore;
this.featureFlagStore = featureFlagStore;
this.esAliasStore = new ESAliasStore(graph, entityRetriever);

this.atlasAlternateChangeNotifier = atlasAlternateChangeNotifier;
try {
this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null);
} catch (AtlasException e) {
Expand Down Expand Up @@ -2737,6 +2723,54 @@ public void repairAccesscontrolAlias(String guid) throws AtlasBaseException {

RequestContext.get().endMetricRecord(metric);
}

@Override
@GraphTransaction
public void linkBusinessPolicy(String policyGuid, Set<String> linkGuids) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("linkBusinessPolicy.GraphTransaction");

try {
List<AtlasVertex> vertices = this.entityGraphMapper.linkBusinessPolicy(policyGuid, linkGuids);
if (CollectionUtils.isEmpty(vertices)) {
return;
}

handleBusinessPolicyMutation(vertices);
} catch (Exception e) {
LOG.error("Error during linkBusinessPolicy for policyGuid: {}", policyGuid, e);
throw e;
} finally {
RequestContext.get().endMetricRecord(metric);
}
}

@Override
@GraphTransaction
public void unlinkBusinessPolicy(String policyGuid, Set<String> unlinkGuids) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("unlinkBusinessPolicy.GraphTransaction");
try {
List<AtlasVertex> vertices = this.entityGraphMapper.unlinkBusinessPolicy(policyGuid, unlinkGuids);
if (CollectionUtils.isEmpty(vertices)) {
return;
}

handleBusinessPolicyMutation(vertices);
} catch (Exception e) {
LOG.error("Error during unlinkBusinessPolicy for policyGuid: {}", policyGuid, e);
throw e;
} finally {
RequestContext.get().endMetricRecord(metric);
}
}

private void handleBusinessPolicyMutation(List<AtlasVertex> vertices) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("handleBusinessPolicyMutation");
this.atlasAlternateChangeNotifier.onEntitiesMutation(vertices);
RequestContext.get().endMetricRecord(metricRecorder);
}


}



Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.apache.atlas.repository.store.graph.v2;

import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import java.util.*;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.*;


@Component
public class BusinessPolicyNotifierImpl implements IAtlasMinimalChangeNotifier {

private final Set<EntityChangeListenerV2> entityChangeListenersV2;

@Inject
public BusinessPolicyNotifierImpl(Set<EntityChangeListenerV2> entityChangeListenersV2) {
this.entityChangeListenersV2 = entityChangeListenersV2;

}

@Override
public void onEntitiesMutation(final List<AtlasVertex> vertices) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("onEntitiesMutation");
final List<AtlasEntity> entities = new ArrayList<>(0);
vertices.forEach(item -> entities.add(createAtlasEntity(item)));
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onEntitiesUpdated(entities, false);
}

RequestContext.get().endMetricRecord(metricRecorder);
}

private AtlasEntity createAtlasEntity(AtlasVertex vertex) {
AtlasEntity atlasEntity = new AtlasEntity();
atlasEntity.setAttribute(QUALIFIED_NAME, vertex.getProperty(QUALIFIED_NAME, String.class));
atlasEntity.setAttribute(NAME, vertex.getProperty(NAME, String.class));

atlasEntity.setGuid(vertex.getProperty(GUID_PROPERTY_KEY, String.class));
atlasEntity.setTypeName(vertex.getProperty(TYPE_NAME_PROPERTY_KEY, String.class));
atlasEntity.setCreatedBy(vertex.getProperty(CREATED_BY_KEY, String.class));
atlasEntity.setUpdatedBy(vertex.getProperty(MODIFIED_BY_KEY, String.class));
atlasEntity.setCreateTime(new Date(vertex.getProperty(TIMESTAMP_PROPERTY_KEY, Long.class)));
atlasEntity.setUpdateTime(new Date(vertex.getProperty(MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class)));
atlasEntity.setIsProxy(vertex.getProperty(IS_PROXY_KEY, Boolean.class));
atlasEntity.setIsIncomplete(vertex.getProperty(IS_INCOMPLETE_PROPERTY_KEY, Boolean.class));
atlasEntity.setStatus(getStatus(vertex));
atlasEntity.setProvenanceType(getProvenanceType(vertex));
atlasEntity.setHomeId(getHomeId(vertex));
atlasEntity.setVersion(getVersion(vertex));


return atlasEntity;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4581,4 +4581,51 @@ public void addHasLineage(Set<AtlasEdge> inputOutputEdges, boolean isRestoreEnti
RequestContext.get().endMetricRecord(metricRecorder);
}

}

public List<AtlasVertex> linkBusinessPolicy(String policyId, Set<String> linkGuids) {
return linkGuids.stream().map(guid -> findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty(ASSET_POLICY_GUIDS, String.class);
return !existingValues.contains(policyId);
}).peek(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty(ASSET_POLICY_GUIDS, String.class);
existingValues.add(policyId);
ev.setProperty(ASSET_POLICY_GUIDS, policyId);
ev.setProperty(ASSET_POLICIES_COUNT, existingValues.size());

updateModificationMetadata(ev);

cacheDifferentialEntity(ev, existingValues);
}).collect(Collectors.toList());
}


public List<AtlasVertex> unlinkBusinessPolicy(String policyId, Set<String> unlinkGuids) {
return unlinkGuids.stream().map(guid -> AtlasGraphUtilsV2.findByGuid(graph, guid)).filter(Objects::nonNull).filter(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty(ASSET_POLICY_GUIDS, String.class);
return existingValues.contains(policyId);
}).peek(ev -> {
Set<String> existingValues = ev.getMultiValuedSetProperty(ASSET_POLICY_GUIDS, String.class);
existingValues.remove(policyId);
ev.removePropertyValue(ASSET_POLICY_GUIDS, policyId);
ev.setProperty(ASSET_POLICIES_COUNT, existingValues.size());

updateModificationMetadata(ev);

cacheDifferentialEntity(ev, existingValues);
}).collect(Collectors.toList());
}


private void cacheDifferentialEntity(AtlasVertex ev, Set<String> existingValues) {
AtlasEntity diffEntity = new AtlasEntity(ev.getProperty(TYPE_NAME_PROPERTY_KEY, String.class));
diffEntity.setGuid(ev.getProperty(GUID_PROPERTY_KEY, String.class));
diffEntity.setAttribute(ASSET_POLICY_GUIDS, existingValues);
diffEntity.setAttribute(ASSET_POLICIES_COUNT, existingValues.size());
diffEntity.setUpdatedBy(ev.getProperty(MODIFIED_BY_KEY, String.class));
diffEntity.setUpdateTime(new Date(RequestContext.get().getRequestTime()));

RequestContext requestContext = RequestContext.get();
requestContext.cacheDifferentialEntity(diffEntity);
}

}
Loading

0 comments on commit e154fe8

Please sign in to comment.