Skip to content

Commit

Permalink
Merge pull request #3245 from atlanhq/dg-1322-migrations
Browse files Browse the repository at this point in the history
Dg 1322 Migrations to add internal attribute for output/input relations on Daaps for filtering
  • Loading branch information
PRATHAM2002-DS authored Jun 13, 2024
2 parents bc71a12 + 4491938 commit f1ffa21
Showing 1 changed file with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package org.apache.atlas.repository.store.graph.v2;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;
Expand All @@ -37,19 +31,25 @@ public DataProductInputsOutputsMigrationService(EntityGraphRetriever entityRetri
public void migrateProduct() throws Exception {
try {
AtlasVertex productVertex = entityRetriever.getEntityVertex(this.productGuid);
if(productVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, this.productGuid);

boolean isCommitRequired = migrateAttr(productVertex);
if (isCommitRequired){
LOG.info("Committing changes for Product: {}", this.productGuid);
commitChanges();
}
else {
LOG.info("No changes to commit for Product: {} as no migration needed", this.productGuid);
}

migrateAttr(productVertex);
commitChanges();
} catch (Exception e) {
LOG.error("Error while migration inputs/outputs for Dataproduct: {}", this.productGuid, e);
throw e;
}
}

private void migrateAttr(AtlasVertex vertex) throws AtlasBaseException {
private boolean migrateAttr(AtlasVertex vertex) throws AtlasBaseException {
boolean isCommitRequired = false;

List<String> outputPortsRelationGuids = getAssetGuids(vertex, OUTPUT_PORT_PRODUCT_EDGE_LABEL);
List<String> outputPortGuidsAttr = vertex.getMultiValuedProperty(OUTPUT_PORT_GUIDS_ATTR, String.class);

Expand All @@ -60,12 +60,16 @@ private void migrateAttr(AtlasVertex vertex) throws AtlasBaseException {
if(!CollectionUtils.isEqualCollection(outputPortsRelationGuids, outputPortGuidsAttr)) {
LOG.info("Migrating outputPort guid attribute: {} for Product: {}", OUTPUT_PORT_GUIDS_ATTR, this.productGuid);
addInternalAttr(vertex, OUTPUT_PORT_GUIDS_ATTR, outputPortsRelationGuids);
isCommitRequired = true;
}

if(!CollectionUtils.isEqualCollection(inputPortsRelationGuids, inputPortGuidsAttr)) {
LOG.info("Migrating inputPort guid attribute: {} for Product: {}", INPUT_PORT_GUIDS_ATTR, this.productGuid);
addInternalAttr(vertex, INPUT_PORT_GUIDS_ATTR, inputPortsRelationGuids);
isCommitRequired = true;
}

return isCommitRequired;
}

public void commitChanges() throws AtlasBaseException {
Expand All @@ -80,9 +84,9 @@ public void commitChanges() throws AtlasBaseException {

private List<String> getAssetGuids(AtlasVertex vertex, String edgeLabel) throws AtlasBaseException {
List<String> guids = new ArrayList<>();
Iterator<AtlasVertex> activeChildren = GraphHelper.getActiveParentVertices(vertex, edgeLabel);
while(activeChildren.hasNext()) {
AtlasVertex child = activeChildren.next();
Iterator<AtlasVertex> activeParent = GraphHelper.getActiveParentVertices(vertex, edgeLabel);
while(activeParent.hasNext()) {
AtlasVertex child = activeParent.next();
guids.add(child.getProperty(GUID_PROPERTY_KEY, String.class));
}
return guids;
Expand Down

0 comments on commit f1ffa21

Please sign in to comment.