diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index accea8ec88..4ad2a285f8 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -159,6 +159,8 @@ public final class Constants { /** * SQL property keys. */ + + public static final String SQL_ENTITY_TYPE = "SQL"; public static final String CONNECTION_ENTITY_TYPE = "Connection"; public static final String QUERY_ENTITY_TYPE = "Query"; public static final String QUERY_FOLDER_ENTITY_TYPE = "Folder"; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java index 0a4521e34b..a89f4327de 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java @@ -4,27 +4,28 @@ import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasEntityAccessRequest; import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.discovery.IndexSearchParams; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; +import java.util.*; -import static org.apache.atlas.AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND; import static org.apache.atlas.AtlasErrorCode.TYPE_NAME_INVALID; import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public abstract class AbstractContractPreProcessor implements PreProcessor { private static final Logger LOG = LoggerFactory.getLogger(AbstractContractPreProcessor.class); @@ -32,19 +33,21 @@ public abstract class AbstractContractPreProcessor implements PreProcessor { public final AtlasTypeRegistry typeRegistry; public final EntityGraphRetriever entityRetriever; public final AtlasGraph graph; + private final EntityDiscoveryService discovery; AbstractContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry, - EntityGraphRetriever entityRetriever) { + EntityGraphRetriever entityRetriever, EntityDiscoveryService discovery) { this.graph = graph; this.typeRegistry = typeRegistry; this.entityRetriever = entityRetriever; + this.discovery = discovery; } - void authorizeContractCreateOrUpdate(AtlasEntity contractEntity, AtlasEntity.AtlasEntityWithExtInfo associatedAsset) throws AtlasBaseException { + void authorizeContractCreateOrUpdate(AtlasEntity contractEntity, AtlasEntity associatedAsset) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("authorizeContractUpdate"); try { - AtlasEntityHeader entityHeader = new AtlasEntityHeader(associatedAsset.getEntity()); + AtlasEntityHeader entityHeader = new AtlasEntityHeader(associatedAsset); //First authorize entity update access verifyAssetAccess(entityHeader, AtlasPrivilege.ENTITY_UPDATE, contractEntity, AtlasPrivilege.ENTITY_UPDATE); @@ -70,16 +73,39 @@ private void verifyAccess(AtlasEntityHeader entityHeader, AtlasPrivilege privile AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, privilege, entityHeader), errorMessage); } - AtlasEntity.AtlasEntityWithExtInfo getAssociatedAsset(String datasetQName, String typeName) throws AtlasBaseException { + public AtlasEntity getAssociatedAsset(String datasetQName, DataContract contract) throws AtlasBaseException { + IndexSearchParams indexSearchParams = new IndexSearchParams(); + Map dsl = new HashMap<>(); + int size = 2; + + List> mustClauseList = new ArrayList<>(); + mustClauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, datasetQName))); + if (contract.getType() != null) { + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", contract.getType().name()))); + } else { + mustClauseList.add(mapOf("term", mapOf("__superTypeNames.keyword", SQL_ENTITY_TYPE))); + } + + dsl.put("query", mapOf("bool", mapOf("must", mustClauseList))); + dsl.put("sort", Collections.singletonList(mapOf(ATTR_CONTRACT_VERSION, mapOf("order", "desc")))); + dsl.put("size", size); - Map uniqAttributes = new HashMap<>(); - uniqAttributes.put(QUALIFIED_NAME, datasetQName); + indexSearchParams.setDsl(dsl); + indexSearchParams.setSuppressLogs(true); - AtlasEntityType entityType = ensureEntityType(typeName); + AtlasSearchResult result = discovery.directIndexSearch(indexSearchParams); + if (result == null || CollectionUtils.isEmpty(result.getEntities())) { + throw new AtlasBaseException("Dataset doesn't exist for given qualified name."); - AtlasVertex entityVertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(graph, entityType, uniqAttributes); + } else if (result.getEntities().size() >1 ) { + throw new AtlasBaseException("Multiple dataset exists for given qualified name. " + + "Please specify the `type` attribute in contract."); + } else { + AtlasEntityHeader datasetEntity = result.getEntities().get(0); + contract.setType(datasetEntity.getTypeName()); + return new AtlasEntity(datasetEntity); + } - return entityRetriever.toAtlasEntityWithExtInfo(entityVertex); } AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java index 1407c3c2ef..84fbf124f0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java @@ -6,7 +6,6 @@ import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.IndexSearchParams; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutations; import org.apache.atlas.repository.graphdb.AtlasGraph; @@ -52,7 +51,7 @@ public ContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, boolean storeDifferentialAudits, EntityDiscoveryService discovery) { - super(graph, typeRegistry, entityRetriever); + super(graph, typeRegistry, entityRetriever, discovery); this.storeDifferentialAudits = storeDifferentialAudits; this.discovery = discovery; this.entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, null, true, true); @@ -109,8 +108,8 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con String contractString = getContractString(entity); DataContract contract = DataContract.deserialize(contractString); String datasetQName = contractQName.substring(0, contractQName.lastIndexOf('/')); - contractQName = String.format("%s/%s/%s", datasetQName, contract.getType().name(), CONTRACT_QUALIFIED_NAME_SUFFIX); - AtlasEntityWithExtInfo associatedAsset = getAssociatedAsset(datasetQName, contract.getType().name()); + AtlasEntity associatedAsset = getAssociatedAsset(datasetQName, contract); + contractQName = String.format("%s/%s/%s", datasetQName, associatedAsset.getTypeName(), CONTRACT_QUALIFIED_NAME_SUFFIX); authorizeContractCreateOrUpdate(entity, associatedAsset); @@ -120,7 +119,7 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con String contractStringJSON = DataContract.serializeJSON(contract); entity.setAttribute(ATTR_CONTRACT_JSON, contractStringJSON); - AtlasEntity currentVersionEntity = getCurrentVersion(associatedAsset.getEntity().getGuid()); + AtlasEntity currentVersionEntity = getCurrentVersion(associatedAsset.getGuid()); Long newVersionNumber = 1L; if (currentVersionEntity != null) { // Contract already exist @@ -150,9 +149,9 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con } entity.setAttribute(QUALIFIED_NAME, String.format("%s/V%s", contractQName, newVersionNumber)); entity.setAttribute(ATTR_CONTRACT_VERSION, newVersionNumber); - entity.setAttribute(ATTR_ASSET_GUID, associatedAsset.getEntity().getGuid()); + entity.setAttribute(ATTR_ASSET_GUID, associatedAsset.getGuid()); - datasetAttributeSync(context, associatedAsset.getEntity(), entity); + datasetAttributeSync(context, associatedAsset, entity); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/DataContract.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/DataContract.java index 4ceea2853c..99da7a6882 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/DataContract.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/DataContract.java @@ -44,7 +44,6 @@ public class DataContract { public String data_source; @Valid @NotNull public String dataset; - @Valid @NotNull public DatasetType type; public String description; public List owners; @@ -89,7 +88,7 @@ public static DatasetType from(String s) throws AtlasBaseException { case "materialisedview": return MaterialisedView; default: - throw new AtlasBaseException(String.format("dataset.type: %s value not supported yet.", s)); + throw new AtlasBaseException(String.format("type: %s value not supported yet.", s)); } } } @@ -144,7 +143,7 @@ public void setType(String type) throws AtlasBaseException { try { this.type = DatasetType.from(type); } catch (IllegalArgumentException | AtlasBaseException ex) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "type " + type + " is inappropriate. Accepted values: " + Arrays.toString(DatasetType.values())); + throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "type: " + type + " is inappropriate. Accepted values: " + Arrays.toString(DatasetType.values())); } }