Skip to content

Commit

Permalink
Add validation of data contract spec
Browse files Browse the repository at this point in the history
  • Loading branch information
bichitra95 committed Apr 9, 2024
1 parent d64e969 commit 6ee00aa
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 41 deletions.
6 changes: 6 additions & 0 deletions repository/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@
<version>3.0.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>4.3.0.Final</version>
</dependency>

</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor.contract;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

Expand All @@ -10,7 +9,6 @@
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
Expand All @@ -35,7 +33,9 @@ public class ContractPreProcessor extends AbstractContractPreProcessor {
public static final String ATTR_VERSION = "contractVersion";
public static final String ATTR_ASSET_QUALIFIED_NAME = "contractAssetQualifiedName";
public static final String ATTR_PARENT_GUID = "parentGuid";
public static final String ATTR_HAS_CONTRACT = "hasContract";
public static final String ASSET_ATTR_HAS_CONTRACT = "hasContract";
public static final String ASSET_ATTR_DESCRIPTION = "description";

public static final String CONTRACT_QUALIFIED_NAME_SUFFIX = "contract";
public static final String VERSION_PREFIX = "version";
public static final String CONTRACT_ATTR_STATUS = "status";
Expand Down Expand Up @@ -69,7 +69,7 @@ private void processUpdateContract(AtlasEntity entity, EntityMutationContext con
String contractString = (String) entity.getAttribute(ATTR_CONTRACT);
AtlasVertex vertex = context.getVertex(entity.getGuid());
AtlasEntity existingContractEntity = entityRetriever.toAtlasEntity(vertex);

// TODO: Check for qualifiedName to understand if a particular version is getting updated or duplicate contract in payload
if (!isEqualContract(contractString, (String) existingContractEntity.getAttribute(ATTR_CONTRACT))) {
// Update the same asset(entity)
throw new AtlasBaseException(OPERATION_NOT_SUPPORTED, "Can't update a specific version of contract");
Expand Down Expand Up @@ -148,7 +148,7 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
entity.setAttribute(ATTR_ASSET_QUALIFIED_NAME, associatedAsset.getEntity().getAttribute(QUALIFIED_NAME));

}
datasetAttributeSync(associatedAsset.getEntity(), contract, entity);
datasetAttributeSync(context, associatedAsset.getEntity(), contract, entity);

}

Expand Down Expand Up @@ -209,7 +209,7 @@ private void removeCreatingVertex(EntityMutationContext context, AtlasEntity ent

}

private void contractAttributeSync(AtlasEntity entity, DataContract contract) {
private void contractAttributeSync(AtlasEntity entity, DataContract contract) throws AtlasBaseException {
// Sync certificateStatus
if (!Objects.equals(entity.getAttribute(ATTR_CERTIFICATE_STATUS), contract.getStatus().name())) {
/*
Expand All @@ -223,32 +223,38 @@ private void contractAttributeSync(AtlasEntity entity, DataContract contract) {
*/
if (Objects.equals(entity.getAttribute(ATTR_CERTIFICATE_STATUS), DataContract.STATUS.VERIFIED.name())) {
contract.setStatus(DataContract.STATUS.VERIFIED);
contract.setStatus(String.valueOf(DataContract.STATUS.VERIFIED));
} else if (Objects.equals(contract.getStatus(), DataContract.STATUS.VERIFIED)) {
entity.setAttribute(ATTR_CERTIFICATE_STATUS, DataContract.STATUS.VERIFIED.name());
} else {
entity.setAttribute(ATTR_CERTIFICATE_STATUS, DataContract.STATUS.DRAFT);
contract.setStatus(DataContract.STATUS.DRAFT);
contract.setStatus(String.valueOf(DataContract.STATUS.DRAFT));
}

}

}

private void datasetAttributeSync(AtlasEntity associatedAsset, DataContract contract, AtlasEntity contractAsset) throws AtlasBaseException {
associatedAsset.setAttribute(ATTR_HAS_CONTRACT, true);
private void datasetAttributeSync(EntityMutationContext context, AtlasEntity associatedAsset, DataContract contract, AtlasEntity contractAsset) throws AtlasBaseException {
associatedAsset.setAttribute(ASSET_ATTR_HAS_CONTRACT, true);
if (contract.getStatus() == DataContract.STATUS.VERIFIED &&
contractAsset.getAttribute(ATTR_CERTIFICATE_STATUS).equals(DataContract.STATUS.VERIFIED.name())) {
DataContract.Dataset dataset = contract.dataset;
// Will implement dataset attribute sync from the contract attributes
if (!dataset.description.isEmpty()) {
associatedAsset.setAttribute(ASSET_ATTR_DESCRIPTION, dataset.description);
}
}
try {
RequestContext.get().setSkipAuthorizationCheck(true);
EntityStream entityStream = new AtlasEntityStream(associatedAsset);
entityStore.createOrUpdate(entityStream, false);
LOG.info("Updated associated asset attributes of contract {}", associatedAsset.getAttribute(QUALIFIED_NAME));
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(associatedAsset.getGuid());
AtlasEntityType entityType = ensureEntityType(associatedAsset.getTypeName());
context.addUpdated(associatedAsset.getGuid(), associatedAsset, entityType, vertex);
// RequestContext.get().setSkipAuthorizationCheck(true);
// EntityStream entityStream = new AtlasEntityStream(associatedAsset);
// entityStore.createOrUpdate(entityStream, false);
// LOG.info("Updated associated asset attributes of contract {}", associatedAsset.getAttribute(QUALIFIED_NAME));
} finally {
RequestContext.get().setSkipAuthorizationCheck(false);
// RequestContext.get().setSkipAuthorizationCheck(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor.contract;

import java.lang.String;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -15,28 +13,39 @@
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.commons.lang.StringUtils;

import static org.apache.atlas.AtlasErrorCode.BAD_REQUEST;
import static org.apache.atlas.AtlasErrorCode.JSON_ERROR;
import javax.validation.*;
import javax.validation.constraints.NotNull;
import java.util.Set;

import static org.apache.atlas.AtlasErrorCode.*;


@JsonIgnoreProperties(ignoreUnknown = true)
@JsonPropertyOrder({"kind", "status", "template_version", "dataset", "columns"})
public class DataContract {
@JsonProperty(required = true)
public String kind;
public STATUS status;
@Valid @NotNull
public String kind;

public STATUS status;

@JsonProperty(value = "template_version", defaultValue = "0.0.1")
public String templateVersion;
public Dataset dataset;
public List<Column> columns;
private Map<String, Object> unknownFields = new HashMap<>();
public String templateVersion;
@Valid @NotNull
public Dataset dataset;
@Valid
public List<Column> columns;
private Map<String, Object> unknownFields = new HashMap<>();
public STATUS getStatus() {
return status;
}

@JsonSetter("status")
public void setStatus(STATUS status) {
this.status = status;
public void setStatus(String status) throws AtlasBaseException {
try {
this.status = STATUS.from(status);
} catch (IllegalArgumentException ex) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "status " + status + " is inappropriate. Accepted values: " + Arrays.toString(STATUS.values()));
}
}

@JsonAnySetter
Expand All @@ -57,7 +66,6 @@ public static STATUS from(String s) {
if(StringUtils.isEmpty(s)) {
return DRAFT;
}

switch (s.toLowerCase()) {
case "draft":
return DRAFT;
Expand All @@ -79,9 +87,9 @@ public void setKind(String kind) throws AtlasBaseException {
this.kind = kind;
}

public void setTemplateVersion(String templateVersion) {
public void setTemplateVersion(String templateVersion) throws AtlasBaseException {
if (!isSemVer(templateVersion)) {
throw new IllegalArgumentException("Invalid version syntax");
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Invalid template_version syntax");
}
this.templateVersion = templateVersion;
}
Expand All @@ -95,8 +103,9 @@ private boolean isSemVer(String version) {
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonPropertyOrder({"name", "type", "description"})
public static final class Dataset {
@NotNull
public String name;
@JsonProperty(required = true)
@NotNull
public DATASET_TYPE type;
public String description;
private Map<String, Object> unknownFields = new HashMap<>();
Expand All @@ -112,8 +121,12 @@ public Map<String, Object> getUnknownFields() {
}

@JsonSetter("type")
public void setType(DATASET_TYPE type) {
this.type = type;
public void setType(String type) throws AtlasBaseException {
try {
this.type = DATASET_TYPE.from(type);
} catch (IllegalArgumentException | AtlasBaseException ex) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "dataset.type " + type + " is inappropriate. Accepted values: " + Arrays.toString(DATASET_TYPE.values()));
}
}

public enum DATASET_TYPE {
Expand All @@ -131,7 +144,7 @@ public static DATASET_TYPE from(String s) throws AtlasBaseException {
case "materialisedview":
return MaterialisedView;
default:
throw new AtlasBaseException("dataset.type value not supported yet.");
throw new AtlasBaseException(String.format("dataset.type: %s value not supported yet.", s));
}
}
}
Expand All @@ -142,6 +155,7 @@ public static DATASET_TYPE from(String s) throws AtlasBaseException {
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonPropertyOrder({"name", "description", "data_type"})
public static final class Column {
@NotNull
public String name;

public String description;
Expand All @@ -160,8 +174,6 @@ public Map<String, Object> getUnknownFields() {
return unknownFields;
}



}

public static DataContract deserialize(String contractString) throws AtlasBaseException {
Expand All @@ -175,11 +187,26 @@ public static DataContract deserialize(String contractString) throws AtlasBaseEx
DataContract contract;
try {
contract = objectMapper.readValue(contractString, DataContract.class);
} catch (Exception ex) {
ex.printStackTrace();
throw new AtlasBaseException("Failed at this");
} catch (JsonProcessingException ex) {
throw new AtlasBaseException(ex.getOriginalMessage());
}
contract.validate();
return contract;

}

public void validate() throws AtlasBaseException {
Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
Set<ConstraintViolation<DataContract>> violations = validator.validate(this);
if (!violations.isEmpty()) {
List<String> errorMessageList = new ArrayList<>();
for (ConstraintViolation<DataContract> violation : violations) {
errorMessageList.add(String.format("Field: %s -> %s", violation.getPropertyPath(), violation.getMessage()));
System.out.println(violation.getMessage());
}
throw new AtlasBaseException(StringUtils.join(errorMessageList, "; "));
}

}

public static String serialize(DataContract contract) throws AtlasBaseException {
Expand Down

0 comments on commit 6ee00aa

Please sign in to comment.