Skip to content

Commit

Permalink
Added metric Recorder for delete operations
Browse files Browse the repository at this point in the history
  • Loading branch information
hitk6 committed Jul 1, 2024
1 parent 3da0798 commit 08a8887
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.atlas.exception.EntityNotFoundException;
import org.apache.atlas.util.AttributeValueMap;
import org.apache.atlas.util.IndexedInstance;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -857,6 +858,7 @@ public static List<String> getTraitNames(AtlasVertex entityVertex, Boolean propa
}

public static List<AtlasVertex> getPropagatableClassifications(AtlasEdge edge) {
MetricRecorder metric = RequestContext.get().startMetricRecord("getPropagatableClassifications");
List<AtlasVertex> ret = new ArrayList<>();

RequestContext requestContext = RequestContext.get();
Expand All @@ -875,7 +877,7 @@ public static List<AtlasVertex> getPropagatableClassifications(AtlasEdge edge) {
ret.addAll(getPropagationEnabledClassificationVertices(inVertex));
}
}

RequestContext.get().endMetricRecord(metric);
return ret;
}
//Returns the vertex from which the tag is being propagated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -118,37 +119,31 @@ public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean
* @throws AtlasException
*/
public void deleteEntities(Collection<AtlasVertex> instanceVertices) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteEntities");
final RequestContext requestContext = RequestContext.get();
final Set<AtlasVertex> deletionCandidateVertices = new HashSet<>();

for (AtlasVertex instanceVertex : instanceVertices) {
final String guid = AtlasGraphUtilsV2.getIdFromVertex(instanceVertex);

if (skipVertexForDelete(instanceVertex)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping deletion of entity={} as it is already deleted", guid);
}
continue;
}

// Record all deletion candidate entities in RequestContext
// and gather deletion candidate vertices.
for (GraphHelper.VertexInfo vertexInfo : getOwnedVertices(instanceVertex)) {
AtlasEntityHeader entityHeader = vertexInfo.getEntity();

if (requestContext.isPurgeRequested()) {
entityHeader.setClassifications(entityRetriever.getAllClassifications(vertexInfo.getVertex()));
}

requestContext.recordEntityDelete(entityHeader);
deletionCandidateVertices.add(vertexInfo.getVertex());
}
}

// Delete traits and vertices.
for (AtlasVertex deletionCandidateVertex : deletionCandidateVertices) {
RequestContext.get().getDeletedEdgesIds().clear();

deleteAllClassifications(deletionCandidateVertex);
deleteTypeVertex(deletionCandidateVertex, isInternalType(deletionCandidateVertex));

Expand All @@ -160,6 +155,7 @@ public void deleteEntities(Collection<AtlasVertex> instanceVertices) throws Atla
}
}
}
RequestContext.get().endMetricRecord(metric);
}

/**
Expand Down Expand Up @@ -779,6 +775,7 @@ protected void deleteTypeVertex(AtlasVertex instanceVertex, TypeCategory typeCat
* @throws AtlasException
*/
protected void deleteTypeVertex(AtlasVertex instanceVertex, boolean force) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteTypeVertex");
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting {}, force={}", string(instanceVertex), force);
}
Expand Down Expand Up @@ -852,6 +849,7 @@ protected void deleteTypeVertex(AtlasVertex instanceVertex, boolean force) throw
}

deleteVertex(instanceVertex, force);
RequestContext.get().endMetricRecord(metric);
}

protected AtlasAttribute getAttributeForEdge(AtlasEdge edge) throws AtlasBaseException {
Expand Down Expand Up @@ -881,6 +879,7 @@ protected AtlasAttribute getAttributeForEdge(AtlasEdge edge) throws AtlasBaseExc
* @throws AtlasException
*/
protected void deleteEdgeBetweenVertices(AtlasVertex outVertex, AtlasVertex inVertex, AtlasAttribute attribute) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteEdgeBetweenVertices");
if (LOG.isDebugEnabled()) {
LOG.debug("Removing edge from {} to {} with attribute name {}", string(outVertex), string(inVertex), attribute.getName());
}
Expand Down Expand Up @@ -988,13 +987,14 @@ protected void deleteEdgeBetweenVertices(AtlasVertex outVertex, AtlasVertex inVe
requestContext.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(outVertex));
}
}
RequestContext.get().endMetricRecord(metric);
}

protected void deleteVertex(AtlasVertex instanceVertex, boolean force) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteVertex");
if (LOG.isDebugEnabled()) {
LOG.debug("Setting the external references to {} to null(removing edges)", string(instanceVertex));
}

// Delete external references to this vertex - incoming edges from lineage or glossary term edges
final Iterable<AtlasEdge> incomingEdges = instanceVertex.getEdges(AtlasEdgeDirection.IN);
final Iterable<AtlasEdge> outgoingEdges = instanceVertex.getEdges(AtlasEdgeDirection.OUT);
Expand All @@ -1006,11 +1006,9 @@ protected void deleteVertex(AtlasVertex instanceVertex, boolean force) throws At
AtlasRelationshipStoreV2.recordRelationshipMutation(AtlasRelationshipStoreV2.RelationshipMutation.RELATIONSHIP_HARD_DELETE, edge, entityRetriever);
}
}

for (AtlasEdge edge : incomingEdges) {
AtlasEntity.Status edgeStatus = getStatus(edge);
boolean isProceed = edgeStatus == (isPurgeRequested ? DELETED : ACTIVE);

if (isProceed) {
if (isRelationshipEdge(edge)) {
deleteRelationship(edge);
Expand All @@ -1020,14 +1018,13 @@ protected void deleteVertex(AtlasVertex instanceVertex, boolean force) throws At
if (!isDeletedEntity(outVertex)) {
AtlasVertex inVertex = edge.getInVertex();
AtlasAttribute attribute = getAttributeForEdge(edge);

deleteEdgeBetweenVertices(outVertex, inVertex, attribute);
}
}
}
}

_deleteVertex(instanceVertex, force);
RequestContext.get().endMetricRecord(metric);
}

private boolean isDeletedEntity(AtlasVertex entityVertex) {
Expand Down Expand Up @@ -1106,6 +1103,7 @@ private String getDelimitedPropagatedClassificationNames(AtlasVertex entityVerte
* @throws AtlasException
*/
private void deleteAllClassifications(AtlasVertex instanceVertex) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteAllClassifications");
// If instance is deleted no need to operate classification deleted
if (!ACTIVE.equals(getState(instanceVertex)))
return;
Expand All @@ -1127,6 +1125,7 @@ private void deleteAllClassifications(AtlasVertex instanceVertex) throws AtlasBa

deleteEdgeReference(edge, CLASSIFICATION, false, false, instanceVertex);
}
RequestContext.get().endMetricRecord(metric);
}

private boolean skipVertexForDelete(AtlasVertex vertex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;

import javax.inject.Inject;
Expand Down Expand Up @@ -68,6 +69,7 @@ protected void _deleteVertex(AtlasVertex instanceVertex, boolean force) {

@Override
protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteEdge");
try {
if (LOG.isDebugEnabled()) {
LOG.debug("==> SoftDeleteHandlerV1.deleteEdge({}, {})", GraphHelper.string(edge), force);
Expand Down Expand Up @@ -104,6 +106,6 @@ protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasBaseExcepti
LOG.error("Error while deleting edge {}", GraphHelper.string(edge), e);
throw new AtlasBaseException(e);
}

RequestContext.get().endMetricRecord(metric);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -582,13 +582,13 @@ public EntityMutationResponse updateEntityAttributeByGuid(String guid, String at
@Override
@GraphTransaction
public EntityMutationResponse deleteById(final String guid) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("deleteById");
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}

Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(graph, guid);

if (vertex != null) {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);

Expand All @@ -602,15 +602,13 @@ public EntityMutationResponse deleteById(final String guid) throws AtlasBaseExce
LOG.debug("Deletion request ignored for non-existent entity with guid " + guid);
}
}

EntityMutationResponse ret = deleteVertices(deletionCandidates);

if(ret.getDeletedEntities()!=null)
processTermEntityDeletion(ret.getDeletedEntities());

// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret, false);
atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap());
RequestContext.get().endMetricRecord(metricRecorder);
return ret;
}

Expand Down Expand Up @@ -773,6 +771,7 @@ public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityTyp
@Override
@GraphTransaction
public EntityMutationResponse deleteByUniqueAttributes(List<AtlasObjectId> objectIds) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteByUniqueAttributes");
if (CollectionUtils.isEmpty(objectIds)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS);
}
Expand Down Expand Up @@ -820,7 +819,7 @@ public EntityMutationResponse deleteByUniqueAttributes(List<AtlasObjectId> objec
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret, false);
atlasRelationshipStore.onRelationshipsMutated(RequestContext.get().getRelationshipMutationMap());

RequestContext.get().endMetricRecord(metric);
} catch (Exception e) {
LOG.error("Failed to delete objects:{}", objectIds.stream().map(AtlasObjectId::getUniqueAttributes).collect(Collectors.toList()), e);
throw new AtlasBaseException(e);
Expand Down Expand Up @@ -1931,7 +1930,7 @@ private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCa
Collection<AtlasVertex> categories = new ArrayList<>();
Collection<AtlasVertex> others = new ArrayList<>();

MetricRecorder metric = RequestContext.get().startMetricRecord("filterCategoryVertices");
MetricRecorder metric = RequestContext.get().startMetricRecord("deleteVertices_filterCategoryVertices");
for (AtlasVertex vertex : deletionCandidates) {
String typeName = getTypeName(vertex);

Expand All @@ -1947,7 +1946,7 @@ private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCa
}
}
RequestContext.get().endMetricRecord(metric);

MetricRecorder metric2 = RequestContext.get().startMetricRecord("deleteVertices"); // todo change name
if (CollectionUtils.isNotEmpty(categories)) {
entityGraphMapper.removeAttrForCategoryDelete(categories);
deleteDelegate.getHandler(DeleteType.HARD).deleteEntities(categories);
Expand Down Expand Up @@ -1976,11 +1975,11 @@ private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCa
for (AtlasEntityHeader entity : req.getUpdatedEntities()) {
response.addEntity(UPDATE, entity);
}
RequestContext.get().endMetricRecord(metric2);
} catch (Exception e) {
LOG.error("Delete vertices request failed", e);
throw new AtlasBaseException(e);
}

return response;
}

Expand Down

0 comments on commit 08a8887

Please sign in to comment.