Skip to content

Commit

Permalink
Merge pull request #3193 from atlanhq/dg-1321-attributes
Browse files Browse the repository at this point in the history
DG-1321 Internal attribute for input/output port relation for filtering.
  • Loading branch information
PRATHAM2002-DS authored Jun 17, 2024
2 parents 2e9583d + 492809d commit 6efb355
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ public final class Constants {
public static final String REL_DOMAIN_TO_STAKEHOLDERS = "data_domain_stakeholders";
public static final String REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS = "stakeholder_title_stakeholders";

public static final String REL_DATA_PRODUCT_TO_OUTPUT_PORTS = "data_products_output_ports";
public static final String REL_DATA_PRODUCT_TO_INPUT_PORTS = "data_products_input_ports";

public static final String INPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.inputPortDataProducts";
public static final String OUTPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.outputPortDataProducts";

/**
* SQL property keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,7 @@
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.HOME_ID_KEY;
import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_DOMAINS;
import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_PRODUCTS;
import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_STAKEHOLDERS;
import static org.apache.atlas.repository.Constants.REL_POLICY_TO_ACCESS_CONTROL;
import static org.apache.atlas.repository.Constants.REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS;
import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;
Expand Down Expand Up @@ -116,6 +106,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
add(REL_DOMAIN_TO_STAKEHOLDERS);
add(REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS);
add(REL_POLICY_TO_ACCESS_CONTROL);
add(REL_DATA_PRODUCT_TO_OUTPUT_PORTS);
add(REL_DATA_PRODUCT_TO_INPUT_PORTS);
}};

public enum RelationshipMutation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.INPUT_PORT_GUIDS_ATTR;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.OUTPUT_PORT_GUIDS_ATTR;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.*;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
Expand Down Expand Up @@ -169,6 +171,7 @@ public class EntityGraphMapper {
private static final String TYPE_GLOSSARY= "AtlasGlossary";
private static final String TYPE_CATEGORY= "AtlasGlossaryCategory";
private static final String TYPE_TERM = "AtlasGlossaryTerm";
private static final String TYPE_PRODUCT = "DataProduct";
private static final String TYPE_PROCESS = "Process";
private static final String ATTR_MEANINGS = "meanings";
private static final String ATTR_ANCHOR = "anchor";
Expand Down Expand Up @@ -1904,7 +1907,7 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co
AtlasAttribute inverseRefAttribute = attribute.getInverseRefAttribute();
Cardinality cardinality = attribute.getAttributeDef().getCardinality();
List<AtlasEdge> removedElements = new ArrayList<>();
List<Object> newElementsCreated = new ArrayList<>();
List<Object> newElementsCreated = new ArrayList<>();
List<Object> allArrayElements = null;
List<Object> currentElements;
boolean deleteExistingRelations = shouldDeleteExistingRelations(ctx, attribute);
Expand Down Expand Up @@ -2002,6 +2005,11 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co
case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, removedElements);
break;

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, removedElements);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2087,6 +2095,11 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext
case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, new ArrayList<>(0));
break;

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, null);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2154,6 +2167,11 @@ public List removeArrayValue(AttributeMutationContext ctx, EntityMutationContext
case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), new ArrayList<>(0), removedElements);
break;

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, null , removedElements);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2191,6 +2209,40 @@ private void addEdgesToContext(String guid, List<Object> newElementsCreated, Lis
}
}

private void addInternalProductAttr(AttributeMutationContext ctx, List<Object> createdElements, List<AtlasEdge> deletedElements) throws AtlasBaseException {
MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("addInternalProductAttrForAppend");
AtlasVertex toVertex = ctx.getReferringVertex();
String toVertexType = getTypeName(toVertex);

if (CollectionUtils.isEmpty(createdElements) && CollectionUtils.isEmpty(deletedElements)){
RequestContext.get().endMetricRecord(metricRecorder);
return;
}

if (TYPE_PRODUCT.equals(toVertexType)) {
String attrName = ctx.getAttribute().getRelationshipEdgeLabel().equals(OUTPUT_PORT_PRODUCT_EDGE_LABEL)
? OUTPUT_PORT_GUIDS_ATTR
: INPUT_PORT_GUIDS_ATTR;

addOrRemoveDaapInternalAttr(toVertex, attrName, createdElements, deletedElements);
}else{
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Can not update product relations while updating any asset");
}
RequestContext.get().endMetricRecord(metricRecorder);
}

private void addOrRemoveDaapInternalAttr(AtlasVertex toVertex, String internalAttr, List<Object> createdElements, List<AtlasEdge> deletedElements) {
if (CollectionUtils.isNotEmpty(createdElements)) {
List<String> addedGuids = createdElements.stream().map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
addedGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(toVertex, internalAttr, guid));
}

if (CollectionUtils.isNotEmpty(deletedElements)) {
List<String> removedGuids = deletedElements.stream().map(x -> x.getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
removedGuids.forEach(guid -> AtlasGraphUtilsV2.removeItemFromListPropertyValue(toVertex, internalAttr, guid));
}
}

private boolean shouldDeleteExistingRelations(AttributeMutationContext ctx, AtlasAttribute attribute) {
boolean ret = false;
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(AtlasGraphUtilsV2.getTypeName(ctx.getReferringVertex()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class PreProcessorUtils {
public static final String DAAP_VISIBILITY_ATTR = "daapVisibility";
public static final String DAAP_VISIBILITY_USERS_ATTR = "daapVisibilityUsers";
public static final String DAAP_VISIBILITY_GROUPS_ATTR = "daapVisibilityGroups";
public static final String OUTPUT_PORT_GUIDS_ATTR = "daapOutputPortGuids";
public static final String INPUT_PORT_GUIDS_ATTR = "daapInputPortGuids";

//Migration Constants
public static final String MIGRATION_TYPE_PREFIX = "MIGRATION:";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ private void processCreateProduct(AtlasEntity entity,AtlasVertex vertex) throws
String productName = (String) entity.getAttribute(NAME);
String parentDomainQualifiedName = "";

entity.removeAttribute(OUTPUT_PORT_GUIDS_ATTR);
entity.removeAttribute(INPUT_PORT_GUIDS_ATTR);

if (parentDomainObject == null) {
throw new AtlasBaseException(OPERATION_NOT_SUPPORTED, "Cannot create a Product without a Domain Relationship");
} else {
Expand Down Expand Up @@ -109,6 +112,9 @@ private void processCreateProduct(AtlasEntity entity,AtlasVertex vertex) throws
private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateProduct");

entity.removeAttribute(OUTPUT_PORT_GUIDS_ATTR);
entity.removeAttribute(INPUT_PORT_GUIDS_ATTR);

if(entity.hasRelationshipAttribute(DATA_DOMAIN_REL_TYPE) && entity.getRelationshipAttribute(DATA_DOMAIN_REL_TYPE) == null){
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "DataProduct can only be moved to another Domain.");
}
Expand Down

0 comments on commit 6efb355

Please sign in to comment.