Skip to content

Commit

Permalink
Merge pull request #3165 from atlanhq/tagcleanup
Browse files: browse the repository at this point in the history
fix: improve performance of tag cleanup task by fetching all vertices at once
  • Loading branch information
sumandas0 authored May 30, 2024
2 parents bcc19c5 + eac6511 commit 469ce90
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasErrorCode.RELATIONSHIP_CREATE_INVALID_PARAMS;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
Expand Down Expand Up @@ -384,21 +385,28 @@ public static List<AtlasVertex> getAllClassificationVerticesByClassificationName
return IteratorUtils.toList(vertices.iterator());
}

// NOTE: this span is a rendered diff hunk with the +/- markers lost, so TWO versions of
// getAllAssetsWithClassificationAttached are interleaved here: a three-argument form
// (with `int limit`) and a two-argument form. It is not compilable as shown.
// Judging by the commit title ("fetching all vertices at once") and by the two-argument
// call visible in cleanUpClassificationPropagation, the three-argument form appears to be
// the removed code and the two-argument form the added code.

// --- Three-argument version (apparently removed) ---
// Builds an OR of two child queries, one matching classificationName on
// PROPAGATED_TRAIT_NAMES_PROPERTY_KEY and one on TRAIT_NAMES_PROPERTY_KEY,
// and passes `limit` to vertices().
public static List<AtlasVertex> getAllAssetsWithClassificationAttached(AtlasGraph graph, String classificationName, int limit) {
AtlasGraphQuery query = graph.query();
AtlasGraphQuery hasPropagatedTraitNames = query.createChildQuery().has(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
AtlasGraphQuery hasTraitNames = query.createChildQuery().has(TRAIT_NAMES_PROPERTY_KEY, classificationName);
Iterable vertices = query.or(
Arrays.asList(
hasPropagatedTraitNames,
hasTraitNames
)
).vertices(limit);
if (vertices == null) {
// --- Two-argument version (apparently added) ---
// Step 1: query vertices whose TYPE_NAME_PROPERTY_KEY equals classificationName
// (presumably the classification vertices themselves, going by the variable name).
public static List<AtlasVertex> getAllAssetsWithClassificationAttached(AtlasGraph graph, String classificationName) {
Iterable classificationVertices = graph.query().has(TYPE_NAME_PROPERTY_KEY, classificationName).vertices();
if (classificationVertices == null) {
// Lines shared by both versions: a null query result yields an empty list.
return Collections.emptyList();
}
// Two-argument version continues: materialise the query result so its size can be logged.
List<AtlasVertex> classificationVerticesList = IteratorUtils.toList(classificationVertices.iterator());
LOG.info("classificationVerticesList size: {}", classificationVerticesList.size());
// A set is used as the accumulator, so a vertex reached from several classification
// vertices is kept once (per the vertex's equals/hashCode).
HashSet<AtlasVertex> entityVerticesSet = new HashSet<>();
for (AtlasVertex classificationVertex : classificationVerticesList) {
// Step 2: follow edges labelled CLASSIFICATION_LABEL in the IN direction from the
// classification vertex; the vertices on the other end are collected as the result.
Iterable attachedVertices = classificationVertex.query()
.direction(AtlasEdgeDirection.IN)
.label(CLASSIFICATION_LABEL).vertices();
if (attachedVertices != null) {
Iterator<AtlasVertex> attachedVerticesIterator = attachedVertices.iterator();
while (attachedVerticesIterator.hasNext()) {
entityVerticesSet.add(attachedVerticesIterator.next());
}
// Logs the running (cumulative) set size once per classification vertex that had
// a non-null result, i.e. one INFO line per classification vertex.
LOG.info("entityVerticesSet size: {}", entityVerticesSet.size());
}
}

// Return of the three-argument version: the limited query result as a list.
return IteratorUtils.toList(vertices.iterator());
// Return of the two-argument version: the de-duplicated set copied into a list.
return entityVerticesSet.stream().collect(Collectors.toList());
}
public static AtlasEdge getClassificationEdge(AtlasVertex entityVertex, AtlasVertex classificationVertex) {
AtlasEdge ret = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2980,22 +2980,20 @@ private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, At
}

// NOTE: this span is a rendered diff hunk with the +/- markers lost, so the old and new
// bodies of cleanUpClassificationPropagation are interleaved and part of the body is
// collapsed (see the "Expand All" line). It is not compilable as shown.
// Visible purpose: for every vertex returned by
// GraphHelper.getAllAssetsWithClassificationAttached, collect the classification edges for
// classificationName, call repairClassificationMappings, notify entityChangeNotifier, and
// finally delete any classification vertices that remain for that name.
public void cleanUpClassificationPropagation(String classificationName) throws AtlasBaseException {
// Older variant (uses the three-argument fetch further down): loop forever, fetching
// batches of batchSize, and count processed entities in `counter`.
int batchSize = 100;
int counter = 0;
while (true) {
// Newer variant (two-argument fetch): load all matching vertices once, then walk the
// list in windows of CHUNK_SIZE (a constant not defined in the visible lines).
List<AtlasVertex> vertices = GraphHelper.getAllAssetsWithClassificationAttached(graph, classificationName);
int totalVertexSize = vertices.size();
LOG.info("To clean up tag {} from {} entities", classificationName, totalVertexSize);
int toIndex;
int offset = 0;
do {
// Clamp the window end so the last chunk may be shorter than CHUNK_SIZE.
toIndex = Math.min((offset + CHUNK_SIZE), totalVertexSize);
List<AtlasVertex> entityVertices = vertices.subList(offset, toIndex);
List<String> impactedGuids = entityVertices.stream().map(GraphHelper::getGuid).collect(Collectors.toList());
// NOTE(review): because this is a do/while, the body runs once even when
// totalVertexSize is 0 (empty subList, empty GUID list) - confirm that is intended.
try {

// Older variant: re-query a batch on every pass and stop when nothing is returned.
List<AtlasVertex> vertices = GraphHelper.getAllAssetsWithClassificationAttached(graph, classificationName, batchSize);
if (CollectionUtils.isEmpty(vertices)) {
LOG.info("No entities found for classification {}", classificationName);
break;
}
// Older variant: lock each entity GUID individually inside the loop.
for(AtlasVertex vertex : vertices) {
String guid = GraphHelper.getGuid(vertex);
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
// Newer variant: lock all GUIDs of the current chunk with one call, then iterate the chunk.
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids);
for (AtlasVertex vertex : entityVertices) {
// Lines from here to the collapsed region carry no variant-specific names and
// look like context common to both variants.
List<AtlasClassification> deletedClassifications = new ArrayList<>();
List<AtlasEdge> classificationEdges = GraphHelper.getClassificationEdges(vertex, null, classificationName);

// Convert the in-vertex of each classification edge to an AtlasClassification so it can
// be reported to the change notifier afterwards.
for (AtlasEdge edge : classificationEdges) {
AtlasClassification classification = entityRetriever.toAtlasClassification(edge.getInVertex());
deletedClassifications.add(classification);
// The next line is a page artifact marking a collapsed region: some lines of the method
// body between here and repairClassificationMappings(...) are not shown on this page.
Expand All @@ -3005,25 +3003,21 @@ public void cleanUpClassificationPropagation(String classificationName) throws A
AtlasEntity entity = repairClassificationMappings(vertex);

entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications);

// Older variant: per-entity progress counter.
counter++;
}
// Older variant: catch-and-rethrow of AtlasBaseException. It cannot belong to the newer
// variant, since the statement after `throw e;` would then be unreachable.
} catch (AtlasBaseException e) {
throw e;
// Newer variant: advance the window by a full CHUNK_SIZE after the chunk is processed.
offset += CHUNK_SIZE;
} finally {
// Runs whether or not the chunk/batch succeeded.
transactionInterceptHelper.intercept();
// Older variant's progress log.
LOG.info("Processed cleaning up {} entities", counter);
// Newer variant's progress log.
// NOTE(review): this reports `offset`, which advances by CHUNK_SIZE and can exceed
// totalVertexSize after the final partial chunk; on an exception it still reports the
// pre-increment offset.
LOG.info("Cleaned up {} entities for classification {}", offset, classificationName);
}

// The same cleanup tail appears twice below; the first copy appears to be the older
// placement and the second (after `} while (...)`) the newer one.
// Fetch all classificationVertex by classificationName and delete them if remaining
List<AtlasVertex> classificationVertices = GraphHelper.getAllClassificationVerticesByClassificationName(graph, classificationName);
for (AtlasVertex classificationVertex : classificationVertices) {
deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true);
}
transactionInterceptHelper.intercept();
LOG.info("Completed cleaning up classification {}", classificationName);

// Newer variant: repeat until the window start passes the end of the fetched list.
} while (offset < totalVertexSize);
// Fetch all classificationVertex by classificationName and delete them if remaining
List<AtlasVertex> classificationVertices = GraphHelper.getAllClassificationVerticesByClassificationName(graph, classificationName);
for (AtlasVertex classificationVertex : classificationVertices) {
deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true);
}
transactionInterceptHelper.intercept();
LOG.info("Completed cleaning up classification {}", classificationName);
}

public AtlasEntity repairClassificationMappings(AtlasVertex entityVertex) throws AtlasBaseException {
Expand Down

0 comments on commit 469ce90

Please sign in to comment.