Skip to content

Commit

Permalink
Merge pull request #3291 from atlanhq/plt-1764-fix-delete-assets
Browse files Browse the repository at this point in the history
beta, Added metric Recorder for delete operations
  • Loading branch information
hitk6 authored Jul 1, 2024
2 parents f60c274 + eefb187 commit f0f0152
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.atlas.exception.EntityNotFoundException;
import org.apache.atlas.util.AttributeValueMap;
import org.apache.atlas.util.IndexedInstance;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -869,6 +870,7 @@ public static List<String> getTraitNames(AtlasVertex entityVertex, Boolean propa
}

public static List<AtlasVertex> getPropagatableClassifications(AtlasEdge edge) {
MetricRecorder metric = RequestContext.get().startMetricRecord("getPropagatableClassifications");
List<AtlasVertex> ret = new ArrayList<>();

RequestContext requestContext = RequestContext.get();
Expand All @@ -887,7 +889,7 @@ public static List<AtlasVertex> getPropagatableClassifications(AtlasEdge edge) {
ret.addAll(getPropagationEnabledClassificationVertices(inVertex));
}
}

RequestContext.get().endMetricRecord(metric);
return ret;
}
//Returns the vertex from which the tag is being propagated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -118,6 +119,7 @@ public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean
* @throws AtlasException
*/
public void deleteEntities(Collection<AtlasVertex> instanceVertices) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteEntities");
final RequestContext requestContext = RequestContext.get();
final Set<AtlasVertex> deletionCandidateVertices = new HashSet<>();

Expand Down Expand Up @@ -160,6 +162,7 @@ public void deleteEntities(Collection<AtlasVertex> instanceVertices) throws Atla
}
}
}
RequestContext.get().endMetricRecord(metric);
}

/**
Expand Down Expand Up @@ -754,6 +757,7 @@ protected void deleteEdge(AtlasEdge edge, boolean updateInverseAttribute, boolea
}

protected void deleteTypeVertex(AtlasVertex instanceVertex, TypeCategory typeCategory, boolean force) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteTypeVertex");
switch (typeCategory) {
case STRUCT:
deleteTypeVertex(instanceVertex, force);
Expand All @@ -771,6 +775,7 @@ protected void deleteTypeVertex(AtlasVertex instanceVertex, TypeCategory typeCat
default:
throw new IllegalStateException("Type category " + typeCategory + " not handled");
}
RequestContext.get().endMetricRecord(metric);
}

/**
Expand All @@ -779,6 +784,7 @@ protected void deleteTypeVertex(AtlasVertex instanceVertex, TypeCategory typeCat
* @throws AtlasException
*/
protected void deleteTypeVertex(AtlasVertex instanceVertex, boolean force) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteTypeVertex");
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting {}, force={}", string(instanceVertex), force);
}
Expand Down Expand Up @@ -852,6 +858,7 @@ protected void deleteTypeVertex(AtlasVertex instanceVertex, boolean force) throw
}

deleteVertex(instanceVertex, force);
RequestContext.get().endMetricRecord(metric);
}

protected AtlasAttribute getAttributeForEdge(AtlasEdge edge) throws AtlasBaseException {
Expand Down Expand Up @@ -888,6 +895,7 @@ protected void deleteEdgeBetweenVertices(AtlasVertex outVertex, AtlasVertex inVe
if (skipVertexForDelete(outVertex)) {
return;
}
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteEdgeBetweenVertices");

AtlasStructType parentType = (AtlasStructType) typeRegistry.getType(GraphHelper.getTypeName(outVertex));
String propertyName = getQualifiedAttributePropertyKey(parentType, attribute.getName());
Expand Down Expand Up @@ -988,9 +996,11 @@ protected void deleteEdgeBetweenVertices(AtlasVertex outVertex, AtlasVertex inVe
requestContext.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(outVertex));
}
}
RequestContext.get().endMetricRecord(metric);
}

protected void deleteVertex(AtlasVertex instanceVertex, boolean force) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteVertex");
if (LOG.isDebugEnabled()) {
LOG.debug("Setting the external references to {} to null(removing edges)", string(instanceVertex));
}
Expand Down Expand Up @@ -1020,14 +1030,14 @@ protected void deleteVertex(AtlasVertex instanceVertex, boolean force) throws At
if (!isDeletedEntity(outVertex)) {
AtlasVertex inVertex = edge.getInVertex();
AtlasAttribute attribute = getAttributeForEdge(edge);

deleteEdgeBetweenVertices(outVertex, inVertex, attribute);
}
}
}
}

_deleteVertex(instanceVertex, force);
RequestContext.get().endMetricRecord(metric);
}

private boolean isDeletedEntity(AtlasVertex entityVertex) {
Expand Down Expand Up @@ -1110,6 +1120,7 @@ private void deleteAllClassifications(AtlasVertex instanceVertex) throws AtlasBa
if (!ACTIVE.equals(getState(instanceVertex)))
return;

MetricRecorder metric = RequestContext.get().startMetricRecord("deleteAllClassifications");
List<AtlasEdge> classificationEdges = getAllClassificationEdges(instanceVertex);

for (AtlasEdge edge : classificationEdges) {
Expand All @@ -1127,6 +1138,7 @@ private void deleteAllClassifications(AtlasVertex instanceVertex) throws AtlasBa

deleteEdgeReference(edge, CLASSIFICATION, false, false, instanceVertex);
}
RequestContext.get().endMetricRecord(metric);
}

private boolean skipVertexForDelete(AtlasVertex vertex) {
Expand Down Expand Up @@ -1336,12 +1348,13 @@ public void createAndQueueTask(String taskType, AtlasEdge relationshipEdge, Atla
}

public void createAndQueueClassificationRefreshPropagationTask(AtlasEdge edge) throws AtlasBaseException{

if (taskManagement==null) {
LOG.warn("Task management is null, can't schedule task now");
return;
}

MetricRecorder metric = RequestContext.get().startMetricRecord("createAndQueueClassificationRefreshPropagationTask");

String currentUser = RequestContext.getCurrentUser();
boolean isRelationshipEdge = isRelationshipEdge(edge);
boolean isTermEntityEdge = GraphHelper.isTermEntityEdge(edge);
Expand Down Expand Up @@ -1377,7 +1390,7 @@ public void createAndQueueClassificationRefreshPropagationTask(AtlasEdge edge) t

RequestContext.get().queueTask(task);
}

RequestContext.get().endMetricRecord(metric);
}

private boolean skipClassificationTaskCreation(String classificationId) throws AtlasBaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;

import javax.inject.Inject;
Expand Down Expand Up @@ -68,6 +69,7 @@ protected void _deleteVertex(AtlasVertex instanceVertex, boolean force) {

@Override
protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteEdge");
try {
if (LOG.isDebugEnabled()) {
LOG.debug("==> SoftDeleteHandlerV1.deleteEdge({}, {})", GraphHelper.string(edge), force);
Expand Down Expand Up @@ -103,7 +105,8 @@ protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasBaseExcepti
} catch (Exception e) {
LOG.error("Error while deleting edge {}", GraphHelper.string(edge), e);
throw new AtlasBaseException(e);
} finally {
RequestContext.get().endMetricRecord(metric);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ public EntityMutationResponse deleteById(final String guid) throws AtlasBaseExce
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}

AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("deleteById");

Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(graph, guid);

Expand All @@ -611,6 +613,7 @@ public EntityMutationResponse deleteById(final String guid) throws AtlasBaseExce
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret, false);
atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap());
RequestContext.get().endMetricRecord(metricRecorder);
return ret;
}

Expand Down Expand Up @@ -773,10 +776,12 @@ public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityTyp
@Override
@GraphTransaction
public EntityMutationResponse deleteByUniqueAttributes(List<AtlasObjectId> objectIds) throws AtlasBaseException {

if (CollectionUtils.isEmpty(objectIds)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS);
}

MetricRecorder metric = RequestContext.get().startMetricRecord("deleteByUniqueAttributes");
EntityMutationResponse ret = new EntityMutationResponse();
Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
try {
Expand Down Expand Up @@ -820,10 +825,11 @@ public EntityMutationResponse deleteByUniqueAttributes(List<AtlasObjectId> objec
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret, false);
atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap());

} catch (Exception e) {
LOG.error("Failed to delete objects:{}", objectIds.stream().map(AtlasObjectId::getUniqueAttributes).collect(Collectors.toList()), e);
throw new AtlasBaseException(e);
} finally {
RequestContext.get().endMetricRecord(metric);
}
return ret;
}
Expand Down Expand Up @@ -1992,7 +1998,7 @@ private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCa
Collection<AtlasVertex> categories = new ArrayList<>();
Collection<AtlasVertex> others = new ArrayList<>();

MetricRecorder metric = RequestContext.get().startMetricRecord("filterCategoryVertices");
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteVertices_filterCategoryVertices");
for (AtlasVertex vertex : deletionCandidates) {
String typeName = getTypeName(vertex);

Expand All @@ -2008,7 +2014,7 @@ private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCa
}
}
RequestContext.get().endMetricRecord(metric);

MetricRecorder metric2 = RequestContext.get().startMetricRecord("deleteVertices");
if (CollectionUtils.isNotEmpty(categories)) {
entityGraphMapper.removeAttrForCategoryDelete(categories);
deleteDelegate.getHandler(DeleteType.HARD).deleteEntities(categories);
Expand Down Expand Up @@ -2037,11 +2043,11 @@ private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCa
for (AtlasEntityHeader entity : req.getUpdatedEntities()) {
response.addEntity(UPDATE, entity);
}
RequestContext.get().endMetricRecord(metric2);
} catch (Exception e) {
LOG.error("Delete vertices request failed", e);
throw new AtlasBaseException(e);
}

return response;
}

Expand Down

0 comments on commit f0f0152

Please sign in to comment.