From f69d6f688295666c83fc2afdf73b48ae3f1fd41c Mon Sep 17 00:00:00 2001 From: Bichitra Kumar Sahoo <32828151+bichitra95@users.noreply.github.com> Date: Mon, 22 Apr 2024 17:41:44 +0530 Subject: [PATCH] Refactor with new attributes --- .../store/graph/v2/AtlasEntityStoreV2.java | 2 +- .../AbstractContractPreProcessor.java | 16 --- .../contract/ContractPreProcessor.java | 129 +++++++----------- .../contract/ContractVersionUtils.java | 113 ++++----------- 4 files changed, 82 insertions(+), 178 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index ed24f329c6..8e11aaeff1 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -1839,7 +1839,7 @@ public PreProcessor getPreProcessor(String typeName) { break; case CONTRACT_ENTITY_TYPE: - preProcessor = new ContractPreProcessor(graph, typeRegistry, entityRetriever, this, entityGraphMapper, storeDifferentialAudits); + preProcessor = new ContractPreProcessor(graph, typeRegistry, entityRetriever, this, storeDifferentialAudits, discovery); break; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java index dd2e558156..4ef8d0ee6d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/AbstractContractPreProcessor.java @@ -55,22 +55,6 @@ void authorizeContractCreateOrUpdate(AtlasEntity contractEntity, AtlasEntity.Atl } - void authorizeContractDelete(AtlasVertex contractVertex, String typeName) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("authorizeContractDelete"); - - try { - AtlasEntity contractEntity = entityRetriever.toAtlasEntity(contractVertex); - String contractQName = contractEntity.getAttribute(QUALIFIED_NAME).toString(); - AtlasEntity.AtlasEntityWithExtInfo assetEntity = getAssociatedAsset(contractQName, typeName); - AtlasEntityHeader entityHeader = new AtlasEntityHeader(assetEntity.getEntity()); - - verifyAssetAccess(entityHeader, AtlasPrivilege.ENTITY_UPDATE, contractEntity, AtlasPrivilege.ENTITY_DELETE); - } finally { - RequestContext.get().endMetricRecord(recorder); - } - } - - private void verifyAssetAccess(AtlasEntityHeader asset, AtlasPrivilege assetPrivilege, AtlasEntity contract, AtlasPrivilege contractPrivilege) throws AtlasBaseException { verifyAccess(asset, assetPrivilege); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java index e7c165a328..173f066753 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractPreProcessor.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; @@ -26,33 +27,36 @@ import static org.apache.atlas.AtlasErrorCode.*; import static org.apache.atlas.repository.Constants.ATTR_CERTIFICATE_STATUS; import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; +import static org.apache.atlas.type.AtlasTypeUtil.getAtlasObjectId; public class ContractPreProcessor extends AbstractContractPreProcessor { private static final Logger LOG = LoggerFactory.getLogger(ContractPreProcessor.class); - public static final String ATTR_CONTRACT = "contract"; - public static final String ATTR_VERSION = "contractVersion"; - public static final String ATTR_ASSET_QUALIFIED_NAME = "contractAssetQualifiedName"; - public static final String ATTR_PARENT_GUID = "parentGuid"; + public static final String ATTR_CONTRACT = "dataContractJson"; + public static final String ATTR_VERSION = "dataContractVersion"; + public static final String REL_ATTR_GOVERNED_ASSET = "dataContractGovernedAsset"; + public static final String REL_ATTR_LATEST_CONTRACT = "dataContractLatest"; + public static final String REL_ATTR_GOVERNED_ASSET_CERTIFIED = "dataContractGovernedAssetCertified"; + public static final String REL_ATTR_PREVIOUS_VERSION = "dataContractPreviousVersion"; public static final String ASSET_ATTR_HAS_CONTRACT = "hasContract"; - public static final String ASSET_ATTR_CONTRACT_VERSION_QUALIFIED_NAME = "latestContractQualifiedName"; - public static final String ASSET_ATTR_DESCRIPTION = "description"; public static final String CONTRACT_QUALIFIED_NAME_SUFFIX = "contract"; public static final String VERSION_PREFIX = "version"; public static final String CONTRACT_ATTR_STATUS = "status"; private final AtlasEntityStore entityStore; - private final EntityGraphMapper entityGraphMapper; - private boolean storeDifferentialAudits; + private final boolean storeDifferentialAudits; + private EntityDiscoveryService discovery; + public ContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry, - EntityGraphRetriever entityRetriever, AtlasEntityStore entityStore, EntityGraphMapper entityGraphMapper, boolean storeDifferentialAudits) { + EntityGraphRetriever entityRetriever, AtlasEntityStore entityStore, + boolean storeDifferentialAudits, EntityDiscoveryService discovery) { super(graph, typeRegistry, entityRetriever); this.storeDifferentialAudits = storeDifferentialAudits; this.entityStore = entityStore; - this.entityGraphMapper = entityGraphMapper; + this.discovery = discovery; } @Override @@ -107,50 +111,38 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con contractString = DataContract.serialize(contract); entity.setAttribute(ATTR_CONTRACT, contractString); - - ContractVersionUtils versionUtil = new ContractVersionUtils(entity, context, entityRetriever, typeRegistry, entityStore, graph); - AtlasEntity latestExistingVersion = versionUtil.getLatestVersion(); - - if (latestExistingVersion != null) { + ContractVersionUtils versionUtil = new ContractVersionUtils(entity, context, entityRetriever, typeRegistry, entityStore, graph, discovery); + AtlasEntity currentVersionEntity = versionUtil.getCurrentVersion(); + String latestVersion = "V1"; + if (currentVersionEntity != null) { // Contract already exist - String qName = (String) latestExistingVersion.getAttribute(QUALIFIED_NAME); - Integer latestVersionNumber = Integer.valueOf(qName.substring(qName.lastIndexOf("/V") + 2)); - List attributes = getDiffAttributes(context, entity, latestExistingVersion); + String qName = (String) currentVersionEntity.getAttribute(QUALIFIED_NAME); + Integer currentVersionNumber = Integer.valueOf(qName.substring(qName.lastIndexOf("/V") + 2)); + List attributes = getDiffAttributes(context, entity, currentVersionEntity); if (attributes.isEmpty()) { + // No changes in the contract, Not creating new version removeCreatingVertex(context, entity); return; - } - - if (attributes.size() == 1 && attributes.contains(ATTR_CERTIFICATE_STATUS)) { - if (Objects.equals(entity.getAttribute(ATTR_CERTIFICATE_STATUS).toString(), DataContract.STATUS.VERIFIED.name())) { - //update existing entity - updateExistingVersion(context, entity, latestExistingVersion); - } - // Contract status changed, either to draft or verified - - } else if (attributes.contains(ATTR_CONTRACT)) { - //Contract is changed - if (isEqualContract(contractString, (String) latestExistingVersion.getAttribute(ATTR_CONTRACT))) { - // Update the same asset(entity) - updateExistingVersion(context, entity, latestExistingVersion); + } else if (isEqualContract(contractString, (String) currentVersionEntity.getAttribute(ATTR_CONTRACT))) { + // No change in contract, metadata changed + updateExistingVersion(context, entity, currentVersionEntity); + return; + } else { + // contract changed (metadata might/not changed). Create new version. + latestVersion = String.format("V%s", currentVersionNumber + 1); - } else { - // Create New version of entity - entity.setAttribute(QUALIFIED_NAME, String.format("%s/%s/V%s", contractQName, VERSION_PREFIX, ++latestVersionNumber)); - entity.setAttribute(ATTR_VERSION, String.format("V%s", latestVersionNumber)); - entity.setAttribute(ATTR_ASSET_QUALIFIED_NAME, associatedAsset.getEntity().getAttribute(QUALIFIED_NAME)); - entity.setAttribute(ATTR_PARENT_GUID, latestExistingVersion.getGuid()); + // Attach previous version via rel + entity.setRelationshipAttribute(REL_ATTR_PREVIOUS_VERSION, getAtlasObjectId(currentVersionEntity)); + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(currentVersionEntity.getGuid()); + AtlasEntityType entityType = ensureEntityType(currentVersionEntity.getTypeName()); + context.addUpdated(currentVersionEntity.getGuid(), currentVersionEntity, entityType, vertex); - } } - - } else { - // Create new contract - entity.setAttribute(QUALIFIED_NAME, String.format("%s/%s/%s", contractQName, VERSION_PREFIX, "V1")); - entity.setAttribute(ATTR_VERSION, "V1"); - entity.setAttribute(ATTR_ASSET_QUALIFIED_NAME, associatedAsset.getEntity().getAttribute(QUALIFIED_NAME)); - } + entity.setAttribute(QUALIFIED_NAME, String.format("%s/%s/%s", contractQName, VERSION_PREFIX, latestVersion)); + entity.setAttribute(ATTR_VERSION, latestVersion); + entity.setRelationshipAttribute(REL_ATTR_GOVERNED_ASSET, getAtlasObjectId(associatedAsset.getEntity())); + datasetAttributeSync(context, associatedAsset.getEntity(), contract, entity); } @@ -186,10 +178,10 @@ private boolean isEqualContract(String firstNode, String secondNode) throws Atla } - private void updateExistingVersion(EntityMutationContext context, AtlasEntity entity, AtlasEntity latestExistingVersion) throws AtlasBaseException { + private void updateExistingVersion(EntityMutationContext context, AtlasEntity entity, AtlasEntity currentVersionEntity) throws AtlasBaseException { removeCreatingVertex(context, entity); - entity.setAttribute(QUALIFIED_NAME, latestExistingVersion.getAttribute(QUALIFIED_NAME)); - entity.setGuid(latestExistingVersion.getGuid()); + entity.setAttribute(QUALIFIED_NAME, currentVersionEntity.getAttribute(QUALIFIED_NAME)); + entity.setGuid(currentVersionEntity.getGuid()); AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entity.getGuid()); @@ -246,35 +238,18 @@ private void datasetAttributeSync(EntityMutationContext context, AtlasEntity ass if (associatedAsset.getAttribute(ASSET_ATTR_HAS_CONTRACT) == null || associatedAsset.getAttribute(ASSET_ATTR_HAS_CONTRACT).equals(false)) { entity.setAttribute(ASSET_ATTR_HAS_CONTRACT, true); } - entity.setAttribute(ASSET_ATTR_CONTRACT_VERSION_QUALIFIED_NAME, contractAsset.getAttribute(QUALIFIED_NAME)); - if (contract.getStatus() == DataContract.STATUS.VERIFIED && - contractAsset.getAttribute(ATTR_CERTIFICATE_STATUS).equals(DataContract.STATUS.VERIFIED.name())) { - // Will implement dataset attribute sync from the contract attributes - // if (!dataset.description.isEmpty()) { - // associatedAsset.setAttribute(ASSET_ATTR_DESCRIPTION, dataset.description); - // } - } - try { - AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entity.getGuid()); - AtlasEntityType entityType = ensureEntityType(entity.getTypeName()); - AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), true, true); - AtlasEntityComparator.AtlasEntityDiffResult diffResult = entityComparator.getDiffResult(entity, vertex, !storeDifferentialAudits); - RequestContext reqContext = RequestContext.get(); - - if (diffResult.hasDifference()) { - context.addUpdated(entity.getGuid(), entity, entityType, vertex); - if (storeDifferentialAudits) { - diffResult.getDiffEntity().setGuid(entity.getGuid()); - reqContext.cacheDifferentialEntity(diffResult.getDiffEntity()); - } - } + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entity.getGuid()); + AtlasEntityType entityType = ensureEntityType(entity.getTypeName()); + AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), true, true); + AtlasEntityComparator.AtlasEntityDiffResult diffResult = entityComparator.getDiffResult(entity, vertex, !storeDifferentialAudits); + RequestContext reqContext = RequestContext.get(); + context.addUpdated(entity.getGuid(), entity, entityType, vertex); -// RequestContext.get().setSkipAuthorizationCheck(true); -// EntityStream entityStream = new AtlasEntityStream(associatedAsset); -// entityStore.createOrUpdate(entityStream, false); -// LOG.info("Updated associated asset attributes of contract {}", associatedAsset.getAttribute(QUALIFIED_NAME)); - } finally { -// RequestContext.get().setSkipAuthorizationCheck(false); + if (diffResult.hasDifference()) { + if (storeDifferentialAudits) { + diffResult.getDiffEntity().setGuid(entity.getGuid()); + reqContext.cacheDifferentialEntity(diffResult.getDiffEntity()); + } } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractVersionUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractVersionUtils.java index 8d27a10bc1..3f93d305e9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractVersionUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/contract/ContractVersionUtils.java @@ -1,29 +1,23 @@ package org.apache.atlas.repository.store.graph.v2.preprocessor.contract; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.discovery.IndexSearchParams; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasGraphQuery; -import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v2.*; -import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor; -import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.utils.AtlasPerfMetrics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.*; import static org.apache.atlas.repository.Constants.*; -import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; public class ContractVersionUtils { - private static final Logger LOG = LoggerFactory.getLogger(ConnectionPreProcessor.class); private final EntityMutationContext context; public final EntityGraphRetriever entityRetriever; @@ -33,58 +27,46 @@ public class ContractVersionUtils { private AtlasEntity entity; public final AtlasGraph graph; - private List versionList; - - + private List versionList; + private EntityDiscoveryService discovery; public ContractVersionUtils(AtlasEntity entity, EntityMutationContext context, EntityGraphRetriever entityRetriever, - AtlasTypeRegistry atlasTypeRegistry, AtlasEntityStore entityStore, AtlasGraph graph) { + AtlasTypeRegistry atlasTypeRegistry, AtlasEntityStore entityStore, AtlasGraph graph, + EntityDiscoveryService discovery) { this.context = context; this.entityRetriever = entityRetriever; this.atlasTypeRegistry = atlasTypeRegistry; this.graph = graph; this.entityStore = entityStore; this.entity = entity; + this.discovery = discovery; } - private void extractAllVersions() { + private void extractAllVersions() throws AtlasBaseException { String contractQName = (String) entity.getAttribute(QUALIFIED_NAME); String datasetQName = contractQName.substring(0, contractQName.lastIndexOf("/contract")); + List ret = new ArrayList<>(); - AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName("DataContract"); - Integer versionCounter = 1; - boolean found = true; - - List versionList = new ArrayList<>(); - - while (found) { - Map uniqAttributes = new HashMap<>(); - uniqAttributes.put(QUALIFIED_NAME, String.format("%s/contract/version/V%s", datasetQName, versionCounter++)); - try { - AtlasVertex entityVertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(graph, entityType, uniqAttributes); - AtlasEntity entity = entityRetriever.toAtlasEntity(entityVertex); -// -// EntityGraphRetriever entityRetriever = new EntityGraphRetriever(graph, typeRegistry, true); -// -// AtlasEntity.AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex); -// -// if (ret == null) { -// throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(), -// uniqAttributes.toString()); -// } -// return ret; - - versionList.add(entity); - } catch (AtlasBaseException ex) { - found = false; - } + IndexSearchParams indexSearchParams = new IndexSearchParams(); + Map dsl = new HashMap<>(); - } - this.versionList = versionList; + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", CONTRACT_ENTITY_TYPE))); + mustClauseList.add(mapOf("wildcard", mapOf(QUALIFIED_NAME, String.format("%s/contract/version/*", datasetQName)))); + + dsl.put("query", mapOf("bool", mapOf("must", mustClauseList))); + indexSearchParams.setDsl(dsl); + indexSearchParams.setSuppressLogs(true); + + AtlasSearchResult result = discovery.directIndexSearch(indexSearchParams); + if (result != null) { + ret = result.getEntities(); + } + this.versionList = ret; } - public AtlasEntity getLatestVersion() throws AtlasBaseException { + public AtlasEntity getCurrentVersion() throws AtlasBaseException { if (this.versionList == null) { extractAllVersions(); } @@ -97,43 +79,6 @@ public AtlasEntity getLatestVersion() throws AtlasBaseException { if (this.versionList.isEmpty()) { return null; } - return this.versionList.get(0); - } - - - public Iterator getAllEntityVersions() { - String entityQNamePrefix = (String) entity.getAttribute(QUALIFIED_NAME); - AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName("DataContract"); - -// AtlasEntityType entityType, String name - AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getAllEntityVersions"); - AtlasGraph graph = getGraphInstance(); - AtlasGraphQuery query = graph.query() - .has(ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()) - .has(STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()) - .has(entityType.getAllAttributes().get(QUALIFIED_NAME).getQualifiedName(), String.format("%s/version/V1", entityQNamePrefix)); - - - Iterator result = query.vertices().iterator(); - - RequestContext.get().endMetricRecord(metric); - return result; + return new AtlasEntity(this.versionList.get(0)); } - -// public void createNewVersion() throws AtlasBaseException { -// AtlasVertex vertex = context.getVertex(entity.getGuid()); -// AtlasEntity existingContractEntity = entityRetriever.toAtlasEntity(vertex); -//// this.newEntity = new AtlasEntity(existingEntity); -// this.existingEntity.setAttribute(QUALIFIED_NAME, null); -// -// try { -// RequestContext.get().setSkipAuthorizationCheck(true); -// EntityStream entityStream = new AtlasEntityStream(existingEntity); -// entityStore.createOrUpdate(entityStream, false); -// LOG.info("Created bootstrap policies for connection {}", existingEntity.getAttribute(QUALIFIED_NAME)); -// } finally { -// RequestContext.get().setSkipAuthorizationCheck(false); -// } -// -// } }