From a83801a777f714e5abc3e6a5a5ac5fc3a8f5ba8a Mon Sep 17 00:00:00 2001 From: hr2904 Date: Wed, 21 Aug 2024 17:50:01 +0530 Subject: [PATCH 1/2] Added init pagination to cleanup task logic. --- .../atlas/repository/graph/GraphHelper.java | 42 +++++----- .../store/graph/v2/EntityGraphMapper.java | 80 ++++++++++++------- 2 files changed, 73 insertions(+), 49 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index d5926e8a00..7e5e95df23 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -377,35 +377,37 @@ public static AtlasVertex getClassificationVertex(AtlasVertex entityVertex, Stri return ret; } - public static List getAllClassificationVerticesByClassificationName(AtlasGraph graph, String classificationName) { - Iterable vertices = graph.query().has(TYPE_NAME_PROPERTY_KEY, classificationName).vertices(); - if (vertices == null) { - return Collections.emptyList(); - } - return IteratorUtils.toList(vertices.iterator()); - } - - public static List getAllAssetsWithClassificationAttached(AtlasGraph graph, String classificationName) { +// public static List getAllClassificationVerticesByClassificationName(AtlasGraph graph, String classificationName) { +// Iterable vertices = graph.query().has(TYPE_NAME_PROPERTY_KEY, classificationName).vertices(); +// if (vertices == null) { +// return Collections.emptyList(); +// } +// return IteratorUtils.toList(vertices.iterator()); +// } + + public static List getClassificationVertexes(AtlasGraph graph, String classificationName) { Iterable classificationVertices = graph.query().has(TYPE_NAME_PROPERTY_KEY, classificationName).vertices(); if (classificationVertices == null) { return Collections.emptyList(); } List classificationVerticesList = IteratorUtils.toList(classificationVertices.iterator()); - LOG.info("classificationVerticesList size: {}", classificationVerticesList.size()); + LOG.info("classificationVerticesList size: {}", classificationVerticesList.size()); + } + + public static List getAllAssetsWithClassificationVertex(AtlasGraph graph, AtlasVertex classificationVertice) { HashSet entityVerticesSet = new HashSet<>(); - for (AtlasVertex classificationVertex : classificationVerticesList) { - Iterable attachedVertices = classificationVertex.query() - .direction(AtlasEdgeDirection.IN) - .label(CLASSIFICATION_LABEL).vertices(); - if (attachedVertices != null) { - Iterator attachedVerticesIterator = attachedVertices.iterator(); - while (attachedVerticesIterator.hasNext()) { - entityVerticesSet.add(attachedVerticesIterator.next()); - } - LOG.info("entityVerticesSet size: {}", entityVerticesSet.size()); + Iterable attachedVertices = classificationVertice.query() + .direction(AtlasEdgeDirection.IN) + .label(CLASSIFICATION_LABEL).vertices(); + if (attachedVertices != null) { + Iterator attachedVerticesIterator = attachedVertices.iterator(); + while (attachedVerticesIterator.hasNext()) { + entityVerticesSet.add(attachedVerticesIterator.next()); } + LOG.info("entityVerticesSet size: {}", entityVerticesSet.size()); } + return entityVerticesSet.stream().collect(Collectors.toList()); } public static AtlasEdge getClassificationEdge(AtlasVertex entityVertex, AtlasVertex classificationVertex) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index f52ef2f56d..bbaab1c1ec 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -78,6 +78,7 @@ import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.IteratorUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -189,6 +190,7 @@ public class EntityGraphMapper { private static final boolean RESTRICT_PROPAGATION_THROUGH_LINEAGE_DEFAULT = false; private static final boolean RESTRICT_PROPAGATION_THROUGH_HIERARCHY_DEFAULT = false; + public static final int CLEANUP_BATCH_SIZE = 200000; private boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean(); private boolean DIFFERENTIAL_AUDITS = STORE_DIFFERENTIAL_AUDITS.getBoolean(); @@ -3031,40 +3033,60 @@ private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, At } } + public void cleanUpClassificationPropagation(String classificationName) throws AtlasBaseException { - List vertices = GraphHelper.getAllAssetsWithClassificationAttached(graph, classificationName); - int totalVertexSize = vertices.size(); - LOG.info("To clean up tag {} from {} entities", classificationName, totalVertexSize); - int toIndex; - int offset = 0; - do { - toIndex = Math.min((offset + CHUNK_SIZE), totalVertexSize); - List entityVertices = vertices.subList(offset, toIndex); - List impactedGuids = entityVertices.stream().map(GraphHelper::getGuid).collect(Collectors.toList()); - try { - GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids); - for (AtlasVertex vertex : entityVertices) { - List deletedClassifications = new ArrayList<>(); - List classificationEdges = GraphHelper.getClassificationEdges(vertex, null, classificationName); - for (AtlasEdge edge : classificationEdges) { - AtlasClassification classification = entityRetriever.toAtlasClassification(edge.getInVertex()); - deletedClassifications.add(classification); - deleteDelegate.getHandler().deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, null, vertex); - } + List vertices = new ArrayList<>(); + List classificationVertices = GraphHelper.getClassificationVertexes(graph, classificationName); + List assetVertices = new ArrayList<>(); + for(int i = 0 ; i < classificationVertices.size(); i++) { + assetVertices = GraphHelper.getAllAssetsWithClassificationVertex(graph, classificationVertices.get(i)); + int prevTotalVertexSize = vertices.size(); + int assetsSize = assetVertices.size(); + LOG.info("Queue size before adding {} asset vertexes to it : {}", assetsSize, prevTotalVertexSize); + if(prevTotalVertexSize + assetsSize > CLEANUP_BATCH_SIZE) { + i--; + } + + if(prevTotalVertexSize + assetsSize <= CLEANUP_BATCH_SIZE) { + vertices.addAll(assetVertices); + assetVertices = new ArrayList<>(); + } + if(prevTotalVertexSize + assetsSize >= CLEANUP_BATCH_SIZE || + (prevTotalVertexSize + assetsSize < CLEANUP_BATCH_SIZE && i == classificationVertices.size() - 1)) { + int currentTotalVertexSize = vertices.size(); + LOG.info("To clean up tag {} from {} entities", classificationName, currentTotalVertexSize); + int toIndex; + int offset = 0; + do { + toIndex = Math.min((offset + CHUNK_SIZE), currentTotalVertexSize); + List entityVertices = vertices.subList(offset, toIndex); + List impactedGuids = entityVertices.stream().map(GraphHelper::getGuid).collect(Collectors.toList()); + try { + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids); + for (AtlasVertex vertex : entityVertices) { + List deletedClassifications = new ArrayList<>(); + List classificationEdges = GraphHelper.getClassificationEdges(vertex, null, classificationName); + for (AtlasEdge edge : classificationEdges) { + AtlasClassification classification = entityRetriever.toAtlasClassification(edge.getInVertex()); + deletedClassifications.add(classification); + deleteDelegate.getHandler().deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, null, vertex); + } - AtlasEntity entity = repairClassificationMappings(vertex); + AtlasEntity entity = repairClassificationMappings(vertex); - entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications); - } - offset += CHUNK_SIZE; - } finally { - transactionInterceptHelper.intercept(); - LOG.info("Cleaned up {} entities for classification {}", offset, classificationName); - } + entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications); + } + offset += CHUNK_SIZE; + } finally { + transactionInterceptHelper.intercept(); + LOG.info("Cleaned up {} entities for classification {}", offset, classificationName); + } - } while (offset < totalVertexSize); + } while (offset < currentTotalVertexSize); + vertices = new ArrayList<>(); + } + } // Fetch all classificationVertex by classificationName and delete them if remaining - List classificationVertices = GraphHelper.getAllClassificationVerticesByClassificationName(graph, classificationName); for (AtlasVertex classificationVertex : classificationVertices) { deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true); } From ba409170235688c7441b49d5ea4ad847b96f7730 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Wed, 21 Aug 2024 17:56:12 +0530 Subject: [PATCH 2/2] small syntax fix --- .../java/org/apache/atlas/repository/graph/GraphHelper.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 7e5e95df23..1e0bf25632 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -392,6 +392,7 @@ public static List getClassificationVertexes(AtlasGraph graph, Stri } List classificationVerticesList = IteratorUtils.toList(classificationVertices.iterator()); LOG.info("classificationVerticesList size: {}", classificationVerticesList.size()); + return classificationVerticesList; } public static List getAllAssetsWithClassificationVertex(AtlasGraph graph, AtlasVertex classificationVertice) { @@ -406,8 +407,6 @@ public static List getAllAssetsWithClassificationVertex(AtlasGraph } LOG.info("entityVerticesSet size: {}", entityVerticesSet.size()); } - - return entityVerticesSet.stream().collect(Collectors.toList()); } public static AtlasEdge getClassificationEdge(AtlasVertex entityVertex, AtlasVertex classificationVertex) {