Skip to content

Commit

Permalink
Merge pull request #2927 from atlanhq/dg-996-move-domain
Browse files Browse the repository at this point in the history
[master] DG-996: Support moving Domain/Products across Domains
  • Loading branch information
nikhilbonte21 authored May 7, 2024
2 parents 4356129 + 77d90ea commit f260666
Show file tree
Hide file tree
Showing 10 changed files with 865 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ public final class Constants {
public static final String GLOSSARY_TERMS_EDGE_LABEL = "r:AtlasGlossaryTermAnchor";
public static final String GLOSSARY_CATEGORY_EDGE_LABEL = "r:AtlasGlossaryCategoryAnchor";

/**
* MESH property keys.
*/
public static final String DATA_DOMAIN_ENTITY_TYPE = "DataDomain";
public static final String DATA_PRODUCT_ENTITY_TYPE = "DataProduct";


/**
* SQL property keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataDomainPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor;
Expand Down Expand Up @@ -1801,6 +1803,14 @@ public PreProcessor getPreProcessor(String typeName) {
preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper);
break;

case DATA_DOMAIN_ENTITY_TYPE:
preProcessor = new DataDomainPreProcessor(typeRegistry, entityRetriever, graph);
break;

case DATA_PRODUCT_ENTITY_TYPE:
preProcessor = new DataProductPreProcessor(typeRegistry, entityRetriever, graph);
break;

case QUERY_ENTITY_TYPE:
preProcessor = new QueryPreProcessor(typeRegistry, entityRetriever);
break;
Expand All @@ -1822,7 +1832,7 @@ public PreProcessor getPreProcessor(String typeName) {
break;

case POLICY_ENTITY_TYPE:
preProcessor = new AuthPolicyPreProcessor(graph, typeRegistry, entityRetriever, featureFlagStore);
preProcessor = new AuthPolicyPreProcessor(graph, typeRegistry, entityRetriever);
break;

case CONNECTION_ENTITY_TYPE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;

Expand Down Expand Up @@ -104,6 +105,11 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private static final String END_2_DOC_ID_KEY = "end2DocId";
private static final String ES_DOC_ID_MAP_KEY = "esDocIdMap";

private static Set<String> EXCLUDE_MUTATION_REL_TYPE_NAMES = new HashSet<String>() {{
add("parent_domain_sub_domains");
add("data_domain_data_products");
}};

public enum RelationshipMutation {
RELATIONSHIP_CREATE,
RELATIONSHIP_UPDATE,
Expand All @@ -129,6 +135,8 @@ public AtlasRelationship create(AtlasRelationship relationship) throws AtlasBase
LOG.debug("==> create({})", relationship);
}

validateRelationshipType(relationship.getTypeName());

AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());

Expand Down Expand Up @@ -161,6 +169,8 @@ public AtlasRelationship update(AtlasRelationship relationship) throws AtlasBase
AtlasVertex end1Vertex = edge.getOutVertex();
AtlasVertex end2Vertex = edge.getInVertex();

validateRelationshipType(edgeType);

// update shouldn't change endType
if (StringUtils.isNotEmpty(relationship.getTypeName()) && !StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED, guid, edgeType, relationship.getTypeName());
Expand Down Expand Up @@ -320,6 +330,8 @@ public void deleteByIds(List<String> guids) throws AtlasBaseException {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid);
}

validateRelationshipType(getTypeName(edge));

edgesToDelete.add(edge);
AtlasRelationship relationshipToDelete = entityRetriever.mapEdgeToAtlasRelationship(edge);
deletedRelationships.add(relationshipToDelete);
Expand Down Expand Up @@ -368,6 +380,9 @@ public void deleteById(String guid, boolean forceDelete) throws AtlasBaseExcepti
if (getState(edge) == DELETED) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid);
}

validateRelationshipType(getTypeName(edge));

deleteDelegate.getHandler().resetHasLineageOnInputOutputDelete(Collections.singleton(edge), null);
deleteDelegate.getHandler().deleteRelationships(Collections.singleton(edge), forceDelete);

Expand Down Expand Up @@ -999,4 +1014,11 @@ private static void setEdgeVertexIdsInContext(AtlasEdge edge) {
RequestContext.get().addRelationshipEndToVertexIdMapping(GraphHelper.getAtlasObjectIdForOutVertex(edge), edge.getOutVertex().getId());
RequestContext.get().addRelationshipEndToVertexIdMapping(GraphHelper.getAtlasObjectIdForInVertex(edge), edge.getInVertex().getId());
}

private static void validateRelationshipType(String relationshipTypeName) throws AtlasBaseException {
if (EXCLUDE_MUTATION_REL_TYPE_NAMES.contains(relationshipTypeName)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST,
String.format("Mutating relationship of type %s is not supported via relationship APIs, please use entity APIs", relationshipTypeName));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,14 @@ public class AuthPolicyPreProcessor implements PreProcessor {
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
private final FeatureFlagStore featureFlagStore ;
private IndexAliasStore aliasStore;

public AuthPolicyPreProcessor(AtlasGraph graph,
AtlasTypeRegistry typeRegistry,
EntityGraphRetriever entityRetriever,
FeatureFlagStore featureFlagStore) {
EntityGraphRetriever entityRetriever) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.entityRetriever = entityRetriever;
this.featureFlagStore = featureFlagStore;

aliasStore = new ESAliasStore(graph, entityRetriever);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.apache.atlas.repository.store.graph.v2.preprocessor;

import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.IndexSearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
Expand All @@ -11,13 +14,17 @@
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.NanoIdUtils;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static org.apache.atlas.repository.Constants.QUERY_COLLECTION_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.QUALIFIED_NAME;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public class PreProcessorUtils {
private static final Logger LOG = LoggerFactory.getLogger(PreProcessorUtils.class);
Expand All @@ -32,6 +39,20 @@ public class PreProcessorUtils {
public static final String GLOSSARY_TERM_REL_TYPE = "AtlasGlossaryTermAnchor";
public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor";

//DataMesh models constants
public static final String PARENT_DOMAIN_REL_TYPE = "parentDomain";
public static final String SUB_DOMAIN_REL_TYPE = "subDomains";
public static final String DATA_PRODUCT_REL_TYPE = "dataProducts";
public static final String DATA_DOMAIN_REL_TYPE = "dataDomain";

public static final String MESH_POLICY_CATEGORY = "datamesh";

public static final String DATA_PRODUCT_EDGE_LABEL = "__DataDomain.dataProducts";
public static final String DOMAIN_PARENT_EDGE_LABEL = "__DataDomain.subDomains";

public static final String PARENT_DOMAIN_QN_ATTR = "parentDomainQualifiedName";
public static final String SUPER_DOMAIN_QN_ATTR = "superDomainQualifiedName";

//Query models constants
public static final String PREFIX_QUERY_QN = "default/collection/";
public static final String COLLECTION_QUALIFIED_NAME = "collectionQualifiedName";
Expand Down Expand Up @@ -107,4 +128,40 @@ public static String updateQueryResourceAttributes(AtlasTypeRegistry typeRegistr

return newCollectionQualifiedName;
}

public static List<AtlasEntityHeader> indexSearchPaginated(Map<String, Object> dsl, Set<String> attributes, EntityDiscoveryService discovery) throws AtlasBaseException {
IndexSearchParams searchParams = new IndexSearchParams();
List<AtlasEntityHeader> ret = new ArrayList<>();

if (CollectionUtils.isNotEmpty(attributes)) {
searchParams.setAttributes(attributes);
}

List<Map> sortList = new ArrayList<>(0);
sortList.add(mapOf("__timestamp", mapOf("order", "asc")));
sortList.add(mapOf("__guid", mapOf("order", "asc")));
dsl.put("sort", sortList);

int from = 0;
int size = 100;
boolean hasMore = true;
do {
dsl.put("from", from);
dsl.put("size", size);
searchParams.setDsl(dsl);

List<AtlasEntityHeader> headers = discovery.directIndexSearch(searchParams).getEntities();

if (CollectionUtils.isNotEmpty(headers)) {
ret.addAll(headers);
} else {
hasMore = false;
}

from += size;

} while (hasMore);

return ret;
}
}
Loading

0 comments on commit f260666

Please sign in to comment.