diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 3eaba385cd..179d915df9 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -73,6 +73,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.atlas.AtlasErrorCode.RELATIONSHIP_CREATE_INVALID_PARAMS;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
@@ -384,21 +385,41 @@ public static List<AtlasVertex> getAllClassificationVerticesByClassificationName
         return IteratorUtils.toList(vertices.iterator());
     }
 
-    public static List<AtlasVertex> getAllAssetsWithClassificationAttached(AtlasGraph graph, String classificationName, int limit) {
-        AtlasGraphQuery query = graph.query();
-        AtlasGraphQuery hasPropagatedTraitNames = query.createChildQuery().has(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
-        AtlasGraphQuery hasTraitNames = query.createChildQuery().has(TRAIT_NAMES_PROPERTY_KEY, classificationName);
-        Iterable vertices = query.or(
-                Arrays.asList(
-                        hasPropagatedTraitNames,
-                        hasTraitNames
-                )
-        ).vertices(limit);
-        if (vertices == null) {
+    /**
+     * Returns the distinct vertices that are linked to a classification vertex of the given type.
+     *
+     * <p>Finds every vertex whose {@code TYPE_NAME_PROPERTY_KEY} equals {@code classificationName}
+     * and, for each one, collects the vertices on the other end of its incoming
+     * {@code CLASSIFICATION_LABEL} edges.
+     *
+     * @param graph              graph to query
+     * @param classificationName classification type name to look up
+     * @return de-duplicated list of attached vertices (iteration order unspecified); an empty
+     *         list when the type-name query returns {@code null}
+     */
+    public static List<AtlasVertex> getAllAssetsWithClassificationAttached(AtlasGraph graph, String classificationName) {
+        Iterable<AtlasVertex> classificationVertices = graph.query().has(TYPE_NAME_PROPERTY_KEY, classificationName).vertices();
+        if (classificationVertices == null) {
             return Collections.emptyList();
         }
+        List<AtlasVertex> classificationVerticesList = IteratorUtils.toList(classificationVertices.iterator());
+        LOG.info("classificationVerticesList size: {}", classificationVerticesList.size());
+        // A set de-duplicates vertices that are reachable from more than one classification vertex.
+        Set<AtlasVertex> entityVerticesSet = new HashSet<>();
+        for (AtlasVertex classificationVertex : classificationVerticesList) {
+            Iterable<AtlasVertex> attachedVertices = classificationVertex.query()
+                    .direction(AtlasEdgeDirection.IN)
+                    .label(CLASSIFICATION_LABEL).vertices();
+            if (attachedVertices != null) {
+                for (AtlasVertex attachedVertex : attachedVertices) {
+                    entityVerticesSet.add(attachedVertex);
+                }
+            }
+        }
+        // Report the total once, rather than once per classification vertex.
+        LOG.info("entityVerticesSet size: {}", entityVerticesSet.size());
 
-        return IteratorUtils.toList(vertices.iterator());
+        return entityVerticesSet.stream().collect(Collectors.toList());
     }
     public static AtlasEdge getClassificationEdge(AtlasVertex entityVertex, AtlasVertex classificationVertex) {
         AtlasEdge ret = null;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index d5cec6de1d..97ff277872 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -2980,22 +2980,31 @@ private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, At
     }
 
+    /**
+     * Cleans up a classification: processes every entity vertex it is attached to in chunks of
+     * {@code CHUNK_SIZE}, then deletes the classification vertices still registered under that name.
+     *
+     * <p>Each chunk locks the GUIDs of its entities, repairs the classification mappings of every
+     * entity and notifies listeners of the deleted classifications.
+     * {@code transactionInterceptHelper.intercept()} runs after every chunk, even one that throws,
+     * and once more after the classification vertices have been deleted.
+     *
+     * @param classificationName name of the classification to clean up
+     * @throws AtlasBaseException propagated from the per-entity processing or the vertex deletion
+     */
     public void cleanUpClassificationPropagation(String classificationName) throws AtlasBaseException {
-        int batchSize = 100;
-        int counter = 0;
-        while (true) {
+        List<AtlasVertex> vertices = GraphHelper.getAllAssetsWithClassificationAttached(graph, classificationName);
+        int totalVertexSize = vertices.size();
+        LOG.info("To clean up tag {} from {} entities", classificationName, totalVertexSize);
+        int offset = 0;
+        do {
+            int toIndex = Math.min(offset + CHUNK_SIZE, totalVertexSize);
+            List<AtlasVertex> entityVertices = vertices.subList(offset, toIndex);
+            List<String> impactedGuids = entityVertices.stream().map(GraphHelper::getGuid).collect(Collectors.toList());
             try {
-
-                List<AtlasVertex> vertices = GraphHelper.getAllAssetsWithClassificationAttached(graph, classificationName, batchSize);
-                if (CollectionUtils.isEmpty(vertices)) {
-                    LOG.info("No entities found for classification {}", classificationName);
-                    break;
-                }
-                for(AtlasVertex vertex : vertices) {
-                    String guid = GraphHelper.getGuid(vertex);
-                    GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
+                GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids);
+                for (AtlasVertex vertex : entityVertices) {
                     List<AtlasClassification> deletedClassifications = new ArrayList<>();
                     List<AtlasEdge> classificationEdges = GraphHelper.getClassificationEdges(vertex, null, classificationName);
-
                     for (AtlasEdge edge : classificationEdges) {
                         AtlasClassification classification = entityRetriever.toAtlasClassification(edge.getInVertex());
                         deletedClassifications.add(classification);
@@ -3005,25 +3014,23 @@ public void cleanUpClassificationPropagation(String classificationName) throws A
                     AtlasEntity entity = repairClassificationMappings(vertex);
 
                     entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications);
-
-                    counter++;
                 }
-            } catch (AtlasBaseException e) {
-                throw e;
+                // Advance by the number of entities actually handled so the progress log below
+                // never reports more than totalVertexSize on the final, partial chunk.
+                offset = toIndex;
             } finally {
                 transactionInterceptHelper.intercept();
-                LOG.info("Processed cleaning up {} entities", counter);
+                LOG.info("Cleaned up {} entities for classification {}", offset, classificationName);
             }
 
-            // Fetch all classificationVertex by classificationName and delete them if remaining
-            List<AtlasVertex> classificationVertices = GraphHelper.getAllClassificationVerticesByClassificationName(graph, classificationName);
-            for (AtlasVertex classificationVertex : classificationVertices) {
-                deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true);
-            }
-            transactionInterceptHelper.intercept();
-            LOG.info("Completed cleaning up classification {}", classificationName);
-
+        } while (offset < totalVertexSize);
+        // Fetch all classificationVertex by classificationName and delete them if remaining
+        List<AtlasVertex> classificationVertices = GraphHelper.getAllClassificationVerticesByClassificationName(graph, classificationName);
+        for (AtlasVertex classificationVertex : classificationVertices) {
+            deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true);
         }
+        transactionInterceptHelper.intercept();
+        LOG.info("Completed cleaning up classification {}", classificationName);
     }
 
     public AtlasEntity repairClassificationMappings(AtlasVertex entityVertex) throws AtlasBaseException {