Skip to content

Commit

Permalink
Merge pull request #3405 from atlanhq/DG-1726-cleanUpPagination
Browse files Browse the repository at this point in the history
DG-1726 Cleanup Pagination
  • Loading branch information
hr2904 authored Aug 21, 2024
2 parents ca4fda5 + ba40917 commit bb71763
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,35 +379,36 @@ public static AtlasVertex getClassificationVertex(AtlasVertex entityVertex, Stri
return ret;
}

public static List<AtlasVertex> getAllClassificationVerticesByClassificationName(AtlasGraph graph, String classificationName) {
Iterable vertices = graph.query().has(TYPE_NAME_PROPERTY_KEY, classificationName).vertices();
if (vertices == null) {
return Collections.emptyList();
}
return IteratorUtils.toList(vertices.iterator());
}

public static List<AtlasVertex> getAllAssetsWithClassificationAttached(AtlasGraph graph, String classificationName) {
// public static List<AtlasVertex> getAllClassificationVerticesByClassificationName(AtlasGraph graph, String classificationName) {
// Iterable vertices = graph.query().has(TYPE_NAME_PROPERTY_KEY, classificationName).vertices();
// if (vertices == null) {
// return Collections.emptyList();
// }
// return IteratorUtils.toList(vertices.iterator());
// }

public static List<AtlasVertex> getClassificationVertexes(AtlasGraph graph, String classificationName) {
Iterable classificationVertices = graph.query().has(TYPE_NAME_PROPERTY_KEY, classificationName).vertices();
if (classificationVertices == null) {
return Collections.emptyList();
}
List<AtlasVertex> classificationVerticesList = IteratorUtils.toList(classificationVertices.iterator());
LOG.info("classificationVerticesList size: {}", classificationVerticesList.size());
LOG.info("classificationVerticesList size: {}", classificationVerticesList.size());
return classificationVerticesList;
}

public static List<AtlasVertex> getAllAssetsWithClassificationVertex(AtlasGraph graph, AtlasVertex classificationVertice) {
HashSet<AtlasVertex> entityVerticesSet = new HashSet<>();
for (AtlasVertex classificationVertex : classificationVerticesList) {
Iterable attachedVertices = classificationVertex.query()
.direction(AtlasEdgeDirection.IN)
.label(CLASSIFICATION_LABEL).vertices();
if (attachedVertices != null) {
Iterator<AtlasVertex> attachedVerticesIterator = attachedVertices.iterator();
while (attachedVerticesIterator.hasNext()) {
entityVerticesSet.add(attachedVerticesIterator.next());
}
LOG.info("entityVerticesSet size: {}", entityVerticesSet.size());
Iterable attachedVertices = classificationVertice.query()
.direction(AtlasEdgeDirection.IN)
.label(CLASSIFICATION_LABEL).vertices();
if (attachedVertices != null) {
Iterator<AtlasVertex> attachedVerticesIterator = attachedVertices.iterator();
while (attachedVerticesIterator.hasNext()) {
entityVerticesSet.add(attachedVerticesIterator.next());
}
LOG.info("entityVerticesSet size: {}", entityVerticesSet.size());
}

return entityVerticesSet.stream().collect(Collectors.toList());
}
public static AtlasEdge getClassificationEdge(AtlasVertex entityVertex, AtlasVertex classificationVertex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -189,6 +190,7 @@ public class EntityGraphMapper {
private static final boolean RESTRICT_PROPAGATION_THROUGH_LINEAGE_DEFAULT = false;

private static final boolean RESTRICT_PROPAGATION_THROUGH_HIERARCHY_DEFAULT = false;
public static final int CLEANUP_BATCH_SIZE = 200000;
private boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
private boolean DIFFERENTIAL_AUDITS = STORE_DIFFERENTIAL_AUDITS.getBoolean();

Expand Down Expand Up @@ -3097,40 +3099,60 @@ private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, At
}
}


public void cleanUpClassificationPropagation(String classificationName) throws AtlasBaseException {
List<AtlasVertex> vertices = GraphHelper.getAllAssetsWithClassificationAttached(graph, classificationName);
int totalVertexSize = vertices.size();
LOG.info("To clean up tag {} from {} entities", classificationName, totalVertexSize);
int toIndex;
int offset = 0;
do {
toIndex = Math.min((offset + CHUNK_SIZE), totalVertexSize);
List<AtlasVertex> entityVertices = vertices.subList(offset, toIndex);
List<String> impactedGuids = entityVertices.stream().map(GraphHelper::getGuid).collect(Collectors.toList());
try {
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids);
for (AtlasVertex vertex : entityVertices) {
List<AtlasClassification> deletedClassifications = new ArrayList<>();
List<AtlasEdge> classificationEdges = GraphHelper.getClassificationEdges(vertex, null, classificationName);
for (AtlasEdge edge : classificationEdges) {
AtlasClassification classification = entityRetriever.toAtlasClassification(edge.getInVertex());
deletedClassifications.add(classification);
deleteDelegate.getHandler().deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, null, vertex);
}
List<AtlasVertex> vertices = new ArrayList<>();
List<AtlasVertex> classificationVertices = GraphHelper.getClassificationVertexes(graph, classificationName);
List<AtlasVertex> assetVertices = new ArrayList<>();
for(int i = 0 ; i < classificationVertices.size(); i++) {
assetVertices = GraphHelper.getAllAssetsWithClassificationVertex(graph, classificationVertices.get(i));
int prevTotalVertexSize = vertices.size();
int assetsSize = assetVertices.size();
LOG.info("Queue size before adding {} asset vertexes to it : {}", assetsSize, prevTotalVertexSize);
if(prevTotalVertexSize + assetsSize > CLEANUP_BATCH_SIZE) {
i--;
}

if(prevTotalVertexSize + assetsSize <= CLEANUP_BATCH_SIZE) {
vertices.addAll(assetVertices);
assetVertices = new ArrayList<>();
}
if(prevTotalVertexSize + assetsSize >= CLEANUP_BATCH_SIZE ||
(prevTotalVertexSize + assetsSize < CLEANUP_BATCH_SIZE && i == classificationVertices.size() - 1)) {
int currentTotalVertexSize = vertices.size();
LOG.info("To clean up tag {} from {} entities", classificationName, currentTotalVertexSize);
int toIndex;
int offset = 0;
do {
toIndex = Math.min((offset + CHUNK_SIZE), currentTotalVertexSize);
List<AtlasVertex> entityVertices = vertices.subList(offset, toIndex);
List<String> impactedGuids = entityVertices.stream().map(GraphHelper::getGuid).collect(Collectors.toList());
try {
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids);
for (AtlasVertex vertex : entityVertices) {
List<AtlasClassification> deletedClassifications = new ArrayList<>();
List<AtlasEdge> classificationEdges = GraphHelper.getClassificationEdges(vertex, null, classificationName);
for (AtlasEdge edge : classificationEdges) {
AtlasClassification classification = entityRetriever.toAtlasClassification(edge.getInVertex());
deletedClassifications.add(classification);
deleteDelegate.getHandler().deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, null, vertex);
}

AtlasEntity entity = repairClassificationMappings(vertex);
AtlasEntity entity = repairClassificationMappings(vertex);

entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications);
}
offset += CHUNK_SIZE;
} finally {
transactionInterceptHelper.intercept();
LOG.info("Cleaned up {} entities for classification {}", offset, classificationName);
}
entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications);
}
offset += CHUNK_SIZE;
} finally {
transactionInterceptHelper.intercept();
LOG.info("Cleaned up {} entities for classification {}", offset, classificationName);
}

} while (offset < totalVertexSize);
} while (offset < currentTotalVertexSize);
vertices = new ArrayList<>();
}
}
// Fetch all classificationVertex by classificationName and delete them if remaining
List<AtlasVertex> classificationVertices = GraphHelper.getAllClassificationVerticesByClassificationName(graph, classificationName);
for (AtlasVertex classificationVertex : classificationVertices) {
deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true);
}
Expand Down

0 comments on commit bb71763

Please sign in to comment.