Skip to content

Commit

Permalink
Making type optional and auto-detecting by qName
Browse files Browse the repository at this point in the history
  • Loading branch information
bichitra95 committed Jul 3, 2024
1 parent d2b7d1a commit 864ee9f
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ public final class Constants {
/**
* SQL property keys.
*/

public static final String SQL_ENTITY_TYPE = "SQL";
public static final String CONNECTION_ENTITY_TYPE = "Connection";
public static final String QUERY_ENTITY_TYPE = "Query";
public static final String QUERY_FOLDER_ENTITY_TYPE = "Folder";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,50 @@
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.IndexSearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.*;

import static org.apache.atlas.AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND;
import static org.apache.atlas.AtlasErrorCode.TYPE_NAME_INVALID;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public abstract class AbstractContractPreProcessor implements PreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(AbstractContractPreProcessor.class);

public final AtlasTypeRegistry typeRegistry;
public final EntityGraphRetriever entityRetriever;
public final AtlasGraph graph;
private final EntityDiscoveryService discovery;


AbstractContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry,
EntityGraphRetriever entityRetriever) {
EntityGraphRetriever entityRetriever, EntityDiscoveryService discovery) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.entityRetriever = entityRetriever;
this.discovery = discovery;
}

void authorizeContractCreateOrUpdate(AtlasEntity contractEntity, AtlasEntity.AtlasEntityWithExtInfo associatedAsset) throws AtlasBaseException {
void authorizeContractCreateOrUpdate(AtlasEntity contractEntity, AtlasEntity associatedAsset) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("authorizeContractUpdate");
try {
AtlasEntityHeader entityHeader = new AtlasEntityHeader(associatedAsset.getEntity());
AtlasEntityHeader entityHeader = new AtlasEntityHeader(associatedAsset);

//First authorize entity update access
verifyAssetAccess(entityHeader, AtlasPrivilege.ENTITY_UPDATE, contractEntity, AtlasPrivilege.ENTITY_UPDATE);
Expand All @@ -70,16 +73,39 @@ private void verifyAccess(AtlasEntityHeader entityHeader, AtlasPrivilege privile
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, privilege, entityHeader), errorMessage);
}

AtlasEntity.AtlasEntityWithExtInfo getAssociatedAsset(String datasetQName, String typeName) throws AtlasBaseException {
public AtlasEntity getAssociatedAsset(String datasetQName, DataContract contract) throws AtlasBaseException {
IndexSearchParams indexSearchParams = new IndexSearchParams();
Map<String, Object> dsl = new HashMap<>();
int size = 2;

List<Map<String, Object>> mustClauseList = new ArrayList<>();
mustClauseList.add(mapOf("term", mapOf(QUALIFIED_NAME, datasetQName)));
if (contract.getType() != null) {
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", contract.getType().name())));
} else {
mustClauseList.add(mapOf("term", mapOf("__superTypeNames.keyword", SQL_ENTITY_TYPE)));
}

dsl.put("query", mapOf("bool", mapOf("must", mustClauseList)));
dsl.put("sort", Collections.singletonList(mapOf(ATTR_CONTRACT_VERSION, mapOf("order", "desc"))));
dsl.put("size", size);

Map<String, Object> uniqAttributes = new HashMap<>();
uniqAttributes.put(QUALIFIED_NAME, datasetQName);
indexSearchParams.setDsl(dsl);
indexSearchParams.setSuppressLogs(true);

AtlasEntityType entityType = ensureEntityType(typeName);
AtlasSearchResult result = discovery.directIndexSearch(indexSearchParams);
if (result == null || CollectionUtils.isEmpty(result.getEntities())) {
throw new AtlasBaseException("Dataset doesn't exist for given qualified name.");

AtlasVertex entityVertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(graph, entityType, uniqAttributes);
} else if (result.getEntities().size() >1 ) {
throw new AtlasBaseException("Multiple dataset exists for given qualified name. " +
"Please specify the `type` attribute in contract.");
} else {
AtlasEntityHeader datasetEntity = result.getEntities().get(0);
contract.setType(datasetEntity.getTypeName());
return new AtlasEntity(datasetEntity);
}

return entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
}

AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.IndexSearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.repository.graphdb.AtlasGraph;
Expand Down Expand Up @@ -52,7 +51,7 @@ public ContractPreProcessor(AtlasGraph graph, AtlasTypeRegistry typeRegistry,
EntityGraphRetriever entityRetriever,
boolean storeDifferentialAudits, EntityDiscoveryService discovery) {

super(graph, typeRegistry, entityRetriever);
super(graph, typeRegistry, entityRetriever, discovery);
this.storeDifferentialAudits = storeDifferentialAudits;
this.discovery = discovery;
this.entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, null, true, true);
Expand Down Expand Up @@ -109,8 +108,8 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
String contractString = getContractString(entity);
DataContract contract = DataContract.deserialize(contractString);
String datasetQName = contractQName.substring(0, contractQName.lastIndexOf('/'));
contractQName = String.format("%s/%s/%s", datasetQName, contract.getType().name(), CONTRACT_QUALIFIED_NAME_SUFFIX);
AtlasEntityWithExtInfo associatedAsset = getAssociatedAsset(datasetQName, contract.getType().name());
AtlasEntity associatedAsset = getAssociatedAsset(datasetQName, contract);
contractQName = String.format("%s/%s/%s", datasetQName, associatedAsset.getTypeName(), CONTRACT_QUALIFIED_NAME_SUFFIX);

authorizeContractCreateOrUpdate(entity, associatedAsset);

Expand All @@ -120,7 +119,7 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
String contractStringJSON = DataContract.serializeJSON(contract);
entity.setAttribute(ATTR_CONTRACT_JSON, contractStringJSON);

AtlasEntity currentVersionEntity = getCurrentVersion(associatedAsset.getEntity().getGuid());
AtlasEntity currentVersionEntity = getCurrentVersion(associatedAsset.getGuid());
Long newVersionNumber = 1L;
if (currentVersionEntity != null) {
// Contract already exist
Expand Down Expand Up @@ -150,9 +149,9 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
}
entity.setAttribute(QUALIFIED_NAME, String.format("%s/V%s", contractQName, newVersionNumber));
entity.setAttribute(ATTR_CONTRACT_VERSION, newVersionNumber);
entity.setAttribute(ATTR_ASSET_GUID, associatedAsset.getEntity().getGuid());
entity.setAttribute(ATTR_ASSET_GUID, associatedAsset.getGuid());

datasetAttributeSync(context, associatedAsset.getEntity(), entity);
datasetAttributeSync(context, associatedAsset, entity);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class DataContract {
public String data_source;
@Valid @NotNull
public String dataset;
@Valid @NotNull
public DatasetType type;
public String description;
public List<String> owners;
Expand Down Expand Up @@ -89,7 +88,7 @@ public static DatasetType from(String s) throws AtlasBaseException {
case "materialisedview":
return MaterialisedView;
default:
throw new AtlasBaseException(String.format("dataset.type: %s value not supported yet.", s));
throw new AtlasBaseException(String.format("type: %s value not supported yet.", s));
}
}
}
Expand Down Expand Up @@ -144,7 +143,7 @@ public void setType(String type) throws AtlasBaseException {
try {
this.type = DatasetType.from(type);
} catch (IllegalArgumentException | AtlasBaseException ex) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "type " + type + " is inappropriate. Accepted values: " + Arrays.toString(DatasetType.values()));
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "type: " + type + " is inappropriate. Accepted values: " + Arrays.toString(DatasetType.values()));
}
}

Expand Down

0 comments on commit 864ee9f

Please sign in to comment.