diff --git a/addons/policies/bootstrap_entity_policies.json b/addons/policies/bootstrap_entity_policies.json
index 24ecee3421..32b0b381e1 100644
--- a/addons/policies/bootstrap_entity_policies.json
+++ b/addons/policies/bootstrap_entity_policies.json
@@ -3005,6 +3005,76 @@
"entity-delete"
]
}
+ },
+ {
+ "typeName": "AuthPolicy",
+ "attributes":
+ {
+ "name": "READ_DATA_CONTRACT",
+ "qualifiedName": "READ_DATA_CONTRACT",
+ "policyCategory": "bootstrap",
+ "policySubCategory": "default",
+ "policyServiceName": "atlas",
+ "policyType": "allow",
+ "policyPriority": 1,
+ "policyUsers":
+ [],
+ "policyGroups":
+ [],
+ "policyRoles":
+ [
+ "$admin",
+ "$member",
+ "$api-token-default-access"
+ ],
+ "policyResourceCategory": "ENTITY",
+ "policyResources":
+ [
+ "entity-type:DataContract",
+ "entity-classification:*",
+ "entity:*"
+ ],
+ "policyActions":
+ [
+ "entity-read"
+ ]
+ }
+ },
+ {
+ "typeName": "AuthPolicy",
+ "attributes":
+ {
+ "name": "CU_DATA_CONTRACT",
+ "qualifiedName": "CU_DATA_CONTRACT",
+ "description": "cu allow for data contract",
+ "policyCategory": "bootstrap",
+ "policySubCategory": "default",
+ "policyServiceName": "atlas",
+ "policyType": "allow",
+ "policyPriority": 1,
+ "policyUsers":
+ [],
+ "policyGroups":
+ [],
+ "policyRoles":
+ [
+ "$admin",
+ "$member",
+ "$api-token-default-access"
+ ],
+ "policyResourceCategory": "ENTITY",
+ "policyResources":
+ [
+ "entity-type:DataContract",
+ "entity-classification:*",
+ "entity:*"
+ ],
+ "policyActions":
+ [
+ "entity-create",
+ "entity-update"
+ ]
+ }
}
]
}
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 8d4d47ea57..e10796992d 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -162,6 +162,13 @@ public final class Constants {
public static final String ASSET_README_EDGE_LABEL = "__Asset.readme";
public static final String ASSET_LINK_EDGE_LABEL = "__Asset.links";
+ /**
+ * Contract
+ */
+ public static final String CONTRACT_ENTITY_TYPE = "DataContract";
+ public static final String ATTR_CONTRACT_VERSION = "dataContractVersion";
+
+
/**
* Lineage relations.
*/
@@ -403,7 +410,8 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }
public static final String ATTR_STARRED_DETAILS_LIST = "starredDetailsList";
public static final String ATTR_ASSET_STARRED_BY = "assetStarredBy";
public static final String ATTR_ASSET_STARRED_AT = "assetStarredAt";
-
+ public static final String ATTR_CERTIFICATE_STATUS = "certificateStatus";
+ public static final String ATTR_CONTRACT = "dataContractJson";
public static final String STRUCT_STARRED_DETAILS = "StarredDetails";
public static final String KEYCLOAK_ROLE_ADMIN = "$admin";
diff --git a/repository/pom.xml b/repository/pom.xml
index 10d8d876fb..a2a1a4198f 100755
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -322,6 +322,12 @@
3.0.0-SNAPSHOT
+
+ org.hibernate
+ hibernate-validator
+ 4.3.2.Final
+
+
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 6e3487f8d3..3436ec3e10 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -58,6 +58,7 @@
import org.apache.atlas.repository.store.graph.v1.RestoreHandlerV1;
import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor;
+import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
@@ -1835,6 +1836,10 @@ public PreProcessor getPreProcessor(String typeName) {
case README_ENTITY_TYPE:
preProcessor = new ReadmePreProcessor(typeRegistry, entityRetriever);
break;
+
+ case CONTRACT_ENTITY_TYPE:
+ preProcessor = new ContractPreProcessor(graph, typeRegistry, entityRetriever, storeDifferentialAudits, discovery);
+ break;
}
return preProcessor;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java
new file mode 100644
index 0000000000..0a4521e34b
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java
@@ -0,0 +1,96 @@
+package org.apache.atlas.repository.store.graph.v2.preprocessor.contract;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.authorize.AtlasEntityAccessRequest;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND;
+import static org.apache.atlas.AtlasErrorCode.TYPE_NAME_INVALID;
+import static org.apache.atlas.repository.Constants.*;
+
+public abstract class AbstractContractPreProcessor implements PreProcessor {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractContractPreProcessor.class);
+
+ public final AtlasTypeRegistry typeRegistry;
+ public final EntityGraphRetriever entityRetriever;
+ public final AtlasGraph graph;
+
+
+ AbstractContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry,
+ EntityGraphRetriever entityRetriever) {
+ this.graph = graph;
+ this.typeRegistry = typeRegistry;
+ this.entityRetriever = entityRetriever;
+ }
+
+ void authorizeContractCreateOrUpdate(AtlasEntity contractEntity, AtlasEntity.AtlasEntityWithExtInfo associatedAsset) throws AtlasBaseException {
+ AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("authorizeContractUpdate");
+ try {
+ AtlasEntityHeader entityHeader = new AtlasEntityHeader(associatedAsset.getEntity());
+
+ //First authorize entity update access
+ verifyAssetAccess(entityHeader, AtlasPrivilege.ENTITY_UPDATE, contractEntity, AtlasPrivilege.ENTITY_UPDATE);
+
+ } finally {
+ RequestContext.get().endMetricRecord(metricRecorder);
+ }
+ }
+
+
+ private void verifyAssetAccess(AtlasEntityHeader asset, AtlasPrivilege assetPrivilege,
+ AtlasEntity contract, AtlasPrivilege contractPrivilege) throws AtlasBaseException {
+ verifyAccess(asset, assetPrivilege);
+ verifyAccess(contract, contractPrivilege);
+ }
+
+ private void verifyAccess(AtlasEntity entity, AtlasPrivilege privilege) throws AtlasBaseException {
+ verifyAccess(new AtlasEntityHeader(entity), privilege);
+ }
+
+ private void verifyAccess(AtlasEntityHeader entityHeader, AtlasPrivilege privilege) throws AtlasBaseException {
+ String errorMessage = privilege.name() + " entity: " + entityHeader.getTypeName();
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, privilege, entityHeader), errorMessage);
+ }
+
+ AtlasEntity.AtlasEntityWithExtInfo getAssociatedAsset(String datasetQName, String typeName) throws AtlasBaseException {
+
+ Map uniqAttributes = new HashMap<>();
+ uniqAttributes.put(QUALIFIED_NAME, datasetQName);
+
+ AtlasEntityType entityType = ensureEntityType(typeName);
+
+ AtlasVertex entityVertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(graph, entityType, uniqAttributes);
+
+ return entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
+ }
+
+ AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException {
+ AtlasEntityType ret = typeRegistry.getEntityTypeByName(typeName);
+
+ if (ret == null) {
+ throw new AtlasBaseException(TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), typeName);
+ }
+
+ return ret;
+ }
+
+
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java
new file mode 100644
index 0000000000..df3efde85f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java
@@ -0,0 +1,304 @@
+package org.apache.atlas.repository.store.graph.v2.preprocessor.contract;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.discovery.EntityDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.IndexSearchParams;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.model.instance.EntityMutations;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.*;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.*;
+
+import static org.apache.atlas.AtlasErrorCode.*;
+import static org.apache.atlas.repository.Constants.*;
+import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;
+import static org.apache.atlas.type.AtlasTypeUtil.getAtlasObjectId;
+
+public class ContractPreProcessor extends AbstractContractPreProcessor {
+ private static final Logger LOG = LoggerFactory.getLogger(ContractPreProcessor.class);
+ public static final String ATTR_ASSET_GUID = "dataContractAssetGuid";
+ public static final String REL_ATTR_LATEST_CONTRACT = "dataContractLatest";
+ public static final String REL_ATTR_GOVERNED_ASSET_CERTIFIED = "dataContractLatestCertified";
+ public static final String REL_ATTR_PREVIOUS_VERSION = "dataContractPreviousVersion";
+ public static final String ASSET_ATTR_HAS_CONTRACT = "hasContract";
+ public static final String CONTRACT_QUALIFIED_NAME_SUFFIX = "contract";
+ public static final String CONTRACT_ATTR_STATUS = "status";
+ private static final Set contractAttributes = new HashSet<>();
+ static {
+ contractAttributes.add(ATTR_CONTRACT);
+ contractAttributes.add(ATTR_CERTIFICATE_STATUS);
+ contractAttributes.add(ATTR_CONTRACT_VERSION);
+ }
+ private final boolean storeDifferentialAudits;
+ private final EntityDiscoveryService discovery;
+
+ private final AtlasEntityComparator entityComparator;
+
+
+ public ContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry,
+ EntityGraphRetriever entityRetriever,
+ boolean storeDifferentialAudits, EntityDiscoveryService discovery) {
+
+ super(graph, typeRegistry, entityRetriever);
+ this.storeDifferentialAudits = storeDifferentialAudits;
+ this.discovery = discovery;
+ this.entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, null, true, true);
+
+ }
+
+ @Override
+ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, EntityMutations.EntityOperation operation) throws AtlasBaseException {
+ AtlasEntity entity = (AtlasEntity) entityStruct;
+ switch (operation) {
+ case CREATE:
+ processCreateContract(entity, context);
+ break;
+ case UPDATE:
+ // Updating an existing version of the contract
+ processUpdateContract(entity, context);
+ }
+
+ }
+
+ private void processUpdateContract(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
+ String contractString = (String) entity.getAttribute(ATTR_CONTRACT);
+ AtlasVertex vertex = context.getVertex(entity.getGuid());
+ AtlasEntity existingContractEntity = entityRetriever.toAtlasEntity(vertex);
+ // No update to relationships allowed for the existing contract version
+ resetAllRelationshipAttributes(entity);
+ if (!isEqualContract(contractString, (String) existingContractEntity.getAttribute(ATTR_CONTRACT))) {
+ // Update the same asset(entity)
+ throw new AtlasBaseException(OPERATION_NOT_SUPPORTED, "Can't update a specific version of contract");
+ }
+ }
+ private void processCreateContract(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
+ /*
+ Low-level Design
+ | Authorization
+ | Deserialization of the JSON
+ ---| Validation of spec
+ | Validation of contract
+ | Create Version
+ | Create Draft
+ ---| asset to contract sync
+ | Create Publish
+ ---| two-way sync of attribute
+ */
+
+ String contractQName = (String) entity.getAttribute(QUALIFIED_NAME);
+ validateAttribute(!contractQName.endsWith(String.format("/%s", CONTRACT_QUALIFIED_NAME_SUFFIX)), "Invalid qualifiedName for the contract.");
+
+ String contractString = (String) entity.getAttribute(ATTR_CONTRACT);
+ DataContract contract = DataContract.deserialize(contractString);
+ String datasetQName = contractQName.substring(0, contractQName.lastIndexOf('/'));
+ contractQName = String.format("%s/%s/%s", datasetQName, contract.getType().name(), CONTRACT_QUALIFIED_NAME_SUFFIX);
+ AtlasEntityWithExtInfo associatedAsset = getAssociatedAsset(datasetQName, contract.getType().name());
+
+ authorizeContractCreateOrUpdate(entity, associatedAsset);
+
+ boolean contractSync = syncContractCertificateStatus(entity, contract);
+ if (contractSync) {
+ contractString = DataContract.serialize(contract);
+ entity.setAttribute(ATTR_CONTRACT, contractString);
+ }
+
+ AtlasEntity currentVersionEntity = getCurrentVersion(associatedAsset.getEntity().getGuid());
+ Long newVersionNumber = 1L;
+ if (currentVersionEntity != null) {
+ // Contract already exist
+ Long currentVersionNumber = (Long) currentVersionEntity.getAttribute(ATTR_CONTRACT_VERSION);
+ List attributes = getDiffAttributes(entity, currentVersionEntity);
+ if (attributes.isEmpty()) {
+ // No changes in the contract, Not creating new version
+ removeCreatingVertex(context, entity);
+ return;
+ } else if (isEqualContract(contractString, (String) currentVersionEntity.getAttribute(ATTR_CONTRACT))) {
+ resetAllRelationshipAttributes(entity);
+ // No change in contract, metadata changed
+ updateExistingVersion(context, entity, currentVersionEntity);
+ newVersionNumber = currentVersionNumber;
+ } else {
+ // contract changed (metadata might/not changed). Create new version.
+ newVersionNumber = currentVersionNumber + 1;
+
+ resetAllRelationshipAttributes(entity);
+ // Attach previous version via rel
+ entity.setRelationshipAttribute(REL_ATTR_PREVIOUS_VERSION, getAtlasObjectId(currentVersionEntity));
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(currentVersionEntity.getGuid());
+ AtlasEntityType entityType = ensureEntityType(currentVersionEntity.getTypeName());
+ context.addUpdated(currentVersionEntity.getGuid(), currentVersionEntity, entityType, vertex);
+
+ }
+ }
+ entity.setAttribute(QUALIFIED_NAME, String.format("%s/V%s", contractQName, newVersionNumber));
+ entity.setAttribute(ATTR_CONTRACT_VERSION, newVersionNumber);
+ entity.setAttribute(ATTR_ASSET_GUID, associatedAsset.getEntity().getGuid());
+
+ datasetAttributeSync(context, associatedAsset.getEntity(), entity);
+
+ }
+
+ private List getDiffAttributes(AtlasEntity entity, AtlasEntity latestExistingVersion) throws AtlasBaseException {
+ AtlasEntityComparator.AtlasEntityDiffResult diffResult = entityComparator.getDiffResult(entity, latestExistingVersion, false);
+ List attributesSet = new ArrayList<>();
+
+ if (diffResult.hasDifference()) {
+ for (Map.Entry entry : diffResult.getDiffEntity().getAttributes().entrySet()) {
+ if (!entry.getKey().equals(QUALIFIED_NAME)) {
+ attributesSet.add(entry.getKey());
+ }
+ }
+ }
+ return attributesSet;
+ }
+
+ private boolean isEqualContract(String firstNode, String secondNode) throws AtlasBaseException {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ JsonNode actualObj1 = mapper.readTree(firstNode);
+ JsonNode actualObj2 = mapper.readTree(secondNode);
+ //Ignore status field change
+ ((ObjectNode) actualObj1).remove(CONTRACT_ATTR_STATUS);
+ ((ObjectNode) actualObj2).remove(CONTRACT_ATTR_STATUS);
+
+ return actualObj1.equals(actualObj2);
+ } catch (JsonProcessingException e) {
+ throw new AtlasBaseException(JSON_ERROR, e.getMessage());
+ }
+
+ }
+
+ private void updateExistingVersion(EntityMutationContext context, AtlasEntity entity, AtlasEntity currentVersionEntity) throws AtlasBaseException {
+ removeCreatingVertex(context, entity);
+ entity.setAttribute(QUALIFIED_NAME, currentVersionEntity.getAttribute(QUALIFIED_NAME));
+ entity.setGuid(currentVersionEntity.getGuid());
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entity.getGuid());
+ AtlasEntityType entityType = ensureEntityType(entity.getTypeName());
+
+ context.addUpdated(entity.getGuid(), entity, entityType, vertex);
+
+ }
+
+ public AtlasEntity getCurrentVersion(String datasetGuid) throws AtlasBaseException {
+ IndexSearchParams indexSearchParams = new IndexSearchParams();
+ Map dsl = new HashMap<>();
+ int size = 1;
+
+ List