Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

DQ-295 Making type optional and auto-detecting by qName #3305

Merged
merged 2 commits on Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ public final class Constants {
/**
* SQL property keys.
*/

public static final String SQL_ENTITY_TYPE = "SQL";
public static final String CONNECTION_ENTITY_TYPE = "Connection";
public static final String QUERY_ENTITY_TYPE = "Query";
public static final String QUERY_FOLDER_ENTITY_TYPE = "Folder";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,51 @@
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.IndexSearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.*;

import static org.apache.atlas.AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND;
import static org.apache.atlas.AtlasErrorCode.BAD_REQUEST;
import static org.apache.atlas.AtlasErrorCode.TYPE_NAME_INVALID;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public abstract class AbstractContractPreProcessor implements PreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(AbstractContractPreProcessor.class);

public final AtlasTypeRegistry typeRegistry;
public final EntityGraphRetriever entityRetriever;
public final AtlasGraph graph;
private final EntityDiscoveryService discovery;


AbstractContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry,
EntityGraphRetriever entityRetriever) {
EntityGraphRetriever entityRetriever, EntityDiscoveryService discovery) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.entityRetriever = entityRetriever;
this.discovery = discovery;
}

void authorizeContractCreateOrUpdate(AtlasEntity contractEntity, AtlasEntity.AtlasEntityWithExtInfo associatedAsset) throws AtlasBaseException {
void authorizeContractCreateOrUpdate(AtlasEntity contractEntity, AtlasEntity associatedAsset) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("authorizeContractUpdate");
try {
AtlasEntityHeader entityHeader = new AtlasEntityHeader(associatedAsset.getEntity());
AtlasEntityHeader entityHeader = new AtlasEntityHeader(associatedAsset);

//First authorize entity update access
verifyAssetAccess(entityHeader, AtlasPrivilege.ENTITY_UPDATE, contractEntity, AtlasPrivilege.ENTITY_UPDATE);
Expand All @@ -70,16 +74,39 @@ private void verifyAccess(AtlasEntityHeader entityHeader, AtlasPrivilege privile
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, privilege, entityHeader), errorMessage);
}

AtlasEntity.AtlasEntityWithExtInfo getAssociatedAsset(String datasetQName, String typeName) throws AtlasBaseException {
public AtlasEntity getAssociatedAsset(String datasetQName, DataContract contract) throws AtlasBaseException {
IndexSearchParams indexSearchParams = new IndexSearchParams();
Map<String, Object> dsl = new HashMap<>();
int size = 2;

List<Map<String, Object>> mustClauseList = new ArrayList<>();
mustClauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, datasetQName)));
if (contract.getType() != null) {
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", contract.getType().name())));
} else {
mustClauseList.add(mapOf("term", mapOf("__superTypeNames.keyword", SQL_ENTITY_TYPE)));
}

dsl.put("query", mapOf("bool", mapOf("must", mustClauseList)));
dsl.put("sort", Collections.singletonList(mapOf(ATTR_CONTRACT_VERSION, mapOf("order", "desc"))));
dsl.put("size", size);

Map<String, Object> uniqAttributes = new HashMap<>();
uniqAttributes.put(QUALIFIED_NAME, datasetQName);
indexSearchParams.setDsl(dsl);
indexSearchParams.setSuppressLogs(true);

AtlasEntityType entityType = ensureEntityType(typeName);
AtlasSearchResult result = discovery.directIndexSearch(indexSearchParams);
if (result == null || CollectionUtils.isEmpty(result.getEntities())) {
throw new AtlasBaseException("Dataset doesn't exist for given qualified name.");

AtlasVertex entityVertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(graph, entityType, uniqAttributes);
} else if (result.getEntities().size() >1 ) {
throw new AtlasBaseException(BAD_REQUEST, "Multiple dataset exists for given qualified name. " +
"Please specify the `type` attribute in contract.");
} else {
AtlasEntityHeader datasetEntity = result.getEntities().get(0);
contract.setType(datasetEntity.getTypeName());
return new AtlasEntity(datasetEntity);
}

return entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
}

AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.IndexSearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.repository.graphdb.AtlasGraph;
Expand Down Expand Up @@ -52,7 +51,7 @@ public ContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry,
EntityGraphRetriever entityRetriever,
boolean storeDifferentialAudits, EntityDiscoveryService discovery) {

super(graph, typeRegistry, entityRetriever);
super(graph, typeRegistry, entityRetriever, discovery);
this.storeDifferentialAudits = storeDifferentialAudits;
this.discovery = discovery;
this.entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, null, true, true);
Expand Down Expand Up @@ -109,8 +108,8 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
String contractString = getContractString(entity);
DataContract contract = DataContract.deserialize(contractString);
String datasetQName = contractQName.substring(0, contractQName.lastIndexOf('/'));
contractQName = String.format("%s/%s/%s", datasetQName, contract.getType().name(), CONTRACT_QUALIFIED_NAME_SUFFIX);
AtlasEntityWithExtInfo associatedAsset = getAssociatedAsset(datasetQName, contract.getType().name());
AtlasEntity associatedAsset = getAssociatedAsset(datasetQName, contract);
contractQName = String.format("%s/%s/%s", datasetQName, associatedAsset.getTypeName(), CONTRACT_QUALIFIED_NAME_SUFFIX);

authorizeContractCreateOrUpdate(entity, associatedAsset);

Expand All @@ -120,7 +119,7 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
String contractStringJSON = DataContract.serializeJSON(contract);
entity.setAttribute(ATTR_CONTRACT_JSON, contractStringJSON);

AtlasEntity currentVersionEntity = getCurrentVersion(associatedAsset.getEntity().getGuid());
AtlasEntity currentVersionEntity = getCurrentVersion(associatedAsset.getGuid());
Long newVersionNumber = 1L;
if (currentVersionEntity != null) {
// Contract already exist
Expand Down Expand Up @@ -150,9 +149,9 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
}
entity.setAttribute(QUALIFIED_NAME, String.format("%s/V%s", contractQName, newVersionNumber));
entity.setAttribute(ATTR_CONTRACT_VERSION, newVersionNumber);
entity.setAttribute(ATTR_ASSET_GUID, associatedAsset.getEntity().getGuid());
entity.setAttribute(ATTR_ASSET_GUID, associatedAsset.getGuid());

datasetAttributeSync(context, associatedAsset.getEntity(), entity);
datasetAttributeSync(context, associatedAsset, entity);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class DataContract {
public String data_source;
@Valid @NotNull
public String dataset;
@Valid @NotNull
public DatasetType type;
public String description;
public List<String> owners;
Expand Down Expand Up @@ -89,7 +88,7 @@ public static DatasetType from(String s) throws AtlasBaseException {
case "materialisedview":
return MaterialisedView;
default:
throw new AtlasBaseException(String.format("dataset.type: %s value not supported yet.", s));
throw new AtlasBaseException(BAD_REQUEST, String.format("type: %s value not supported yet.", s));
}
}
}
Expand Down Expand Up @@ -144,7 +143,7 @@ public void setType(String type) throws AtlasBaseException {
try {
this.type = DatasetType.from(type);
} catch (IllegalArgumentException | AtlasBaseException ex) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "type " + type + " is inappropriate. Accepted values: " + Arrays.toString(DatasetType.values()));
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "type: " + type + " is inappropriate. Accepted values: " + Arrays.toString(DatasetType.values()));
}
}

Expand Down
Loading