feat: instrument more functions to get the metric logs #3596

Open · wants to merge 7 commits into base: master
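Every change below applies the same instrumentation pattern Atlas already uses for request-scoped perf metrics: obtain an AtlasPerfMetrics.MetricRecorder from RequestContext at the start of a method (or loop iteration) and close it when that scope ends, usually in a finally block so the timing is recorded even if the body throws or returns early. A minimal sketch of the pattern, with a hypothetical doWork() method that is not part of this PR:

import org.apache.atlas.RequestContext;
import org.apache.atlas.utils.AtlasPerfMetrics;

public class MetricInstrumentationExample {
    // Hypothetical method, shown only to illustrate the try/finally recorder pattern used in this PR.
    public void doWork() {
        AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("doWork");
        try {
            // ... actual work ...
        } finally {
            // Always end the recorder so the "doWork" timing shows up in the request's perf metrics,
            // even when the body throws.
            RequestContext.get().endMetricRecord(metricRecorder);
        }
    }
}

The loop-level recorders added in the delete-handler change follow the same idea, just scoped to a single iteration so repeated work inside hot loops shows up under its own metric name.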
@@ -22,12 +22,14 @@
 import java.util.Iterator;
 import java.util.stream.StreamSupport;

+import org.apache.atlas.RequestContext;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.graphdb.AtlasVertexQuery;
 import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
+import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -66,10 +68,14 @@ public <T> void addListProperty(String propertyName, T value) {

     @Override
     public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> getEdges(AtlasEdgeDirection dir, String edgeLabel) {
-
-        Direction d = AtlasJanusObjectFactory.createDirection(dir);
-        Iterator<Edge> edges = getWrappedElement().edges(d, edgeLabel);
-        return graph.wrapEdges(edges);
+        AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getEdges");
+        try {
+            Direction d = AtlasJanusObjectFactory.createDirection(dir);
+            Iterator<Edge> edges = getWrappedElement().edges(d, edgeLabel);
+            return graph.wrapEdges(edges);
+        } finally {
+            RequestContext.get().endMetricRecord(metricRecorder);
+        }
     }

     @Override
@@ -36,6 +36,7 @@
 import org.apache.atlas.model.legacy.EntityResult;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.repository.converters.AtlasFormatConverter.ConverterContext;
@@ -300,26 +301,31 @@ public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
     }

     public AtlasEntity getAndCacheEntity(String guid, boolean ignoreRelationshipAttributes) throws AtlasBaseException {
-        RequestContext context = RequestContext.get();
-        AtlasEntity entity = context.getEntity(guid);
-
-        if (entity == null) {
-            if (ignoreRelationshipAttributes) {
-                entity = entityGraphRetrieverIgnoreRelationshipAttrs.toAtlasEntity(guid);
-            } else {
-                entity = entityGraphRetriever.toAtlasEntity(guid);
-            }
-
-            if (entity != null) {
-                context.cache(entity);
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cache miss -> GUID = {}", guid);
-                }
-            }
-        }
-
-        return entity;
+        AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getAndCacheEntity");
+        try {
+            RequestContext context = RequestContext.get();
+            AtlasEntity entity = context.getEntity(guid);
+
+            if (entity == null) {
+                if (ignoreRelationshipAttributes) {
+                    entity = entityGraphRetrieverIgnoreRelationshipAttrs.toAtlasEntity(guid);
+                } else {
+                    entity = entityGraphRetriever.toAtlasEntity(guid);
+                }
+
+                if (entity != null) {
+                    context.cache(entity);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Cache miss -> GUID = {}", guid);
+                    }
+                }
+            }
+
+            return entity;
+        } finally {
+            RequestContext.get().endMetricRecord(metricRecorder);
+        }
     }

     public AtlasEntity getEntity(String guid, boolean ignoreRelationshipAttributes) throws AtlasBaseException {
@@ -35,6 +35,7 @@
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
@@ -156,18 +157,23 @@ public String getIndexTextForEntity(String guid) throws AtlasBaseException {

     @Override
     public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException {
-        String ret = null;
-
-        if (entity != null) {
-            StringBuilder sb = new StringBuilder();
-            map(entity, null, sb, new HashSet<String>(), true);
-            ret = sb.toString();
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.info("FullTextMapperV2.getClassificationTextForEntity({}): {}", entity.getGuid(), ret);
-        }
-        return ret;
+        AtlasPerfMetrics.MetricRecorder metricRecorderEntity = RequestContext.get().startMetricRecord("getClassificationTextForEntity");
+        try {
+            String ret = null;
+
+            if (entity != null) {
+                StringBuilder sb = new StringBuilder();
+                map(entity, null, sb, new HashSet<String>(), true);
+                ret = sb.toString();
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.info("FullTextMapperV2.getClassificationTextForEntity({}): {}", entity.getGuid(), ret);
+            }
+            return ret;
+        } finally {
+            RequestContext.get().endMetricRecord(metricRecorderEntity);
+        }
     }

     private void map(AtlasEntity entity, AtlasEntityExtInfo entityExtInfo, StringBuilder sb, Set<String> processedGuids, boolean isClassificationOnly) throws AtlasBaseException {
@@ -360,21 +360,26 @@ public static boolean getRestrictPropagationThroughHierarchy(AtlasVertex classif
     }

     public static AtlasVertex getClassificationVertex(AtlasVertex entityVertex, String classificationName) {
-        AtlasVertex ret = null;
-        Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL)
-                .has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, false)
-                .has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, classificationName).edges();
-        if (edges != null) {
-            Iterator<AtlasEdge> iterator = edges.iterator();
-
-            if (iterator.hasNext()) {
-                AtlasEdge edge = iterator.next();
-
-                ret = (edge != null) ? edge.getInVertex() : null;
-            }
-        }
-
-        return ret;
+        AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getClassificationVertex");
+        try {
+            AtlasVertex ret = null;
+            Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL)
+                    .has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, false)
+                    .has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, classificationName).edges();
+            if (edges != null) {
+                Iterator<AtlasEdge> iterator = edges.iterator();
+
+                if (iterator.hasNext()) {
+                    AtlasEdge edge = iterator.next();
+
+                    ret = (edge != null) ? edge.getInVertex() : null;
+                }
+            }
+
+            return ret;
+        } finally {
+            RequestContext.get().endMetricRecord(metricRecorder);
+        }
     }
     public static Iterator<AtlasVertex> getClassificationVertices(AtlasGraph graph, String classificationName, int size) {
         Iterable classificationVertices = graph.query().has(TYPE_NAME_PROPERTY_KEY, classificationName).vertices(size);
@@ -1481,6 +1481,7 @@ public void resetHasLineageOnInputOutputDelete(Collection<AtlasEdge> removedEdge
         AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("resetHasLineageOnInputOutputDelete");

         for (AtlasEdge atlasEdge : removedEdges) {
+            AtlasPerfMetrics.MetricRecorder edgeMetricRecorder = RequestContext.get().startMetricRecord("resetHasLineageOnInputOutputDelete_edge");

             boolean isOutputEdge = PROCESS_OUTPUTS.equals(atlasEdge.getLabel());

@@ -1498,6 +1499,7 @@ public void resetHasLineageOnInputOutputDelete(Collection<AtlasEdge> removedEdge
                 boolean activeEdgeFound = false;

                 while (edgeIterator.hasNext()) {
+                    AtlasPerfMetrics.MetricRecorder edgeIteratorMetricRecorder1 = RequestContext.get().startMetricRecord("resetHasLineageOnInputOutputDelete_edgeIterator1");
                     AtlasEdge edge = edgeIterator.next();
                     if (getStatus(edge) == ACTIVE && !removedEdges.contains(edge)) {
                         AtlasVertex relatedAssetVertex = edge.getInVertex();
@@ -1507,6 +1509,7 @@ public void resetHasLineageOnInputOutputDelete(Collection<AtlasEdge> removedEdge
                             break;
                         }
                     }
+                    RequestContext.get().endMetricRecord(edgeIteratorMetricRecorder1);
                 }

                 if (!activeEdgeFound) {
@@ -1517,15 +1520,19 @@ public void resetHasLineageOnInputOutputDelete(Collection<AtlasEdge> removedEdge
                     Iterator<AtlasEdge> processEdgeIterator = processVertex.getEdges(AtlasEdgeDirection.BOTH, oppositeEdgeLabel).iterator();

                     while (processEdgeIterator.hasNext()) {
+                        AtlasPerfMetrics.MetricRecorder edgeIteratorMetricRecorder2 = RequestContext.get().startMetricRecord("resetHasLineageOnInputOutputDelete_edgeIterator2");
                         AtlasEdge edge = processEdgeIterator.next();

                         if (!removedEdges.contains(edge)) {
                             AtlasVertex relatedAssetVertex = edge.getInVertex();
                             updateAssetHasLineageStatus(relatedAssetVertex, edge, removedEdges);
                         }
+                        RequestContext.get().endMetricRecord(edgeIteratorMetricRecorder2);
                     }
                 }
             }
+
+            RequestContext.get().endMetricRecord(edgeMetricRecorder);
         }
         RequestContext.get().endMetricRecord(metricRecorder);
     }
@@ -209,29 +209,34 @@ public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<At

     @Override
     public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
-        doFullTextMapping(entity.getGuid());
-
-        if (isV2EntityNotificationEnabled) {
-            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
-                listener.onClassificationsUpdated(entity, updatedClassifications);
-            }
-        } else {
-            if (instanceConverter != null) {
-                Referenceable entityRef = toReferenceable(entity.getGuid());
-                List<Struct> traits = toStruct(updatedClassifications);
-
-                if (entityRef == null || CollectionUtils.isEmpty(traits)) {
-                    return;
-                }
-
-                for (EntityChangeListener listener : entityChangeListeners) {
-                    try {
-                        listener.onTraitsUpdated(entityRef, traits);
-                    } catch (AtlasException e) {
-                        throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate");
-                    }
-                }
-            }
-        }
+        MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("onClassificationUpdatedToEntity");
+        try {
+            doFullTextMapping(entity.getGuid());
+
+            if (isV2EntityNotificationEnabled) {
+                for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                    listener.onClassificationsUpdated(entity, updatedClassifications);
+                }
+            } else {
+                if (instanceConverter != null) {
+                    Referenceable entityRef = toReferenceable(entity.getGuid());
+                    List<Struct> traits = toStruct(updatedClassifications);
+
+                    if (entityRef == null || CollectionUtils.isEmpty(traits)) {
+                        return;
+                    }
+
+                    for (EntityChangeListener listener : entityChangeListeners) {
+                        try {
+                            listener.onTraitsUpdated(entityRef, traits);
+                        } catch (AtlasException e) {
+                            throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate");
+                        }
+                    }
+                }
+            }
+        } finally {
+            RequestContext.get().endMetricRecord(metricRecorder);
+        }
     }

@@ -2054,21 +2054,27 @@ private EntityMutationResponse purgeVertices(Collection<AtlasVertex> purgeCandid
     }

     private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException {
-        AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
-
-        if (type == null) {
-            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName());
-        }
-
-        List<String> messages = new ArrayList<>();
-
-        type.validateValue(classification, classification.getTypeName(), messages);
-
-        if (!messages.isEmpty()) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages);
-        }
-
-        type.getNormalizedValue(classification);
+        AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("validateAndNormalize");
+
+        try {
+            AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+
+            if (type == null) {
+                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName());
+            }
+
+            List<String> messages = new ArrayList<>();
+
+            type.validateValue(classification, classification.getTypeName(), messages);
+
+            if (!messages.isEmpty()) {
+                throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages);
+            }
+
+            type.getNormalizedValue(classification);
+        } finally {
+            RequestContext.get().endMetricRecord(metricRecorder);
+        }
     }

     /**
@@ -2078,32 +2084,37 @@ private void validateAndNormalize(AtlasClassification classification) throws Atl
      * @param classifications list of classifications to be associated
      */
     private void validateEntityAssociations(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
-        List<String> entityClassifications = getClassificationNames(guid);
-        String entityTypeName = AtlasGraphUtilsV2.getTypeNameFromGuid(graph, guid);
-        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
-        Set<String> processedTagTypeNames = new HashSet<>();
-
-        List <AtlasClassification> copyList = new ArrayList<>(classifications);
-        for (AtlasClassification classification : copyList) {
-
-            if (processedTagTypeNames.contains(classification.getTypeName())){
-                classifications.remove(classification);
-            } else {
-                String newClassification = classification.getTypeName();
-                processedTagTypeNames.add(newClassification);
-
-                if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification)) {
-                    throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid +
-                            ", already associated with classification: " + newClassification);
-                }
-
-                // for each classification, check whether there are entities it should be restricted to
-                AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(newClassification);
-
-                if (!classificationType.canApplyToEntityType(entityType)) {
-                    throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_FOR_CLASSIFICATION, guid, entityTypeName, newClassification);
-                }
-            }
-        }
+        AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("validateEntityAssociations");
+        try {
+            List<String> entityClassifications = getClassificationNames(guid);
+            String entityTypeName = AtlasGraphUtilsV2.getTypeNameFromGuid(graph, guid);
+            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
+            Set<String> processedTagTypeNames = new HashSet<>();

+            List <AtlasClassification> copyList = new ArrayList<>(classifications);
+            for (AtlasClassification classification : copyList) {
+
+                if (processedTagTypeNames.contains(classification.getTypeName())){
+                    classifications.remove(classification);
+                } else {
+                    String newClassification = classification.getTypeName();
+                    processedTagTypeNames.add(newClassification);
+
+                    if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification)) {
+                        throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid +
+                                ", already associated with classification: " + newClassification);
+                    }
+
+                    // for each classification, check whether there are entities it should be restricted to
+                    AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(newClassification);
+
+                    if (!classificationType.canApplyToEntityType(entityType)) {
+                        throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_FOR_CLASSIFICATION, guid, entityTypeName, newClassification);
+                    }
+                }
+            }
+        } finally {
+            RequestContext.get().endMetricRecord(metricRecorder);
+        }
     }
