Skip to content

Commit

Permalink
Merge pull request #3300 from atlanhq/staging-master-helper
Browse files Browse the repository at this point in the history
Syncing Staging with Master
  • Loading branch information
hr2904 authored Jul 2, 2024
2 parents 7dc8c7a + 35c99c3 commit c02077f
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,8 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }
public static final String ATTR_ASSET_STARRED_BY = "assetStarredBy";
public static final String ATTR_ASSET_STARRED_AT = "assetStarredAt";
public static final String ATTR_CERTIFICATE_STATUS = "certificateStatus";
public static final String ATTR_CONTRACT = "dataContractJson";
public static final String ATTR_CONTRACT = "dataContractSpec";
public static final String ATTR_CONTRACT_JSON = "dataContractJson";
public static final String STRUCT_STARRED_DETAILS = "StarredDetails";

public static final String KEYCLOAK_ROLE_ADMIN = "$admin";
Expand Down
5 changes: 5 additions & 0 deletions repository/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@
<artifactId>hibernate-validator</artifactId>
<version>4.3.2.Final</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.12.7</version>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.repository.util.FilterUtil;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
Expand All @@ -47,6 +48,8 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
Expand Down Expand Up @@ -191,7 +194,9 @@ public AtlasImportResult run(AtlasImportRequest request, String userName, String
if (StringUtils.isBlank(fileName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "FILENAME parameter not found");
}

if(!FilterUtil.validateFilePath(fileName)){
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "FILENAME IS INVALID");
}
AtlasImportResult result = null;
try {
LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
Expand Down Expand Up @@ -296,4 +301,5 @@ boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, A
/**
 * Tells whether the given import request was flagged as a migration run.
 *
 * @param request the import request whose options are inspected
 * @return {@code true} only when the migration option key is present in the request options
 */
private boolean isMigrationMode(AtlasImportRequest request) {
    // Presence of the key alone signals migration mode; its value is not consulted here.
    boolean migrationRequested = request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION);
    return migrationRequested;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
Expand Down Expand Up @@ -55,15 +56,15 @@ public class MigrationProgressService {
private boolean zipFileBasedMigrationImport;

@Inject
public MigrationProgressService(Configuration configuration, GraphDBMigrator migrator) {
public MigrationProgressService(Configuration configuration, GraphDBMigrator migrator) throws AtlasBaseException {
this.migrator = migrator;
this.cacheValidity = (configuration != null) ? configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) : DEFAULT_CACHE_TTL_IN_SECS;

this.zipFileBasedMigrationImport = isZipFileBasedMigrationEnabled();
initConditionallyZipFileBasedMigrator();
}

private void initConditionallyZipFileBasedMigrator() {
private void initConditionallyZipFileBasedMigrator() throws AtlasBaseException {
if (!zipFileBasedMigrationImport) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,28 @@

package org.apache.atlas.repository.migration;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.util.FilterUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;

import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
import static org.apache.atlas.repository.util.FilterUtil.validateFilePath;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.encodePropertyKey;
import static org.apache.atlas.type.Constants.INTERNAL_PROPERTY_KEY_PREFIX;

Expand All @@ -52,8 +58,11 @@ public DataMigrationStatusService(AtlasGraph graph) {
}


public void init(String fileToImport) {
public void init(String fileToImport) throws AtlasBaseException {
try {
if(!validateFilePath(fileToImport)){
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "File Path is invalid");
}
this.status = new MigrationImportStatus(fileToImport, DigestUtils.md5Hex(new FileInputStream(fileToImport)));
} catch (IOException e) {
LOG.error("Not able to create Migration status", e);
Expand All @@ -66,9 +75,13 @@ public void init(String fileToImport) {
getCreate(fileToImport);
}

public MigrationImportStatus getCreate(String fileName) {

public MigrationImportStatus getCreate(String fileName) throws AtlasBaseException {
MigrationImportStatus create = null;
try {
if(!validateFilePath(fileName)){
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "File Path is invalid");
}
create = getCreate(new MigrationImportStatus(fileName, DigestUtils.md5Hex(new FileInputStream(fileName))));
} catch (IOException e) {
LOG.error("Exception occurred while creating migration import", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportRe
return ret;
}

private DataMigrationStatusService createMigrationStatusService(AtlasImportResult importResult) {
private DataMigrationStatusService createMigrationStatusService(AtlasImportResult importResult) throws AtlasBaseException {
DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService();
dataMigrationStatusService.init(importResult.getRequest().getOptions().get(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME));
return dataMigrationStatusService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor.contract;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
Expand All @@ -19,11 +15,10 @@
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.*;

import static org.apache.atlas.AtlasErrorCode.*;
Expand All @@ -43,6 +38,7 @@ public class ContractPreProcessor extends AbstractContractPreProcessor {
private static final Set<String> contractAttributes = new HashSet<>();
static {
contractAttributes.add(ATTR_CONTRACT);
contractAttributes.add(ATTR_CONTRACT_JSON);
contractAttributes.add(ATTR_CERTIFICATE_STATUS);
contractAttributes.add(ATTR_CONTRACT_VERSION);
}
Expand Down Expand Up @@ -78,12 +74,17 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
}

private void processUpdateContract(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
String contractString = (String) entity.getAttribute(ATTR_CONTRACT);
String contractString = getContractString(entity);
AtlasVertex vertex = context.getVertex(entity.getGuid());
AtlasEntity existingContractEntity = entityRetriever.toAtlasEntity(vertex);
// No update to relationships allowed for the existing contract version
resetAllRelationshipAttributes(entity);
if (!isEqualContract(contractString, (String) existingContractEntity.getAttribute(ATTR_CONTRACT))) {
DataContract contract = DataContract.deserialize(contractString);
String existingContractString = getContractString(existingContractEntity);
boolean requestFromMigration = RequestContext.get().getRequestContextHeaders().getOrDefault(
"x-atlan-request-id", "").contains("json-to-yaml-migration");
if (!requestFromMigration && !StringUtils.isEmpty(contractString) &&
!contract.equals(DataContract.deserialize(existingContractString))) {
// Update the same asset(entity)
throw new AtlasBaseException(OPERATION_NOT_SUPPORTED, "Can't update a specific version of contract");
}
Expand All @@ -105,7 +106,7 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
String contractQName = (String) entity.getAttribute(QUALIFIED_NAME);
validateAttribute(!contractQName.endsWith(String.format("/%s", CONTRACT_QUALIFIED_NAME_SUFFIX)), "Invalid qualifiedName for the contract.");

String contractString = (String) entity.getAttribute(ATTR_CONTRACT);
String contractString = getContractString(entity);
DataContract contract = DataContract.deserialize(contractString);
String datasetQName = contractQName.substring(0, contractQName.lastIndexOf('/'));
contractQName = String.format("%s/%s/%s", datasetQName, contract.getType().name(), CONTRACT_QUALIFIED_NAME_SUFFIX);
Expand All @@ -116,7 +117,8 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
boolean contractSync = syncContractCertificateStatus(entity, contract);
contractString = DataContract.serialize(contract);
entity.setAttribute(ATTR_CONTRACT, contractString);

String contractStringJSON = DataContract.serializeJSON(contract);
entity.setAttribute(ATTR_CONTRACT_JSON, contractStringJSON);

AtlasEntity currentVersionEntity = getCurrentVersion(associatedAsset.getEntity().getGuid());
Long newVersionNumber = 1L;
Expand All @@ -128,7 +130,7 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
// No changes in the contract, Not creating new version
removeCreatingVertex(context, entity);
return;
} else if (isEqualContract(contractString, (String) currentVersionEntity.getAttribute(ATTR_CONTRACT))) {
} else if (contract.equals(DataContract.deserialize(getContractString(currentVersionEntity)))) {
resetAllRelationshipAttributes(entity);
// No change in contract, metadata changed
updateExistingVersion(context, entity, currentVersionEntity);
Expand Down Expand Up @@ -168,22 +170,6 @@ private List<String> getDiffAttributes(AtlasEntity entity, AtlasEntity latestExi
return attributesSet;
}

/**
 * Compares two contract JSON documents for semantic equality, ignoring the
 * status field on both sides (a status transition alone is not a contract change).
 *
 * @param firstNode  JSON text of the first contract
 * @param secondNode JSON text of the second contract
 * @return {@code true} when the two documents are equal after removing the status field
 * @throws AtlasBaseException with JSON_ERROR when either side is not parseable JSON
 */
private boolean isEqualContract(String firstNode, String secondNode) throws AtlasBaseException {
    final ObjectMapper mapper = new ObjectMapper();
    try {
        JsonNode left  = mapper.readTree(firstNode);
        JsonNode right = mapper.readTree(secondNode);
        // Drop the status attribute from both trees so that only material
        // contract content participates in the comparison.
        ((ObjectNode) left).remove(CONTRACT_ATTR_STATUS);
        ((ObjectNode) right).remove(CONTRACT_ATTR_STATUS);
        return left.equals(right);
    } catch (JsonProcessingException e) {
        // NOTE(review): only the message is propagated; the cause is dropped — confirm
        // AtlasBaseException offers a cause-preserving constructor before changing.
        throw new AtlasBaseException(JSON_ERROR, e.getMessage());
    }
}

private void updateExistingVersion(EntityMutationContext context, AtlasEntity entity, AtlasEntity currentVersionEntity) throws AtlasBaseException {
removeCreatingVertex(context, entity);
entity.setAttribute(QUALIFIED_NAME, currentVersionEntity.getAttribute(QUALIFIED_NAME));
Expand Down Expand Up @@ -304,4 +290,12 @@ private static void validateAttribute(boolean isInvalid, String errorMessage) th
if (isInvalid)
throw new AtlasBaseException(BAD_REQUEST, errorMessage);
}

/**
 * Reads the contract payload from the entity, preferring the primary contract
 * attribute and falling back to the legacy JSON attribute when the primary one
 * is empty or absent.
 *
 * @param entity entity carrying the contract attributes
 * @return the contract text, or {@code null} when neither attribute is set
 */
private static String getContractString(AtlasEntity entity) {
    String primary = (String) entity.getAttribute(ATTR_CONTRACT);
    // Fall back only when the preferred attribute is null/empty.
    return StringUtils.isEmpty(primary) ? (String) entity.getAttribute(ATTR_CONTRACT_JSON) : primary;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.commons.lang.StringUtils;
Expand All @@ -27,9 +28,11 @@
public class DataContract {
private static final String KIND_VALUE = "DataContract";
private static final Pattern versionPattern = Pattern.compile("^(0|[1-9]\\d*)\\.(0|[1-9]\\d*)\\.(0|[1-9]\\d*)(?:-((?:0|[1-9]\\d*|\\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\\.(?:0|[1-9]\\d*|\\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\\+([0-9a-zA-Z-]+(?:\\.[0-9a-zA-Z-]+)*))?$");
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final ObjectMapper objectMapperYAML = new ObjectMapper(new YAMLFactory());
private static final ObjectMapper objectMapperJSON = new ObjectMapper();
static {
objectMapper.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
objectMapperYAML.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
objectMapperJSON.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
}

@Valid @NotNull
Expand Down Expand Up @@ -184,6 +187,20 @@ public Map<String, Object> getUnknownFields() {
return unknownFields;
}

/**
 * Value equality: two business tags are equal when their name and
 * unknown (pass-through) fields match. Uses an exact-class check, so
 * subclasses never compare equal to this type.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    BusinessTag other = (BusinessTag) o;
    return Objects.equals(name, other.name)
            && Objects.equals(unknownFields, other.unknownFields);
}

/**
 * Hash code consistent with {@link #equals(Object)}: derived from the same
 * fields equals compares (name, unknownFields).
 *
 * Fix: the previous implementation folded {@code super.hashCode()} into the
 * hash. Assuming this class extends Object (no superclass visible here — TODO
 * confirm), that is an identity hash, so two EQUAL tags produced DIFFERENT
 * hash codes, breaking the equals/hashCode contract and HashSet/HashMap lookups.
 */
@Override
public int hashCode() {
    return Objects.hash(name, unknownFields);
}

}

@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -206,6 +223,22 @@ public void setUnknownFields(String key, Object value) {
public Map<String, Object> getUnknownFields() {
return unknownFields;
}

/**
 * Value equality over the column's visible fields (name, description,
 * data_type) plus any unknown pass-through fields. Exact-class comparison,
 * mirroring the other nested value types in this file.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    Column other = (Column) o;
    return Objects.equals(name, other.name)
            && Objects.equals(description, other.description)
            && Objects.equals(data_type, other.data_type)
            && Objects.equals(unknownFields, other.unknownFields);
}

/**
 * Hash code consistent with {@link #equals(Object)}: same field set
 * (name, description, data_type, unknownFields).
 *
 * Fix: dropped {@code super.hashCode()} from the hash input. Assuming the
 * superclass is Object (not visible in this view — TODO confirm), that term is
 * an identity hash and made equal columns hash differently, violating the
 * equals/hashCode contract.
 */
@Override
public int hashCode() {
    return Objects.hash(name, description, data_type, unknownFields);
}
}

public static DataContract deserialize(String contractString) throws AtlasBaseException {
Expand All @@ -216,9 +249,13 @@ public static DataContract deserialize(String contractString) throws AtlasBaseEx

DataContract contract;
try {
contract = objectMapper.readValue(contractString, DataContract.class);
contract = objectMapperYAML.readValue(contractString, DataContract.class);
} catch (JsonProcessingException ex) {
throw new AtlasBaseException(ex.getOriginalMessage());
try {
contract = objectMapperJSON.readValue(contractString, DataContract.class);
} catch (JsonProcessingException e) {
throw new AtlasBaseException(ex.getOriginalMessage());
}
}
contract.validate();
return contract;
Expand All @@ -242,11 +279,45 @@ public void validate() throws AtlasBaseException {
/**
 * Serializes a contract to its canonical YAML representation.
 *
 * @param contract the contract to serialize
 * @return YAML text produced by the shared YAML mapper
 * @throws AtlasBaseException with JSON_ERROR when Jackson fails to serialize
 */
public static String serialize(DataContract contract) throws AtlasBaseException {
    final String yaml;
    try {
        yaml = objectMapperYAML.writeValueAsString(contract);
    } catch (JsonProcessingException e) {
        // NOTE(review): cause is dropped, only the message survives — confirm whether
        // AtlasBaseException has a cause-accepting constructor before improving this.
        throw new AtlasBaseException(JSON_ERROR, e.getMessage());
    }
    return yaml;
}

/**
 * Structural equality over every declared contract field (kind, status,
 * templateVersion, data_source, dataset, type, description, owners, tags,
 * certificate, columns) plus the unknown pass-through fields. Exact-class
 * check, so only another DataContract can compare equal.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }

    DataContract other = (DataContract) o;
    return Objects.equals(kind, other.kind)
            && Objects.equals(status, other.status)
            && Objects.equals(templateVersion, other.templateVersion)
            && Objects.equals(data_source, other.data_source)
            && Objects.equals(dataset, other.dataset)
            && Objects.equals(type, other.type)
            && Objects.equals(description, other.description)
            && Objects.equals(owners, other.owners)
            && Objects.equals(tags, other.tags)
            && Objects.equals(certificate, other.certificate)
            && Objects.equals(columns, other.columns)
            && Objects.equals(unknownFields, other.unknownFields);
}

/**
 * Hash code consistent with {@link #equals(Object)}: computed over exactly the
 * fields equals compares.
 *
 * Fix: removed {@code super.hashCode()} from the inputs. Assuming DataContract
 * extends Object (no superclass visible in this view — TODO confirm), that was
 * an identity hash, so two contracts that compare equal (e.g. the
 * deserialize-and-compare path in ContractPreProcessor) hashed differently —
 * an equals/hashCode contract violation.
 */
@Override
public int hashCode() {
    return Objects.hash(kind, status, templateVersion, data_source, dataset, type, description,
            owners, tags, certificate, columns, unknownFields);
}

/**
 * Serializes a contract to JSON (the legacy on-entity representation stored
 * under the dataContractJson attribute), as opposed to {@code serialize},
 * which emits YAML.
 *
 * @param contract the contract to serialize
 * @return JSON text produced by the shared JSON mapper
 * @throws AtlasBaseException with JSON_ERROR when Jackson fails to serialize
 */
public static String serializeJSON(DataContract contract) throws AtlasBaseException {
    final String json;
    try {
        json = objectMapperJSON.writeValueAsString(contract);
    } catch (JsonProcessingException e) {
        throw new AtlasBaseException(JSON_ERROR, e.getMessage());
    }
    return json;
}
}

Loading

0 comments on commit c02077f

Please sign in to comment.