diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 95a6e0c007..9408328b9f 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -150,6 +150,11 @@ public final class Constants { public static final String REL_DOMAIN_TO_STAKEHOLDERS = "data_domain_stakeholders"; public static final String REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS = "stakeholder_title_stakeholders"; + public static final String REL_DATA_PRODUCT_TO_OUTPUT_PORTS = "data_products_output_ports"; + public static final String REL_DATA_PRODUCT_TO_INPUT_PORTS = "data_products_input_ports"; + + public static final String INPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.inputPortDataProducts"; + public static final String OUTPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.outputPortDataProducts"; /** * SQL property keys. diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java index 1459942192..afdf2825f1 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java @@ -68,17 +68,7 @@ import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE; import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO; import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE; -import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.HOME_ID_KEY; -import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY; -import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY; -import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_DOMAINS; -import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_PRODUCTS; -import static org.apache.atlas.repository.Constants.REL_DOMAIN_TO_STAKEHOLDERS; -import static org.apache.atlas.repository.Constants.REL_POLICY_TO_ACCESS_CONTROL; -import static org.apache.atlas.repository.Constants.REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS; -import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*; import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE; @@ -116,6 +106,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore { add(REL_DOMAIN_TO_STAKEHOLDERS); add(REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS); add(REL_POLICY_TO_ACCESS_CONTROL); + add(REL_DATA_PRODUCT_TO_OUTPUT_PORTS); + add(REL_DATA_PRODUCT_TO_INPUT_PORTS); }}; public enum RelationshipMutation { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 97ff277872..596420696d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -137,6 +137,8 @@ import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.INPUT_PORT_GUIDS_ATTR; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.OUTPUT_PORT_GUIDS_ATTR; import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.*; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; @@ -169,6 +171,7 @@ public class EntityGraphMapper { private static final String TYPE_GLOSSARY= "AtlasGlossary"; private static final String TYPE_CATEGORY= "AtlasGlossaryCategory"; private static final String TYPE_TERM = "AtlasGlossaryTerm"; + private static final String TYPE_PRODUCT = "DataProduct"; private static final String TYPE_PROCESS = "Process"; private static final String ATTR_MEANINGS = "meanings"; private static final String ATTR_ANCHOR = "anchor"; @@ -1904,7 +1907,7 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co AtlasAttribute inverseRefAttribute = attribute.getInverseRefAttribute(); Cardinality cardinality = attribute.getAttributeDef().getCardinality(); List removedElements = new ArrayList<>(); - List newElementsCreated = new ArrayList<>(); + List newElementsCreated = new ArrayList<>(); List allArrayElements = null; List currentElements; boolean deleteExistingRelations = shouldDeleteExistingRelations(ctx, attribute); @@ -2002,6 +2005,11 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co case PROCESS_INPUTS: case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, removedElements); break; + + case INPUT_PORT_PRODUCT_EDGE_LABEL: + case OUTPUT_PORT_PRODUCT_EDGE_LABEL: + addInternalProductAttr(ctx, newElementsCreated, removedElements); + break; } if (LOG.isDebugEnabled()) { @@ -2087,6 +2095,11 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext case PROCESS_INPUTS: case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, new ArrayList<>(0)); break; + + case INPUT_PORT_PRODUCT_EDGE_LABEL: + case OUTPUT_PORT_PRODUCT_EDGE_LABEL: + addInternalProductAttr(ctx, newElementsCreated, null); + break; } if (LOG.isDebugEnabled()) { @@ -2154,6 +2167,11 @@ public List removeArrayValue(AttributeMutationContext ctx, EntityMutationContext case PROCESS_INPUTS: case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), new ArrayList<>(0), removedElements); break; + + case INPUT_PORT_PRODUCT_EDGE_LABEL: + case OUTPUT_PORT_PRODUCT_EDGE_LABEL: + addInternalProductAttr(ctx, null , removedElements); + break; } if (LOG.isDebugEnabled()) { @@ -2191,6 +2209,40 @@ private void addEdgesToContext(String guid, List newElementsCreated, Lis } } + private void addInternalProductAttr(AttributeMutationContext ctx, List createdElements, List deletedElements) throws AtlasBaseException { + MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("addInternalProductAttrForAppend"); + AtlasVertex toVertex = ctx.getReferringVertex(); + String toVertexType = getTypeName(toVertex); + + if (CollectionUtils.isEmpty(createdElements) && CollectionUtils.isEmpty(deletedElements)){ + RequestContext.get().endMetricRecord(metricRecorder); + return; + } + + if (TYPE_PRODUCT.equals(toVertexType)) { + String attrName = ctx.getAttribute().getRelationshipEdgeLabel().equals(OUTPUT_PORT_PRODUCT_EDGE_LABEL) + ? OUTPUT_PORT_GUIDS_ATTR + : INPUT_PORT_GUIDS_ATTR; + + addOrRemoveDaapInternalAttr(toVertex, attrName, createdElements, deletedElements); + }else{ + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Can not update product relations while updating any asset"); + } + RequestContext.get().endMetricRecord(metricRecorder); + } + + private void addOrRemoveDaapInternalAttr(AtlasVertex toVertex, String internalAttr, List createdElements, List deletedElements) { + if (CollectionUtils.isNotEmpty(createdElements)) { + List addedGuids = createdElements.stream().map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList()); + addedGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(toVertex, internalAttr, guid)); + } + + if (CollectionUtils.isNotEmpty(deletedElements)) { + List removedGuids = deletedElements.stream().map(x -> x.getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList()); + removedGuids.forEach(guid -> AtlasGraphUtilsV2.removeItemFromListPropertyValue(toVertex, internalAttr, guid)); + } + } + private boolean shouldDeleteExistingRelations(AttributeMutationContext ctx, AtlasAttribute attribute) { boolean ret = false; AtlasEntityType entityType = typeRegistry.getEntityTypeByName(AtlasGraphUtilsV2.getTypeName(ctx.getReferringVertex())); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java index 2680e48207..3dc97fa642 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java @@ -58,6 +58,8 @@ public class PreProcessorUtils { public static final String DAAP_VISIBILITY_ATTR = "daapVisibility"; public static final String DAAP_VISIBILITY_USERS_ATTR = "daapVisibilityUsers"; public static final String DAAP_VISIBILITY_GROUPS_ATTR = "daapVisibilityGroups"; + public static final String OUTPUT_PORT_GUIDS_ATTR = "daapOutputPortGuids"; + public static final String INPUT_PORT_GUIDS_ATTR = "daapInputPortGuids"; //Migration Constants public static final String MIGRATION_TYPE_PREFIX = "MIGRATION:"; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java index ea974535b6..0851baa95c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/datamesh/DataProductPreProcessor.java @@ -81,6 +81,9 @@ private void processCreateProduct(AtlasEntity entity,AtlasVertex vertex) throws String productName = (String) entity.getAttribute(NAME); String parentDomainQualifiedName = ""; + entity.removeAttribute(OUTPUT_PORT_GUIDS_ATTR); + entity.removeAttribute(INPUT_PORT_GUIDS_ATTR); + if (parentDomainObject == null) { throw new AtlasBaseException(OPERATION_NOT_SUPPORTED, "Cannot create a Product without a Domain Relationship"); } else { @@ -109,6 +112,9 @@ private void processCreateProduct(AtlasEntity entity,AtlasVertex vertex) throws private void processUpdateProduct(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateProduct"); + entity.removeAttribute(OUTPUT_PORT_GUIDS_ATTR); + entity.removeAttribute(INPUT_PORT_GUIDS_ATTR); + if(entity.hasRelationshipAttribute(DATA_DOMAIN_REL_TYPE) && entity.getRelationshipAttribute(DATA_DOMAIN_REL_TYPE) == null){ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "DataProduct can only be moved to another Domain."); }