Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing Staging with Master #3300

Merged
merged 20 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,8 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }
public static final String ATTR_ASSET_STARRED_BY = "assetStarredBy";
public static final String ATTR_ASSET_STARRED_AT = "assetStarredAt";
public static final String ATTR_CERTIFICATE_STATUS = "certificateStatus";
public static final String ATTR_CONTRACT = "dataContractJson";
public static final String ATTR_CONTRACT = "dataContractSpec";
public static final String ATTR_CONTRACT_JSON = "dataContractJson";
public static final String STRUCT_STARRED_DETAILS = "StarredDetails";

public static final String KEYCLOAK_ROLE_ADMIN = "$admin";
Expand Down
5 changes: 5 additions & 0 deletions repository/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@
<artifactId>hibernate-validator</artifactId>
<version>4.3.2.Final</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.12.7</version>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.repository.util.FilterUtil;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
Expand All @@ -47,6 +48,8 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
Expand Down Expand Up @@ -191,7 +194,9 @@ public AtlasImportResult run(AtlasImportRequest request, String userName, String
if (StringUtils.isBlank(fileName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "FILENAME parameter not found");
}

if(!FilterUtil.validateFilePath(fileName)){
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "FILENAME IS INVALID");
}
AtlasImportResult result = null;
try {
LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
Expand Down Expand Up @@ -296,4 +301,5 @@ boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, A
private boolean isMigrationMode(AtlasImportRequest request) {
return request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
Expand Down Expand Up @@ -55,15 +56,15 @@ public class MigrationProgressService {
private boolean zipFileBasedMigrationImport;

@Inject
public MigrationProgressService(Configuration configuration, GraphDBMigrator migrator) {
public MigrationProgressService(Configuration configuration, GraphDBMigrator migrator) throws AtlasBaseException {
this.migrator = migrator;
this.cacheValidity = (configuration != null) ? configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) : DEFAULT_CACHE_TTL_IN_SECS;

this.zipFileBasedMigrationImport = isZipFileBasedMigrationEnabled();
initConditionallyZipFileBasedMigrator();
}

private void initConditionallyZipFileBasedMigrator() {
private void initConditionallyZipFileBasedMigrator() throws AtlasBaseException {
if (!zipFileBasedMigrationImport) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,28 @@

package org.apache.atlas.repository.migration;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.util.FilterUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;

import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
import static org.apache.atlas.repository.util.FilterUtil.validateFilePath;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.encodePropertyKey;
import static org.apache.atlas.type.Constants.INTERNAL_PROPERTY_KEY_PREFIX;

Expand All @@ -52,8 +58,11 @@ public DataMigrationStatusService(AtlasGraph graph) {
}


public void init(String fileToImport) {
public void init(String fileToImport) throws AtlasBaseException {
try {
if(!validateFilePath(fileToImport)){
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "File Path is invalid");
}
this.status = new MigrationImportStatus(fileToImport, DigestUtils.md5Hex(new FileInputStream(fileToImport)));
} catch (IOException e) {
LOG.error("Not able to create Migration status", e);
Expand All @@ -66,9 +75,13 @@ public void init(String fileToImport) {
getCreate(fileToImport);
}

public MigrationImportStatus getCreate(String fileName) {

public MigrationImportStatus getCreate(String fileName) throws AtlasBaseException {
MigrationImportStatus create = null;
try {
if(!validateFilePath(fileName)){
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "File Path is invalid");
}
create = getCreate(new MigrationImportStatus(fileName, DigestUtils.md5Hex(new FileInputStream(fileName))));
} catch (IOException e) {
LOG.error("Exception occurred while creating migration import", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportRe
return ret;
}

private DataMigrationStatusService createMigrationStatusService(AtlasImportResult importResult) {
private DataMigrationStatusService createMigrationStatusService(AtlasImportResult importResult) throws AtlasBaseException {
DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService();
dataMigrationStatusService.init(importResult.getRequest().getOptions().get(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME));
return dataMigrationStatusService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor.contract;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
Expand All @@ -19,11 +15,10 @@
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.*;

import static org.apache.atlas.AtlasErrorCode.*;
Expand All @@ -43,6 +38,7 @@ public class ContractPreProcessor extends AbstractContractPreProcessor {
private static final Set<String> contractAttributes = new HashSet<>();
static {
contractAttributes.add(ATTR_CONTRACT);
contractAttributes.add(ATTR_CONTRACT_JSON);
contractAttributes.add(ATTR_CERTIFICATE_STATUS);
contractAttributes.add(ATTR_CONTRACT_VERSION);
}
Expand Down Expand Up @@ -78,12 +74,17 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co
}

private void processUpdateContract(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
String contractString = (String) entity.getAttribute(ATTR_CONTRACT);
String contractString = getContractString(entity);
AtlasVertex vertex = context.getVertex(entity.getGuid());
AtlasEntity existingContractEntity = entityRetriever.toAtlasEntity(vertex);
// No update to relationships allowed for the existing contract version
resetAllRelationshipAttributes(entity);
if (!isEqualContract(contractString, (String) existingContractEntity.getAttribute(ATTR_CONTRACT))) {
DataContract contract = DataContract.deserialize(contractString);
String existingContractString = getContractString(existingContractEntity);
boolean requestFromMigration = RequestContext.get().getRequestContextHeaders().getOrDefault(
"x-atlan-request-id", "").contains("json-to-yaml-migration");
if (!requestFromMigration && !StringUtils.isEmpty(contractString) &&
!contract.equals(DataContract.deserialize(existingContractString))) {
// Update the same asset(entity)
throw new AtlasBaseException(OPERATION_NOT_SUPPORTED, "Can't update a specific version of contract");
}
Expand All @@ -105,7 +106,7 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
String contractQName = (String) entity.getAttribute(QUALIFIED_NAME);
validateAttribute(!contractQName.endsWith(String.format("/%s", CONTRACT_QUALIFIED_NAME_SUFFIX)), "Invalid qualifiedName for the contract.");

String contractString = (String) entity.getAttribute(ATTR_CONTRACT);
String contractString = getContractString(entity);
DataContract contract = DataContract.deserialize(contractString);
String datasetQName = contractQName.substring(0, contractQName.lastIndexOf('/'));
contractQName = String.format("%s/%s/%s", datasetQName, contract.getType().name(), CONTRACT_QUALIFIED_NAME_SUFFIX);
Expand All @@ -116,7 +117,8 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
boolean contractSync = syncContractCertificateStatus(entity, contract);
contractString = DataContract.serialize(contract);
entity.setAttribute(ATTR_CONTRACT, contractString);

String contractStringJSON = DataContract.serializeJSON(contract);
entity.setAttribute(ATTR_CONTRACT_JSON, contractStringJSON);

AtlasEntity currentVersionEntity = getCurrentVersion(associatedAsset.getEntity().getGuid());
Long newVersionNumber = 1L;
Expand All @@ -128,7 +130,7 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
// No changes in the contract, Not creating new version
removeCreatingVertex(context, entity);
return;
} else if (isEqualContract(contractString, (String) currentVersionEntity.getAttribute(ATTR_CONTRACT))) {
} else if (contract.equals(DataContract.deserialize(getContractString(currentVersionEntity)))) {
resetAllRelationshipAttributes(entity);
// No change in contract, metadata changed
updateExistingVersion(context, entity, currentVersionEntity);
Expand Down Expand Up @@ -168,22 +170,6 @@ private List<String> getDiffAttributes(AtlasEntity entity, AtlasEntity latestExi
return attributesSet;
}

private boolean isEqualContract(String firstNode, String secondNode) throws AtlasBaseException {
ObjectMapper mapper = new ObjectMapper();
try {
JsonNode actualObj1 = mapper.readTree(firstNode);
JsonNode actualObj2 = mapper.readTree(secondNode);
//Ignore status field change
((ObjectNode) actualObj1).remove(CONTRACT_ATTR_STATUS);
((ObjectNode) actualObj2).remove(CONTRACT_ATTR_STATUS);

return actualObj1.equals(actualObj2);
} catch (JsonProcessingException e) {
throw new AtlasBaseException(JSON_ERROR, e.getMessage());
}

}

private void updateExistingVersion(EntityMutationContext context, AtlasEntity entity, AtlasEntity currentVersionEntity) throws AtlasBaseException {
removeCreatingVertex(context, entity);
entity.setAttribute(QUALIFIED_NAME, currentVersionEntity.getAttribute(QUALIFIED_NAME));
Expand Down Expand Up @@ -304,4 +290,12 @@ private static void validateAttribute(boolean isInvalid, String errorMessage) th
if (isInvalid)
throw new AtlasBaseException(BAD_REQUEST, errorMessage);
}

private static String getContractString(AtlasEntity entity) {
String contractString = (String) entity.getAttribute(ATTR_CONTRACT);
if (StringUtils.isEmpty(contractString)) {
contractString = (String) entity.getAttribute(ATTR_CONTRACT_JSON);
}
return contractString;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.commons.lang.StringUtils;
Expand All @@ -27,9 +28,11 @@
public class DataContract {
private static final String KIND_VALUE = "DataContract";
private static final Pattern versionPattern = Pattern.compile("^(0|[1-9]\\d*)\\.(0|[1-9]\\d*)\\.(0|[1-9]\\d*)(?:-((?:0|[1-9]\\d*|\\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\\.(?:0|[1-9]\\d*|\\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\\+([0-9a-zA-Z-]+(?:\\.[0-9a-zA-Z-]+)*))?$");
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final ObjectMapper objectMapperYAML = new ObjectMapper(new YAMLFactory());
private static final ObjectMapper objectMapperJSON = new ObjectMapper();
static {
objectMapper.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
objectMapperYAML.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
objectMapperJSON.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
}

@Valid @NotNull
Expand Down Expand Up @@ -184,6 +187,20 @@ public Map<String, Object> getUnknownFields() {
return unknownFields;
}

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
BusinessTag that = (BusinessTag) o;
return Objects.equals(name, that.name) &&
Objects.equals(unknownFields, that.unknownFields);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), name, unknownFields);
}

}

@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -206,6 +223,22 @@ public void setUnknownFields(String key, Object value) {
public Map<String, Object> getUnknownFields() {
return unknownFields;
}

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
Column that = (Column) o;
return Objects.equals(name, that.name) &&
Objects.equals(description, that.description) &&
Objects.equals(data_type, that.data_type) &&
Objects.equals(unknownFields, that.unknownFields);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), name, description, data_type, unknownFields);
}
}

public static DataContract deserialize(String contractString) throws AtlasBaseException {
Expand All @@ -216,9 +249,13 @@ public static DataContract deserialize(String contractString) throws AtlasBaseEx

DataContract contract;
try {
contract = objectMapper.readValue(contractString, DataContract.class);
contract = objectMapperYAML.readValue(contractString, DataContract.class);
} catch (JsonProcessingException ex) {
throw new AtlasBaseException(ex.getOriginalMessage());
try {
contract = objectMapperJSON.readValue(contractString, DataContract.class);
} catch (JsonProcessingException e) {
throw new AtlasBaseException(ex.getOriginalMessage());
}
}
contract.validate();
return contract;
Expand All @@ -242,11 +279,45 @@ public void validate() throws AtlasBaseException {
public static String serialize(DataContract contract) throws AtlasBaseException {

try {
return objectMapper.writeValueAsString(contract);
return objectMapperYAML.writeValueAsString(contract);
} catch (JsonProcessingException ex) {
throw new AtlasBaseException(JSON_ERROR, ex.getMessage());
}
}

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }

DataContract that = (DataContract) o;
return Objects.equals(kind, that.kind) &&
Objects.equals(status, that.status) &&
Objects.equals(templateVersion, that.templateVersion) &&
Objects.equals(data_source, that.data_source) &&
Objects.equals(dataset, that.dataset) &&
Objects.equals(type, that.type) &&
Objects.equals(description, that.description) &&
Objects.equals(owners, that.owners) &&
Objects.equals(tags, that.tags) &&
Objects.equals(certificate, that.certificate) &&
Objects.equals(columns, that.columns) &&
Objects.equals(unknownFields, that.unknownFields);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), kind, status, templateVersion, data_source, dataset, type, description, owners,
tags, certificate, columns, unknownFields);
}

public static String serializeJSON(DataContract contract) throws AtlasBaseException {

try {
return objectMapperJSON.writeValueAsString(contract);
} catch (JsonProcessingException ex) {
throw new AtlasBaseException(JSON_ERROR, ex.getMessage());
}
}
}

Loading
Loading