Skip to content

Commit

Permalink
Merge pull request #3212 from atlanhq/dg-1322-migrations
Browse files Browse the repository at this point in the history
Dg 1322 Migrations to add internal attribute for output/input relations on Daaps for filtering
  • Loading branch information
nikhilbonte21 authored Jun 17, 2024
2 parents 6efb355 + 0960758 commit 3da0798
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.apache.atlas.repository.store.graph.v2;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;

public class DataProductInputsOutputsMigrationService {

private static final Logger LOG = LoggerFactory.getLogger(DataProductInputsOutputsMigrationService.class);

private final EntityGraphRetriever entityRetriever;


private String productGuid;
private final TransactionInterceptHelper transactionInterceptHelper;

public DataProductInputsOutputsMigrationService(EntityGraphRetriever entityRetriever, String productGuid, TransactionInterceptHelper transactionInterceptHelper) {
this.entityRetriever = entityRetriever;
this.transactionInterceptHelper = transactionInterceptHelper;
this.productGuid = productGuid;
}

public void migrateProduct() throws Exception {
try {
AtlasVertex productVertex = entityRetriever.getEntityVertex(this.productGuid);

boolean isCommitRequired = migrateAttr(productVertex);
if (isCommitRequired){
LOG.info("Committing changes for Product: {}", this.productGuid);
commitChanges();
}
else {
LOG.info("No changes to commit for Product: {} as no migration needed", this.productGuid);
}

} catch (Exception e) {
LOG.error("Error while migration inputs/outputs for Dataproduct: {}", this.productGuid, e);
throw e;
}
}

private boolean migrateAttr(AtlasVertex vertex) throws AtlasBaseException {
boolean isCommitRequired = false;

List<String> outputPortsRelationGuids = getAssetGuids(vertex, OUTPUT_PORT_PRODUCT_EDGE_LABEL);
List<String> outputPortGuidsAttr = vertex.getMultiValuedProperty(OUTPUT_PORT_GUIDS_ATTR, String.class);


List<String> inputPortsRelationGuids = getAssetGuids(vertex, INPUT_PORT_PRODUCT_EDGE_LABEL);
List<String> inputPortGuidsAttr = vertex.getMultiValuedProperty(INPUT_PORT_GUIDS_ATTR, String.class);

if(!CollectionUtils.isEqualCollection(outputPortsRelationGuids, outputPortGuidsAttr)) {
LOG.info("Migrating outputPort guid attribute: {} for Product: {}", OUTPUT_PORT_GUIDS_ATTR, this.productGuid);
addInternalAttr(vertex, OUTPUT_PORT_GUIDS_ATTR, outputPortsRelationGuids);
isCommitRequired = true;
}

if(!CollectionUtils.isEqualCollection(inputPortsRelationGuids, inputPortGuidsAttr)) {
LOG.info("Migrating inputPort guid attribute: {} for Product: {}", INPUT_PORT_GUIDS_ATTR, this.productGuid);
addInternalAttr(vertex, INPUT_PORT_GUIDS_ATTR, inputPortsRelationGuids);
isCommitRequired = true;
}

return isCommitRequired;
}

public void commitChanges() throws AtlasBaseException {
try {
transactionInterceptHelper.intercept();
LOG.info("Committed a entity to the graph");
} catch (Exception e){
LOG.error("Failed to commit asset: ", e);
throw e;
}
}

private List<String> getAssetGuids(AtlasVertex vertex, String edgeLabel) throws AtlasBaseException {
List<String> guids = new ArrayList<>();
Iterator<AtlasVertex> activeParent = GraphHelper.getActiveParentVertices(vertex, edgeLabel);
while(activeParent.hasNext()) {
AtlasVertex child = activeParent.next();
guids.add(child.getProperty(GUID_PROPERTY_KEY, String.class));
}
return guids;
}

private void addInternalAttr(AtlasVertex productVertex, String internalAttr, List<String> currentGuids){
productVertex.removeProperty(internalAttr);
if (CollectionUtils.isNotEmpty(currentGuids)) {
currentGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(productVertex, internalAttr , guid));
}
}
}
26 changes: 24 additions & 2 deletions webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@

import static org.apache.atlas.auth.client.keycloak.AtlasKeycloakClient.getKeycloakClient;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.DATA_MESH_QN;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.MIGRATION_TYPE_PREFIX;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*;

@Path("migration")
@Singleton
Expand Down Expand Up @@ -147,6 +146,29 @@ public String getMigrationStatus(@QueryParam("migrationType") String migrationTy
}
}

@POST
@Path("dataproduct/inputs-outputs")
@Timed
public Boolean migrateProductInternalAttr (@QueryParam("guid") String guid) throws Exception {
AtlasPerfTracer perf = null;

try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "MigrationREST.migrateProductInternalAttr(" + guid + ")");
}

DataProductInputsOutputsMigrationService migrationService = new DataProductInputsOutputsMigrationService(entityRetriever, guid, transactionInterceptHelper);
migrationService.migrateProduct();

} catch (Exception e) {
LOG.error("Error while migration inputs/outputs for Dataproduct: {}", guid, e);
throw e;
} finally {
AtlasPerfTracer.log(perf);
}
return Boolean.TRUE;
}

@POST
@Path("bootstrap/connections")
@Timed
Expand Down

0 comments on commit 3da0798

Please sign in to comment.