Skip to content

Commit

Permalink
Merge pull request #3154 from atlanhq/DG-1297-NPE-fix
Browse files Browse the repository at this point in the history
DG-1297 Fix NPE while migration
  • Loading branch information
nikhilbonte21 authored May 29, 2024
2 parents 821de7c + 5da96aa commit bcc19c5
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.apache.atlas.repository.store.graph.v2;

import jline.internal.Log;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.EntityDiscoveryService;
Expand All @@ -21,9 +20,7 @@
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import java.util.*;

import static org.apache.atlas.repository.Constants.*;
Expand All @@ -33,15 +30,16 @@
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_CATEGORY;
import static org.apache.atlas.repository.util.AccessControlUtils.ATTR_POLICY_RESOURCES;

public class DataMeshQNMigrationService implements MigrationService, Runnable {
public class DataMeshQNMigrationService implements MigrationService {

private static final Logger LOG = LoggerFactory.getLogger(DataMeshQNMigrationService.class);

private final AtlasEntityStore entityStore;
private final EntityDiscoveryService discovery;
private final EntityGraphRetriever entityRetriever;

protected final AtlasTypeRegistry typeRegistry;
private final AtlasTypeRegistry typeRegistry;
private final RedisService redisService;
private Map<String, String> updatedPolicyResources;

private final int BATCH_SIZE = 20;
Expand All @@ -54,29 +52,38 @@ public class DataMeshQNMigrationService implements MigrationService, Runnable {
private boolean forceRegen;
private final TransactionInterceptHelper transactionInterceptHelper;

public DataMeshQNMigrationService(AtlasEntityStore entityStore, EntityDiscoveryService discovery, EntityGraphRetriever entityRetriever, AtlasTypeRegistry typeRegistry, TransactionInterceptHelper transactionInterceptHelper, boolean forceRegen) {
public DataMeshQNMigrationService(AtlasEntityStore entityStore, EntityDiscoveryService discovery, EntityGraphRetriever entityRetriever, AtlasTypeRegistry typeRegistry, TransactionInterceptHelper transactionInterceptHelper, RedisService redisService, boolean forceRegen) {
this.entityRetriever = entityRetriever;
this.entityStore = entityStore;
this.discovery = discovery;
this.typeRegistry = typeRegistry;
this.updatedPolicyResources = new HashMap<>();
this.counter = 0;
this.redisService = redisService;
this.transactionInterceptHelper = transactionInterceptHelper;
this.forceRegen = forceRegen;

this.updatedPolicyResources = new HashMap<>();
this.counter = 0;
}

public Boolean startMigration() throws Exception{
public void startMigration() throws Exception {
try {
redisService.putValue(DATA_MESH_QN, MigrationStatus.IN_PROGRESS.name());

Set<String> attributes = new HashSet<>(Arrays.asList(SUPER_DOMAIN_QN_ATTR, PARENT_DOMAIN_QN_ATTR, "__customAttributes"));
Set<String> attributes = new HashSet<>(Arrays.asList(SUPER_DOMAIN_QN_ATTR, PARENT_DOMAIN_QN_ATTR, "__customAttributes"));

List<AtlasEntityHeader> entities = getEntity(DATA_DOMAIN_ENTITY_TYPE, attributes, null);
List<AtlasEntityHeader> entities = getEntity(DATA_DOMAIN_ENTITY_TYPE, attributes, null);

for (AtlasEntityHeader superDomain: entities) {
skipSuperDomain = false;
updateChunk(superDomain);
for (AtlasEntityHeader superDomain: entities) {
skipSuperDomain = false;
updateChunk(superDomain);
}
} catch (Exception e) {
LOG.error("Migration failed", e);
redisService.putValue(DATA_MESH_QN, MigrationStatus.FAILED.name());
throw e;
}

return Boolean.TRUE;
redisService.putValue(DATA_MESH_QN, MigrationStatus.SUCCESSFUL.name());
}

private void updateChunk(AtlasEntityHeader atlasEntity) throws AtlasBaseException {
Expand All @@ -86,8 +93,9 @@ private void updateChunk(AtlasEntityHeader atlasEntity) throws AtlasBaseExceptio
try{
migrateDomainAttributes(vertex, "", "");

commitChanges();
LOG.info("Migrated qualified name for entity: {}", qualifiedName);
if (counter > 0) {
commitChanges();
}

} catch (AtlasBaseException e){
this.errorOccured = true;
Expand All @@ -100,18 +108,14 @@ private void migrateDomainAttributes(AtlasVertex vertex, String parentDomainQual
return;
}

counter++;

String currentQualifiedName = vertex.getProperty(QUALIFIED_NAME,String.class);
String updatedQualifiedName = createDomainQualifiedName(parentDomainQualifiedName);

LOG.info("Migrating qualified name for Domain: {}", currentQualifiedName);

Map<String, Object> updatedAttributes = new HashMap<>();

Map<String,String> customAttributes = GraphHelper.getCustomAttributes(vertex);
if(!this.forceRegen && customAttributes != null && customAttributes.get(MIGRATION_CUSTOM_ATTRIBUTE) != null && customAttributes.get(MIGRATION_CUSTOM_ATTRIBUTE).equals("true")){
LOG.info("Entity already migrated for entity: {}", currentQualifiedName);
LOG.info("Entity already migrated: {}", currentQualifiedName);

updatedQualifiedName = vertex.getProperty(QUALIFIED_NAME,String.class);

Expand All @@ -120,40 +124,42 @@ private void migrateDomainAttributes(AtlasVertex vertex, String parentDomainQual
}

} else {
counter++;
LOG.info("Migrating qualified name for Domain: {} to {}", currentQualifiedName, updatedQualifiedName);
superDomainQualifiedName = commitChangesInMemory(currentQualifiedName, updatedQualifiedName, parentDomainQualifiedName, superDomainQualifiedName, vertex, updatedAttributes);
}

if (!skipSuperDomain) {
Iterator<AtlasVertex> products = getAllChildrenVertices(vertex, DATA_PRODUCT_EDGE_LABEL);

while (products.hasNext()) {
try {
AtlasVertex productVertex = products.next();
if (Objects.nonNull(productVertex)) {
migrateDataProductAttributes(productVertex, updatedQualifiedName, superDomainQualifiedName);
}
if (skipSuperDomain)
break;
List<AtlasVertex> productsList = new ArrayList<>();
products.forEachRemaining(productsList::add);

for (AtlasVertex productVertex : productsList) {
if (Objects.nonNull(productVertex)) {
migrateDataProductAttributes(productVertex, updatedQualifiedName, superDomainQualifiedName);
} else {
LOG.warn("Found null product vertex");
}
catch (NullPointerException e){
LOG.error("Null Pointer Exception occured for subdomains with parent : " + currentQualifiedName, e);

if (skipSuperDomain) {
break;
}
}

// Get all children domains of current domain
Iterator<AtlasVertex> childDomains = getAllChildrenVertices(vertex, DOMAIN_PARENT_EDGE_LABEL);

while (childDomains.hasNext()) {
try {
AtlasVertex childVertex = childDomains.next();
if (Objects.nonNull(childVertex)) {
migrateDomainAttributes(childVertex, updatedQualifiedName, superDomainQualifiedName);
}
if (skipSuperDomain)
break;
List<AtlasVertex> childDomainsList = new ArrayList<>();
childDomains.forEachRemaining(childDomainsList::add);

for (AtlasVertex childVertex : childDomainsList) {
if (Objects.nonNull(childVertex)) {
migrateDomainAttributes(childVertex, updatedQualifiedName, superDomainQualifiedName);
} else {
LOG.warn("Found null sub-domain vertex");
}
catch (NullPointerException e){
LOG.error("Null Pointer Exception occured for subdomains with parent : " + currentQualifiedName, e);

if (skipSuperDomain) {
break;
}
}

Expand All @@ -164,24 +170,27 @@ private void migrateDomainAttributes(AtlasVertex vertex, String parentDomainQual
}
}

public void commitChanges() {
public void commitChanges() throws AtlasBaseException {
try {
updatePolicy(this.updatedPolicyResources);
} catch (AtlasBaseException e) {
this.errorOccured = true;
this.skipSuperDomain = true;
LOG.error("Failed to update set of policies: ", e);
LOG.error("Failed policies: {}", AtlasType.toJson(this.updatedPolicyResources));
throw e;
} finally {
this.updatedPolicyResources.clear();
}

try {
transactionInterceptHelper.intercept();
LOG.info("Committed a batch to the graph");
} catch (Exception e){
this.skipSuperDomain = true;
this.errorOccured = true;
LOG.error("Failed to commit set of assets: ", e);
throw e;
} finally {
this.counter = 0;
}
Expand Down Expand Up @@ -225,18 +234,17 @@ private void migrateDataProductAttributes(AtlasVertex vertex, String parentDomai
return;
}

counter++;

String currentQualifiedName = vertex.getProperty(QUALIFIED_NAME,String.class);
String updatedQualifiedName = createProductQualifiedName(parentDomainQualifiedName);

LOG.info("Migrating qualified name for Product: {}", currentQualifiedName);
Map<String,String> customAttributes = GraphHelper.getCustomAttributes(vertex);

if(!this.forceRegen && customAttributes != null && customAttributes.get(MIGRATION_CUSTOM_ATTRIBUTE) != null && customAttributes.get(MIGRATION_CUSTOM_ATTRIBUTE).equals("true")) {
LOG.info("Product already migrated: {}", currentQualifiedName);

} else {
counter++;
LOG.info("Migrating qualified name for Product: {} to {}", currentQualifiedName, updatedQualifiedName);
vertex.setProperty(QUALIFIED_NAME, updatedQualifiedName);

//Store domainPolicies and resources to be updated
Expand Down Expand Up @@ -445,7 +453,9 @@ public static Map<String, Object> mapOf(String key, Object value) {
@Override
public void run() {
try {
LOG.info("Starting migration: {}", DATA_MESH_QN);
startMigration();
LOG.info("Finished migration: {}", DATA_MESH_QN);
} catch (Exception e) {
LOG.error("Error running migration : {}",e.toString());
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package org.apache.atlas.repository.store.graph.v2;

public interface MigrationService {
Boolean startMigration() throws Exception;
public interface MigrationService extends Runnable {
void startMigration() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ public class PreProcessorUtils {
public static final String DAAP_VISIBILITY_GROUPS_ATTR = "daapVisibilityGroups";

//Migration Constants
public static final String MIGRATION = "MIGRATION:";
public static final String DATA_MESH_QN = MIGRATION + "DATA_MESH_QN";
public static final String IN_PROGRESS = "IN_PROGRESS";
public static final String SUCCESSFUL = "SUCCESSFUL";
public static final String MIGRATION_TYPE_PREFIX = "MIGRATION:";
public static final String DATA_MESH_QN = MIGRATION_TYPE_PREFIX + "DATA_MESH_QN";

public static final String FAILED = "FAILED";
public enum MigrationStatus {
IN_PROGRESS,
SUCCESSFUL,
FAILED;
}

//Query models constants
public static final String PREFIX_QUERY_QN = "default/collection/";
Expand Down
46 changes: 32 additions & 14 deletions webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.*;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils;
import org.apache.atlas.repository.store.users.KeycloakStore;
import org.apache.atlas.service.redis.RedisService;
import org.apache.atlas.transformer.PreProcessorPoliciesTransformer;
Expand All @@ -38,7 +39,7 @@
import static org.apache.atlas.auth.client.keycloak.AtlasKeycloakClient.getKeycloakClient;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.DATA_MESH_QN;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.MIGRATION;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.MIGRATION_TYPE_PREFIX;

@Path("migration")
@Singleton
Expand All @@ -57,7 +58,6 @@ public class MigrationREST {
private final PreProcessorPoliciesTransformer transformer;
private KeycloakStore keycloakStore;
private AtlasGraph graph;
DataMeshQNMigrationService dataMeshQNMigrationService;

private final EntityGraphRetriever entityRetriever;
private final RedisService redisService;
Expand All @@ -67,7 +67,8 @@ public class MigrationREST {
private final TransactionInterceptHelper transactionInterceptHelper;

@Inject
public MigrationREST(AtlasEntityStore entityStore, AtlasGraph graph, RedisService redisService,EntityDiscoveryService discovery, EntityGraphRetriever entityRetriever, AtlasTypeRegistry typeRegistry, TransactionInterceptHelper transactionInterceptHelper) {
public MigrationREST(AtlasEntityStore entityStore, AtlasGraph graph, RedisService redisService, EntityDiscoveryService discovery,
EntityGraphRetriever entityRetriever, AtlasTypeRegistry typeRegistry, TransactionInterceptHelper transactionInterceptHelper) {
this.entityStore = entityStore;
this.graph = graph;
this.transformer = new PreProcessorPoliciesTransformer();
Expand All @@ -82,21 +83,31 @@ public MigrationREST(AtlasEntityStore entityStore, AtlasGraph graph, RedisServic
@POST
@Path("submit")
@Timed
public Boolean submit (@QueryParam("migrationType") String migrationType,@QueryParam("forceMigration") boolean forceMigration) throws Exception {
public Boolean submit (@QueryParam("migrationType") String migrationType, @QueryParam("forceMigration") boolean forceMigration) throws Exception {
AtlasPerfTracer perf = null;
MigrationService migrationService;

try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "MigrationREST.submit()");
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "MigrationREST.submit(" + migrationType + ")");
}
if( (MIGRATION + migrationType).equals(DATA_MESH_QN)){
if(Objects.isNull(forceMigration) ){
forceMigration = false;
}
dataMeshQNMigrationService = new DataMeshQNMigrationService(entityStore, discovery, entityRetriever, typeRegistry, transactionInterceptHelper, forceMigration);
dataMeshQNMigrationService.run();

migrationType = MIGRATION_TYPE_PREFIX + migrationType;

isMigrationInProgress(migrationType);

switch (migrationType) {
case DATA_MESH_QN:
migrationService = new DataMeshQNMigrationService(entityStore, discovery, entityRetriever, typeRegistry, transactionInterceptHelper, redisService, forceMigration);
break;

default:
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Type of migration is not valid: " + migrationType);
}

Thread migrationThread = new Thread(migrationService);
migrationThread.start();

} catch (Exception e) {
LOG.error("Error while submitting migration", e);
return Boolean.FALSE;
Expand All @@ -106,19 +117,26 @@ public Boolean submit (@QueryParam("migrationType") String migrationType,@QueryP
return Boolean.TRUE;
}

private void isMigrationInProgress(String migrationType) throws AtlasBaseException {
String status = redisService.getValue(migrationType);
if (PreProcessorUtils.MigrationStatus.IN_PROGRESS.name().equals(status)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST,
String.format("Migration for %s is already in progress", migrationType));
}
}

@GET
@Path("status")
@Timed
public String getMigrationStatus(@QueryParam("migrationType") String migrationType) throws Exception{
public String getMigrationStatus(@QueryParam("migrationType") String migrationType) throws Exception {
AtlasPerfTracer perf = null;

try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "MigrationREST.getMigrationStatus()");
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "MigrationREST.getMigrationStatus(" + migrationType + ")");
}

String value = redisService.getValue(MIGRATION + migrationType);
String value = redisService.getValue(MIGRATION_TYPE_PREFIX + migrationType);

return Objects.nonNull(value) ? value : "No Migration Found with this key";
} catch (Exception e) {
Expand Down

0 comments on commit bcc19c5

Please sign in to comment.