diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/DataProductInputsOutputsMigrationService.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/DataProductInputsOutputsMigrationService.java index 65dd4b8e23..2f33a32481 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/DataProductInputsOutputsMigrationService.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/DataProductInputsOutputsMigrationService.java @@ -1,19 +1,13 @@ package org.apache.atlas.repository.store.graph.v2; -import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.repository.graphdb.AtlasEdge; -import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.stream.Collectors; import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; @@ -37,19 +31,25 @@ public DataProductInputsOutputsMigrationService(EntityGraphRetriever entityRetri public void migrateProduct() throws Exception { try { AtlasVertex productVertex = entityRetriever.getEntityVertex(this.productGuid); - if(productVertex == null) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, this.productGuid); + + boolean isCommitRequired = migrateAttr(productVertex); + if (isCommitRequired){ + LOG.info("Committing changes for Product: {}", this.productGuid); + commitChanges(); + } + else { + LOG.info("No changes to commit for Product: {} as no migration needed", this.productGuid); } - migrateAttr(productVertex); - commitChanges(); } catch (Exception e) { LOG.error("Error while migration inputs/outputs for Dataproduct: {}", this.productGuid, e); throw e; } } - private void migrateAttr(AtlasVertex vertex) throws AtlasBaseException { + private boolean migrateAttr(AtlasVertex vertex) throws AtlasBaseException { + boolean isCommitRequired = false; + List<String> outputPortsRelationGuids = getAssetGuids(vertex, OUTPUT_PORT_PRODUCT_EDGE_LABEL); List<String> outputPortGuidsAttr = vertex.getMultiValuedProperty(OUTPUT_PORT_GUIDS_ATTR, String.class); @@ -60,12 +60,16 @@ private void migrateAttr(AtlasVertex vertex) throws AtlasBaseException { if(!CollectionUtils.isEqualCollection(outputPortsRelationGuids, outputPortGuidsAttr)) { LOG.info("Migrating outputPort guid attribute: {} for Product: {}", OUTPUT_PORT_GUIDS_ATTR, this.productGuid); addInternalAttr(vertex, OUTPUT_PORT_GUIDS_ATTR, outputPortsRelationGuids); + isCommitRequired = true; } if(!CollectionUtils.isEqualCollection(inputPortsRelationGuids, inputPortGuidsAttr)) { LOG.info("Migrating inputPort guid attribute: {} for Product: {}", INPUT_PORT_GUIDS_ATTR, this.productGuid); addInternalAttr(vertex, INPUT_PORT_GUIDS_ATTR, inputPortsRelationGuids); + isCommitRequired = true; } + + return isCommitRequired; } public void commitChanges() throws AtlasBaseException { @@ -80,9 +84,9 @@ public void commitChanges() throws AtlasBaseException { private List<String> getAssetGuids(AtlasVertex vertex, String edgeLabel) throws AtlasBaseException { List<String> guids = new ArrayList<>(); - Iterator<AtlasVertex> activeChildren = GraphHelper.getActiveParentVertices(vertex, edgeLabel); - while(activeChildren.hasNext()) { - AtlasVertex child = activeChildren.next(); + Iterator<AtlasVertex> activeParent = GraphHelper.getActiveParentVertices(vertex, edgeLabel); + while(activeParent.hasNext()) { + AtlasVertex child = activeParent.next(); guids.add(child.getProperty(GUID_PROPERTY_KEY, String.class)); } return guids;