Skip to content

Commit

Permalink
Refactor with new attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
bichitra95 committed Apr 22, 2024
1 parent 3e6f1b2 commit f69d6f6
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1839,7 +1839,7 @@ public PreProcessor getPreProcessor(String typeName) {
break;

case CONTRACT_ENTITY_TYPE:
preProcessor = new ContractPreProcessor(graph, typeRegistry, entityRetriever, this, entityGraphMapper, storeDifferentialAudits);
preProcessor = new ContractPreProcessor(graph, typeRegistry, entityRetriever, this, storeDifferentialAudits, discovery);
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,6 @@ void authorizeContractCreateOrUpdate(AtlasEntity contractEntity, AtlasEntity.Atl
}


void authorizeContractDelete(AtlasVertex contractVertex, String typeName) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("authorizeContractDelete");

try {
AtlasEntity contractEntity = entityRetriever.toAtlasEntity(contractVertex);
String contractQName = contractEntity.getAttribute(QUALIFIED_NAME).toString();
AtlasEntity.AtlasEntityWithExtInfo assetEntity = getAssociatedAsset(contractQName, typeName);
AtlasEntityHeader entityHeader = new AtlasEntityHeader(assetEntity.getEntity());

verifyAssetAccess(entityHeader, AtlasPrivilege.ENTITY_UPDATE, contractEntity, AtlasPrivilege.ENTITY_DELETE);
} finally {
RequestContext.get().endMetricRecord(recorder);
}
}


private void verifyAssetAccess(AtlasEntityHeader asset, AtlasPrivilege assetPrivilege,
AtlasEntity contract, AtlasPrivilege contractPrivilege) throws AtlasBaseException {
verifyAccess(asset, assetPrivilege);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
Expand All @@ -26,33 +27,36 @@
import static org.apache.atlas.AtlasErrorCode.*;
import static org.apache.atlas.repository.Constants.ATTR_CERTIFICATE_STATUS;
import static org.apache.atlas.repository.Constants.QUALIFIED_NAME;
import static org.apache.atlas.type.AtlasTypeUtil.getAtlasObjectId;

public class ContractPreProcessor extends AbstractContractPreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ContractPreProcessor.class);
public static final String ATTR_CONTRACT = "contract";
public static final String ATTR_VERSION = "contractVersion";
public static final String ATTR_ASSET_QUALIFIED_NAME = "contractAssetQualifiedName";
public static final String ATTR_PARENT_GUID = "parentGuid";
public static final String ATTR_CONTRACT = "dataContractJson";
public static final String ATTR_VERSION = "dataContractVersion";
public static final String REL_ATTR_GOVERNED_ASSET = "dataContractGovernedAsset";
public static final String REL_ATTR_LATEST_CONTRACT = "dataContractLatest";
public static final String REL_ATTR_GOVERNED_ASSET_CERTIFIED = "dataContractGovernedAssetCertified";
public static final String REL_ATTR_PREVIOUS_VERSION = "dataContractPreviousVersion";
public static final String ASSET_ATTR_HAS_CONTRACT = "hasContract";
public static final String ASSET_ATTR_CONTRACT_VERSION_QUALIFIED_NAME = "latestContractQualifiedName";

public static final String ASSET_ATTR_DESCRIPTION = "description";

public static final String CONTRACT_QUALIFIED_NAME_SUFFIX = "contract";
public static final String VERSION_PREFIX = "version";
public static final String CONTRACT_ATTR_STATUS = "status";
private final AtlasEntityStore entityStore;
private final EntityGraphMapper entityGraphMapper;
private boolean storeDifferentialAudits;
private final boolean storeDifferentialAudits;
private EntityDiscoveryService discovery;



public ContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry,
EntityGraphRetriever entityRetriever, AtlasEntityStore entityStore, EntityGraphMapper entityGraphMapper, boolean storeDifferentialAudits) {
EntityGraphRetriever entityRetriever, AtlasEntityStore entityStore,
boolean storeDifferentialAudits, EntityDiscoveryService discovery) {

super(graph, typeRegistry, entityRetriever);
this.storeDifferentialAudits = storeDifferentialAudits;
this.entityStore = entityStore;
this.entityGraphMapper = entityGraphMapper;
this.discovery = discovery;
}

@Override
Expand Down Expand Up @@ -107,50 +111,38 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
contractString = DataContract.serialize(contract);
entity.setAttribute(ATTR_CONTRACT, contractString);


ContractVersionUtils versionUtil = new ContractVersionUtils(entity, context, entityRetriever, typeRegistry, entityStore, graph);
AtlasEntity latestExistingVersion = versionUtil.getLatestVersion();

if (latestExistingVersion != null) {
ContractVersionUtils versionUtil = new ContractVersionUtils(entity, context, entityRetriever, typeRegistry, entityStore, graph, discovery);
AtlasEntity currentVersionEntity = versionUtil.getCurrentVersion();
String latestVersion = "V1";
if (currentVersionEntity != null) {
// Contract already exist
String qName = (String) latestExistingVersion.getAttribute(QUALIFIED_NAME);
Integer latestVersionNumber = Integer.valueOf(qName.substring(qName.lastIndexOf("/V") + 2));
List<String> attributes = getDiffAttributes(context, entity, latestExistingVersion);
String qName = (String) currentVersionEntity.getAttribute(QUALIFIED_NAME);
Integer currentVersionNumber = Integer.valueOf(qName.substring(qName.lastIndexOf("/V") + 2));
List<String> attributes = getDiffAttributes(context, entity, currentVersionEntity);
if (attributes.isEmpty()) {
// No changes in the contract, Not creating new version
removeCreatingVertex(context, entity);
return;
}

if (attributes.size() == 1 && attributes.contains(ATTR_CERTIFICATE_STATUS)) {
if (Objects.equals(entity.getAttribute(ATTR_CERTIFICATE_STATUS).toString(), DataContract.STATUS.VERIFIED.name())) {
//update existing entity
updateExistingVersion(context, entity, latestExistingVersion);
}
// Contract status changed, either to draft or verified

} else if (attributes.contains(ATTR_CONTRACT)) {
//Contract is changed
if (isEqualContract(contractString, (String) latestExistingVersion.getAttribute(ATTR_CONTRACT))) {
// Update the same asset(entity)
updateExistingVersion(context, entity, latestExistingVersion);
} else if (isEqualContract(contractString, (String) currentVersionEntity.getAttribute(ATTR_CONTRACT))) {
// No change in contract, metadata changed
updateExistingVersion(context, entity, currentVersionEntity);
return;
} else {
// contract changed (metadata might/not changed). Create new version.
latestVersion = String.format("V%s", currentVersionNumber + 1);

} else {
// Create New version of entity
entity.setAttribute(QUALIFIED_NAME, String.format("%s/%s/V%s", contractQName, VERSION_PREFIX, ++latestVersionNumber));
entity.setAttribute(ATTR_VERSION, String.format("V%s", latestVersionNumber));
entity.setAttribute(ATTR_ASSET_QUALIFIED_NAME, associatedAsset.getEntity().getAttribute(QUALIFIED_NAME));
entity.setAttribute(ATTR_PARENT_GUID, latestExistingVersion.getGuid());
// Attach previous version via rel
entity.setRelationshipAttribute(REL_ATTR_PREVIOUS_VERSION, getAtlasObjectId(currentVersionEntity));
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(currentVersionEntity.getGuid());
AtlasEntityType entityType = ensureEntityType(currentVersionEntity.getTypeName());
context.addUpdated(currentVersionEntity.getGuid(), currentVersionEntity, entityType, vertex);

}
}

} else {
// Create new contract
entity.setAttribute(QUALIFIED_NAME, String.format("%s/%s/%s", contractQName, VERSION_PREFIX, "V1"));
entity.setAttribute(ATTR_VERSION, "V1");
entity.setAttribute(ATTR_ASSET_QUALIFIED_NAME, associatedAsset.getEntity().getAttribute(QUALIFIED_NAME));

}
entity.setAttribute(QUALIFIED_NAME, String.format("%s/%s/%s", contractQName, VERSION_PREFIX, latestVersion));
entity.setAttribute(ATTR_VERSION, latestVersion);
entity.setRelationshipAttribute(REL_ATTR_GOVERNED_ASSET, getAtlasObjectId(associatedAsset.getEntity()));

datasetAttributeSync(context, associatedAsset.getEntity(), contract, entity);

}
Expand Down Expand Up @@ -186,10 +178,10 @@ private boolean isEqualContract(String firstNode, String secondNode) throws Atla

}

private void updateExistingVersion(EntityMutationContext context, AtlasEntity entity, AtlasEntity latestExistingVersion) throws AtlasBaseException {
private void updateExistingVersion(EntityMutationContext context, AtlasEntity entity, AtlasEntity currentVersionEntity) throws AtlasBaseException {
removeCreatingVertex(context, entity);
entity.setAttribute(QUALIFIED_NAME, latestExistingVersion.getAttribute(QUALIFIED_NAME));
entity.setGuid(latestExistingVersion.getGuid());
entity.setAttribute(QUALIFIED_NAME, currentVersionEntity.getAttribute(QUALIFIED_NAME));
entity.setGuid(currentVersionEntity.getGuid());

AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entity.getGuid());

Expand Down Expand Up @@ -246,35 +238,18 @@ private void datasetAttributeSync(EntityMutationContext context, AtlasEntity ass
if (associatedAsset.getAttribute(ASSET_ATTR_HAS_CONTRACT) == null || associatedAsset.getAttribute(ASSET_ATTR_HAS_CONTRACT).equals(false)) {
entity.setAttribute(ASSET_ATTR_HAS_CONTRACT, true);
}
entity.setAttribute(ASSET_ATTR_CONTRACT_VERSION_QUALIFIED_NAME, contractAsset.getAttribute(QUALIFIED_NAME));
if (contract.getStatus() == DataContract.STATUS.VERIFIED &&
contractAsset.getAttribute(ATTR_CERTIFICATE_STATUS).equals(DataContract.STATUS.VERIFIED.name())) {
// Will implement dataset attribute sync from the contract attributes
// if (!dataset.description.isEmpty()) {
// associatedAsset.setAttribute(ASSET_ATTR_DESCRIPTION, dataset.description);
// }
}
try {
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entity.getGuid());
AtlasEntityType entityType = ensureEntityType(entity.getTypeName());
AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), true, true);
AtlasEntityComparator.AtlasEntityDiffResult diffResult = entityComparator.getDiffResult(entity, vertex, !storeDifferentialAudits);
RequestContext reqContext = RequestContext.get();

if (diffResult.hasDifference()) {
context.addUpdated(entity.getGuid(), entity, entityType, vertex);
if (storeDifferentialAudits) {
diffResult.getDiffEntity().setGuid(entity.getGuid());
reqContext.cacheDifferentialEntity(diffResult.getDiffEntity());
}
}
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entity.getGuid());
AtlasEntityType entityType = ensureEntityType(entity.getTypeName());
AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), true, true);
AtlasEntityComparator.AtlasEntityDiffResult diffResult = entityComparator.getDiffResult(entity, vertex, !storeDifferentialAudits);
RequestContext reqContext = RequestContext.get();
context.addUpdated(entity.getGuid(), entity, entityType, vertex);

// RequestContext.get().setSkipAuthorizationCheck(true);
// EntityStream entityStream = new AtlasEntityStream(associatedAsset);
// entityStore.createOrUpdate(entityStream, false);
// LOG.info("Updated associated asset attributes of contract {}", associatedAsset.getAttribute(QUALIFIED_NAME));
} finally {
// RequestContext.get().setSkipAuthorizationCheck(false);
if (diffResult.hasDifference()) {
if (storeDifferentialAudits) {
diffResult.getDiffEntity().setGuid(entity.getGuid());
reqContext.cacheDifferentialEntity(diffResult.getDiffEntity());
}
}
}

Expand Down
Loading

0 comments on commit f69d6f6

Please sign in to comment.