Skip to content

Commit

Permalink
Merge pull request #3173 from atlanhq/dg-1321-attributes
Browse files Browse the repository at this point in the history
Dg 1321 attributes
  • Loading branch information
PRATHAM2002-DS authored Jun 4, 2024
2 parents b57b4e1 + 4b7217d commit 62c163f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public final class Constants {

public static final String REL_DOMAIN_TO_STAKEHOLDERS = "data_domain_stakeholders";
public static final String REL_STAKEHOLDER_TITLE_TO_STAKEHOLDERS = "stakeholder_title_stakeholders";

public static final String INPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.inputPortDataProducts";
public static final String OUTPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.outputPortDataProducts";

/**
* SQL property keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.INPUT_PORT_GUIDS_ATTR;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.OUTPUT_PORT_GUIDS_ATTR;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.*;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
Expand Down Expand Up @@ -169,6 +171,7 @@ public class EntityGraphMapper {
private static final String TYPE_GLOSSARY= "AtlasGlossary";
private static final String TYPE_CATEGORY= "AtlasGlossaryCategory";
private static final String TYPE_TERM = "AtlasGlossaryTerm";
private static final String TYPE_PRODUCT = "DataProduct";
private static final String TYPE_PROCESS = "Process";
private static final String ATTR_MEANINGS = "meanings";
private static final String ATTR_ANCHOR = "anchor";
Expand Down Expand Up @@ -2002,6 +2005,11 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co
case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, removedElements);
break;

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, removedElements, currentElements);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2031,6 +2039,7 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext

boolean isNewElementsNull = newElements == null;


if (isNewElementsNull) {
newElements = new ArrayList();
}
Expand Down Expand Up @@ -2087,6 +2096,11 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext
case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), newElementsCreated, new ArrayList<>(0));
break;

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, new ArrayList<>(0), null);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2114,6 +2128,7 @@ public List removeArrayValue(AttributeMutationContext ctx, EntityMutationContext

boolean isNewElementsNull = elementsDeleted == null;


if (isNewElementsNull) {
elementsDeleted = new ArrayList();
}
Expand Down Expand Up @@ -2154,6 +2169,11 @@ public List removeArrayValue(AttributeMutationContext ctx, EntityMutationContext
case PROCESS_INPUTS:
case PROCESS_OUTPUTS: addEdgesToContext(GraphHelper.getGuid(ctx.referringVertex), new ArrayList<>(0), removedElements);
break;

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, new ArrayList<>(0) , removedElements, null);
break;
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2191,6 +2211,62 @@ private void addEdgesToContext(String guid, List<Object> newElementsCreated, Lis
}
}

private void addInternalProductAttr(AttributeMutationContext ctx, List<Object> createdElements, List<AtlasEdge> deletedElements, List<Object> currentElements) throws AtlasBaseException {
MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("addInternalProductAttrForAppend");
AtlasVertex toVertex = ctx.getReferringVertex();
String toVertexType = getTypeName(toVertex);

if (TYPE_PRODUCT.equals(toVertexType)) {

if(currentElements == null){
AtlasAttribute attribute = ctx.getAttribute();
AtlasArrayType arrType = (AtlasArrayType) attribute.getAttributeType();
AtlasType elementType = arrType.getElementType();
boolean isStructType = (TypeCategory.STRUCT == elementType.getTypeCategory()) ||
(TypeCategory.STRUCT == attribute.getDefinedInType().getTypeCategory());

currentElements = (List) getCollectionElementsUsingRelationship(ctx.getReferringVertex(), attribute, isStructType);
}

if(ctx.getAttribute().getRelationshipEdgeLabel().equals(OUTPUT_PORT_PRODUCT_EDGE_LABEL)){
addOrRemoveInternalAttr(toVertex, OUTPUT_PORT_GUIDS_ATTR, createdElements, currentElements, deletedElements);
}
if (ctx.getAttribute().getRelationshipEdgeLabel().equals(INPUT_PORT_PRODUCT_EDGE_LABEL)) {
addOrRemoveInternalAttr(toVertex, INPUT_PORT_GUIDS_ATTR, createdElements, currentElements, deletedElements);
}
}else{
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Can not update product relations while updating any asset");
}
RequestContext.get().endMetricRecord(metricRecorder);
}

private void addOrRemoveInternalAttr(AtlasVertex toVertex, String internalAttr, List<Object> createdElements, List<Object> currentElements, List<AtlasEdge> deletedElements) {
List<String> portGuids = toVertex.getMultiValuedProperty(internalAttr, String.class);

if (CollectionUtils.isNotEmpty(currentElements) && CollectionUtils.isEmpty(portGuids)) {
List<String> currentGuids = currentElements.stream()
.map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class))
.collect(Collectors.toList());

portGuids.addAll(currentGuids);
}

if (CollectionUtils.isNotEmpty(createdElements)) {
List<String> assetGuids = createdElements.stream().map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
portGuids = (List<String>) CollectionUtils.union(portGuids, assetGuids);
}

if (CollectionUtils.isNotEmpty(deletedElements)) {
List<String> assetGuids = deletedElements.stream().map(x -> x.getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
portGuids.removeAll(assetGuids);
}

toVertex.removeProperty(internalAttr);
if (CollectionUtils.isNotEmpty(portGuids)) {
portGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(toVertex, internalAttr , guid));
}
}

private boolean shouldDeleteExistingRelations(AttributeMutationContext ctx, AtlasAttribute attribute) {
boolean ret = false;
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(AtlasGraphUtilsV2.getTypeName(ctx.getReferringVertex()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class PreProcessorUtils {
public static final String DAAP_VISIBILITY_ATTR = "daapVisibility";
public static final String DAAP_VISIBILITY_USERS_ATTR = "daapVisibilityUsers";
public static final String DAAP_VISIBILITY_GROUPS_ATTR = "daapVisibilityGroups";
public static final String OUTPUT_PORT_GUIDS_ATTR = "daapOutputPortGuids";
public static final String INPUT_PORT_GUIDS_ATTR = "daapInputPortGuids";

//Migration Constants
public static final String MIGRATION_TYPE_PREFIX = "MIGRATION:";
Expand Down

0 comments on commit 62c163f

Please sign in to comment.