Skip to content

Commit

Permalink
Merge pull request #2876 from atlanhq/dg-996-move-domain
Browse files Browse the repository at this point in the history
Pre Processors for Domain Move
  • Loading branch information
nikhilbonte21 authored Mar 13, 2024
2 parents 8f31f6f + 4c4d4e3 commit 2d810e3
Show file tree
Hide file tree
Showing 6 changed files with 678 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ public final class Constants {
public static final String GLOSSARY_TERMS_EDGE_LABEL = "r:AtlasGlossaryTermAnchor";
public static final String GLOSSARY_CATEGORY_EDGE_LABEL = "r:AtlasGlossaryCategoryAnchor";

/**
* MESH property keys.
*/
public static final String DATA_DOMAIN_ENTITY_TYPE = "DataDomain";
public static final String DATA_PRODUCT_ENTITY_TYPE = "DataProduct";
public static final String DATA_PRODUCT_EDGE_LABEL = "__DataDomain.dataProducts";
public static final String DOMAIN_PARENT_EDGE_LABEL = "__DataDomain.subDomains";



/**
* SQL property keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DataProductPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.DomainPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor;
Expand Down Expand Up @@ -1892,6 +1894,14 @@ public PreProcessor getPreProcessor(String typeName) {
preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper);
break;

case DATA_DOMAIN_ENTITY_TYPE:
preProcessor = new DomainPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper);
break;

case DATA_PRODUCT_ENTITY_TYPE:
preProcessor = new DataProductPreProcessor(typeRegistry, entityRetriever, graph, entityGraphMapper);
break;

case QUERY_ENTITY_TYPE:
preProcessor = new QueryPreProcessor(typeRegistry, entityRetriever);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public class PreProcessorUtils {
public static final String GLOSSARY_TERM_REL_TYPE = "AtlasGlossaryTermAnchor";
public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor";

//DataMesh models constants
public static final String DATA_PRODUCT_TYPE = "DataProduct";
public static final String PARENT_DOMAIN = "parentDomain";
public static final String PARENT_DOMAIN_QN = "parentDomainQualifiedName";
public static final String SUPER_DOMAIN_QN = "superDomainQualifiedName";

//Query models constants
public static final String PREFIX_QUERY_QN = "default/collection/";
public static final String COLLECTION_QUALIFIED_NAME = "collectionQualifiedName";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh;

import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.IndexSearchParams;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.AbstractGlossaryPreProcessor;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.atlas.repository.Constants.NAME;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;

public abstract class AbstractDomainPreProcessor implements PreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(AbstractGlossaryPreProcessor.class);


protected final AtlasTypeRegistry typeRegistry;
protected final EntityGraphRetriever entityRetriever;

protected EntityDiscoveryService discovery;

AbstractDomainPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph) {
this.entityRetriever = entityRetriever;
this.typeRegistry = typeRegistry;

try {
this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null);
} catch (AtlasException e) {
e.printStackTrace();
}
}

public List<AtlasEntityHeader> indexSearchPaginated(Map<String, Object> dsl) throws AtlasBaseException {
IndexSearchParams searchParams = new IndexSearchParams();
List<AtlasEntityHeader> ret = new ArrayList<>();

List<Map> sortList = new ArrayList<>(0);
sortList.add(mapOf("__timestamp", mapOf("order", "asc")));
sortList.add(mapOf("__guid", mapOf("order", "asc")));
dsl.put("sort", sortList);

int from = 0;
int size = 100;
boolean hasMore = true;
do {
dsl.put("from", from);
dsl.put("size", size);
searchParams.setDsl(dsl);

List<AtlasEntityHeader> headers = discovery.directIndexSearch(searchParams).getEntities();

if (CollectionUtils.isNotEmpty(headers)) {
ret.addAll(headers);
} else {
hasMore = false;
}

from += size;

} while (hasMore);

return ret;
}

protected void isAuthorized(AtlasEntityHeader sourceDomain, AtlasEntityHeader targetDomain) throws AtlasBaseException {

// source -> CREATE + UPDATE + DELETE
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, sourceDomain),
"create on source Domain: ", sourceDomain.getAttribute(NAME));

AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, sourceDomain),
"update on source Domain: ", sourceDomain.getAttribute(NAME));

AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, sourceDomain),
"delete on source Domain: ", sourceDomain.getAttribute(NAME));


// target -> CREATE + UPDATE + DELETE
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, targetDomain),
"create on target Domain: ", targetDomain.getAttribute(NAME));

AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, targetDomain),
"update on target Domain: ", targetDomain.getAttribute(NAME));

AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, targetDomain),
"delete on target Domain: ", targetDomain.getAttribute(NAME));
}

/**
* Record the updated child entities, it will be used to send notification and store audit logs
* @param entityVertex Child entity vertex
* @param updatedAttributes Updated attributes while updating required attributes on updating collection
*/
protected void recordUpdatedChildEntities(AtlasVertex entityVertex, Map<String, Object> updatedAttributes) {
RequestContext requestContext = RequestContext.get();
AtlasPerfMetrics.MetricRecorder metricRecorder = requestContext.startMetricRecord("recordUpdatedChildEntities");
AtlasEntity entity = new AtlasEntity();
entity = entityRetriever.mapSystemAttributes(entityVertex, entity);
entity.setAttributes(updatedAttributes);
requestContext.cacheDifferentialEntity(new AtlasEntity(entity));

AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());

//Add the min info attributes to entity header to be sent as part of notification
if(entityType != null) {
AtlasEntity finalEntity = entity;
entityType.getMinInfoAttributes().values().stream().filter(attribute -> !updatedAttributes.containsKey(attribute.getName())).forEach(attribute -> {
Object attrValue = null;
try {
attrValue = entityRetriever.getVertexAttribute(entityVertex, attribute);
} catch (AtlasBaseException e) {
LOG.error("Error while getting vertex attribute", e);
}
if(attrValue != null) {
finalEntity.setAttribute(attribute.getName(), attrValue);
}
});
requestContext.recordEntityUpdate(new AtlasEntityHeader(finalEntity));
}

requestContext.endMetricRecord(metricRecorder);
}
}
Loading

0 comments on commit 2d810e3

Please sign in to comment.