Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry Metrics: Adds support to collect request level metrics #4682

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientTelemetryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,13 @@ public class CosmosClientTelemetryOptions
/// These values decides whether to generate an <see cref="System.Diagnostics.Tracing.EventSource"/> with request diagnostics or not.
/// </summary>
public CosmosThresholdOptions CosmosThresholdOptions { get; set; } = new CosmosThresholdOptions();

/// <summary>
/// Indicates whether client-side metrics collection is enabled or disabled.
/// When set to true, the application will capture and report client metrics such as request counts, latencies, errors, and other key performance indicators.
/// If false, no metrics related to the client will be gathered or reported.
/// <remarks>Metrics data can be published to a monitoring system like Prometheus or Azure Monitor, depending on the configured metrics provider.</remarks>
/// </summary>
public bool IsClientMetricsEnabled { get; set; }
}
}
57 changes: 28 additions & 29 deletions Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.Handlers
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -26,38 +27,36 @@ public override async Task<ResponseMessage> SendAsync(
CancellationToken cancellationToken)
{
ResponseMessage response = await base.SendAsync(request, cancellationToken);
if (this.IsAllowed(request))
try
{
try
{
this.telemetryToServiceHelper.GetCollector().CollectOperationAndNetworkInfo(
() => new TelemetryInformation
{
RegionsContactedList = response.Diagnostics.GetContactedRegions(),
RequestLatency = response.Diagnostics.GetClientElapsedTime(),
StatusCode = response.StatusCode,
ResponseSizeInBytes = TelemetryHandler.GetPayloadSize(response),
ContainerId = request.ContainerId,
DatabaseId = request.DatabaseId,
OperationType = request.OperationType,
ResourceType = request.ResourceType,
ConsistencyLevel = request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
RequestCharge = response.Headers.RequestCharge,
SubStatusCode = response.Headers.SubStatusCode,
Trace = response.Trace
});
}
catch (Exception ex)
{
DefaultTrace.TraceError("Error while collecting telemetry information : {0}", ex);
}
this.telemetryToServiceHelper
.GetCollectors(request)
.ForEach((collector) => collector.CollectOperationAndNetworkInfo(
() => new TelemetryInformation
{
RegionsContactedList = response.Diagnostics.GetContactedRegions(),
RequestLatency = response.Diagnostics.GetClientElapsedTime(),
StatusCode = response.StatusCode,
ResponseSizeInBytes = TelemetryHandler.GetPayloadSize(response),
ContainerId = request.ContainerId,
DatabaseId = request.DatabaseId,
OperationType = request.OperationType,
ResourceType = request.ResourceType,
ConsistencyLevel = request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
RequestCharge = response.Headers.RequestCharge,
SubStatusCode = response.Headers.SubStatusCode,
Trace = response.Trace,
MaxItemCount = request.Headers.PageSize,
ActualItemCount = response.Headers.ItemCount,
PartitionKeyRangeId = request.Headers.PartitionKeyRangeId
}));
}
catch (Exception ex)
{
DefaultTrace.TraceError("Error while collecting telemetry information : {0}", ex);
}
return response;
}

private bool IsAllowed(RequestMessage request)
{
return ClientTelemetryOptions.AllowedResourceTypes.Equals(request.ResourceType);
return response;
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/ClientCollectionCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ await this.storeModel.ProcessMessageAsync(request))
{
ContainerProperties containerProperties = CosmosResource.FromStream<ContainerProperties>(response);

this.telemetryToServiceHelper.GetCollector().CollectCacheInfo(
this.telemetryToServiceHelper.GetCollectors().ForEach((collector) => collector.CollectCacheInfo(
ClientCollectionCache.TelemetrySourceName,
() => new TelemetryInformation
{
Expand All @@ -227,7 +227,7 @@ await this.storeModel.ProcessMessageAsync(request))
ResourceType = request.ResourceType,
SubStatusCode = response.SubStatusCode,
CollectionLink = collectionLink
});
}));

return containerProperties;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Telemetry
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using Microsoft.Azure.Cosmos.Telemetry.Collector;

/// <summary>
/// The OpenTelemetryMetricsCollector class is responsible for collecting and recording Cosmos DB operational metrics, such as item counts, request latency, request units, and regions contacted.
/// This data is captured using the OpenTelemetry metrics API, which allows tracking and analysis of Cosmos DB operations at a granular level.
/// </summary>
internal class OpenTelemetryMetricsCollector : ITelemetryCollector
{
private readonly string clientId;
private readonly string accountName;

private static ConcurrentBag<Tuple<int, KeyValuePair<string, object>[]>> maxItemCounts = null;
private static ConcurrentBag<Tuple<int, KeyValuePair<string, object>[]>> actualItemCounts = null;
private static ConcurrentBag<Tuple<int, KeyValuePair<string, object>[]>> regionsContactedCounts = null;

/// <summary>
/// Initializes a new instance of the OpenTelemetryMetricsCollector class.
/// </summary>
/// <param name="clientId">A unique identifier for the Cosmos DB client instance</param>
/// <param name="accountName">The Cosmos DB account name.</param>
public OpenTelemetryMetricsCollector(string clientId, string accountName)
{
this.clientId = clientId;
this.accountName = accountName;
}

public void CollectCacheInfo(string cacheName, Func<TelemetryInformation> getTelemetryInformation)
{
// No OP
}

/// <summary>
/// Collects telemetry data related to operations and network information, including request performance, item counts, and regions contacted.
/// </summary>
/// <param name="getTelemetryInformation"> A function that provides telemetry details such as operation type, status code, consistency level, and request charge.</param>
/// <remarks>This method gathers telemetry information, including details such as the database, container, operation type, status code, consistency level, and partition key range ID. It uses these dimensions to push metrics to OpenTelemetry, enabling tracking of performance metrics such as request latency, request charge, and item counts.</remarks>
public void CollectOperationAndNetworkInfo(Func<TelemetryInformation> getTelemetryInformation)
{
TelemetryInformation telemetryInformation = getTelemetryInformation();

KeyValuePair<string, object>[] dimensions = new[]
{
new KeyValuePair<string, object>("AccountName", this.accountName),
new KeyValuePair<string, object>("Container", telemetryInformation.ContainerId),
new KeyValuePair<string, object>("Database", telemetryInformation.DatabaseId),
new KeyValuePair<string, object>("Operation", telemetryInformation.OperationType),
new KeyValuePair<string, object>("OperationStatusCode", telemetryInformation.StatusCode),
new KeyValuePair<string, object>("ClientCorrelationId", this.clientId),
new KeyValuePair<string, object>("ConsistencyLevel", telemetryInformation.ConsistencyLevel),
new KeyValuePair<string, object>("PartitionKeyRangeId", telemetryInformation.PartitionKeyRangeId),
};

PushOperationLevelMetrics(telemetryInformation, dimensions);
}

/// <summary>
/// Pushes various operation-level metrics to OpenTelemetry.
/// </summary>
/// <param name="telemetryInformation">Contains telemetry data related to the operation, such as item counts, request charge, and latency.</param>
/// <param name="dimensions">Key-value pairs representing various metadata about the operation (e.g., container, operation type, consistency level).</param>
private static void PushOperationLevelMetrics(TelemetryInformation telemetryInformation, KeyValuePair<string, object>[] dimensions)
{
Console.WriteLine("Pushing maxItemCounts " + telemetryInformation.MaxItemCount);
maxItemCounts ??= new ConcurrentBag<Tuple<int, KeyValuePair<string, object>[]>>();
maxItemCounts.Add(new Tuple<int, KeyValuePair<string, object>[]>(Convert.ToInt32(telemetryInformation.MaxItemCount), dimensions));

Console.WriteLine("Pushing ActualItemCount " + telemetryInformation.ActualItemCount);
actualItemCounts ??= new ConcurrentBag<Tuple<int, KeyValuePair<string, object>[]>>();
actualItemCounts.Add(new Tuple<int, KeyValuePair<string, object>[]>(Convert.ToInt32(telemetryInformation.ActualItemCount), dimensions));

Console.WriteLine("Pushing telemetryInformation.RegionsContactedList.Count " + telemetryInformation.RegionsContactedList.Count);
regionsContactedCounts ??= new ConcurrentBag<Tuple<int, KeyValuePair<string, object>[]>>();
regionsContactedCounts.Add(new Tuple<int, KeyValuePair<string, object>[]>(telemetryInformation.RegionsContactedList.Count, dimensions));

OpenTelemetryMetrics.RequestUnitsHistogram.Record(telemetryInformation.RequestCharge, dimensions);
OpenTelemetryMetrics.RequestLatencyHistogram.Record(telemetryInformation.RequestLatency.Value.Milliseconds, dimensions);
OpenTelemetryMetrics.NumberOfOperationCallCounter.Add(1, dimensions);
}

public static IEnumerable<Measurement<int>> GetMaxItemCount()
{
foreach (Tuple<int, KeyValuePair<string, object>[]> maxItemCount in maxItemCounts)
{
Console.WriteLine("Pulling maxItemCounts " + maxItemCount.Item1);
yield return new Measurement<int>(maxItemCount.Item1, maxItemCount.Item2);
}
}

public static IEnumerable<Measurement<int>> GetActualItemCount()
{
foreach (Tuple<int, KeyValuePair<string, object>[]> actualItemCount in actualItemCounts)
{
Console.WriteLine("Pulling actualItemCount " + actualItemCount.Item1);
yield return new Measurement<int>(actualItemCount.Item1, actualItemCount.Item2);
}
}

public static IEnumerable<Measurement<int>> GetRegionContactedCount()
{
foreach (Tuple<int, KeyValuePair<string, object>[]> regionContactedCount in regionsContactedCounts)
{
Console.WriteLine("Pulling regionContactedCount " + regionContactedCount.Item1);
yield return new Measurement<int>(regionContactedCount.Item1, regionContactedCount.Item2);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ internal class TelemetryInformation
internal double RequestCharge { get; set; } // Required only for operation level telemetry
internal string CollectionLink { get; set; } = null; // Required only for collection cache telemetry
internal ITrace Trace { get; set; } // Required to fetch network level telemetry out of the trace object

internal string MaxItemCount { get; set; }
internal string ActualItemCount { get; set; }
internal string PartitionKeyRangeId { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Text;
using Microsoft.Azure.Cosmos.Telemetry;

/// <summary>
/// The OpenTelemetryMetrics class contains internal static members to create and record Cosmos DB SDK metrics using OpenTelemetry. These metrics allow you to monitor the performance and resource consumption of Cosmos DB operations.
/// </summary>
internal static class OpenTelemetryMetrics
{
internal static readonly Meter CosmosMeter = new Meter("Azure.Cosmos.Client");

internal static readonly Counter<int> NumberOfOperationCallCounter =
CosmosMeter.CreateCounter<int>(name: "cosmos.client.op.calls",
unit: "#",
description: "Number of operation calls");

internal static readonly Histogram<double> RequestLatencyHistogram =
CosmosMeter.CreateHistogram<double>(name: "cosmos.client.op.latency",
unit: "#",
description: "Total end-to-end duration of the operation");

internal static readonly Histogram<double> RequestUnitsHistogram =
CosmosMeter.CreateHistogram<double>(name: "cosmos.client.op.RUs",
unit: "#",
description: "Total request units per operation (sum of RUs for all requested needed when processing an operation)");

internal static readonly ObservableGauge<int> maxItemGauge =
CosmosMeter.CreateObservableGauge<int>(name: "cosmos.client.op.maxItemCount",
observeValues: () => OpenTelemetryMetricsCollector.GetMaxItemCount(),
unit: "#",
description: "For feed operations (query, readAll, readMany, change feed) and batch operations this meter capture the requested maxItemCount per page/request");

internal static readonly ObservableGauge<int> ActualItemCounter =
CosmosMeter.CreateObservableGauge<int>(name: "cosmos.client.op.actualItemCount",
observeValues: () => OpenTelemetryMetricsCollector.GetActualItemCount(),
unit: "#",
description: "For feed operations (query, readAll, readMany, change feed) batch operations this meter capture the actual item count in responses from the service");

internal static readonly ObservableGauge<int> RegionsContactedCounter =
CosmosMeter.CreateObservableGauge<int>(name: "cosmos.client.op.regionsContacted",
observeValues: () => OpenTelemetryMetricsCollector.GetRegionContactedCount(),
unit: "# regions",
description: "Number of regions contacted when executing an operation");

}
}
Loading