Skip to content

Commit

Permalink
Added request level metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sourabh1007 committed Sep 13, 2024
1 parent 0fdb5e4 commit 6fc4195
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 49 deletions.
57 changes: 28 additions & 29 deletions Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.Handlers
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -26,38 +27,36 @@ public override async Task<ResponseMessage> SendAsync(
CancellationToken cancellationToken)
{
ResponseMessage response = await base.SendAsync(request, cancellationToken);
if (this.IsAllowed(request))
try
{
try
{
this.telemetryToServiceHelper.GetCollector().CollectOperationAndNetworkInfo(
() => new TelemetryInformation
{
RegionsContactedList = response.Diagnostics.GetContactedRegions(),
RequestLatency = response.Diagnostics.GetClientElapsedTime(),
StatusCode = response.StatusCode,
ResponseSizeInBytes = TelemetryHandler.GetPayloadSize(response),
ContainerId = request.ContainerId,
DatabaseId = request.DatabaseId,
OperationType = request.OperationType,
ResourceType = request.ResourceType,
ConsistencyLevel = request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
RequestCharge = response.Headers.RequestCharge,
SubStatusCode = response.Headers.SubStatusCode,
Trace = response.Trace
});
}
catch (Exception ex)
{
DefaultTrace.TraceError("Error while collecting telemetry information : {0}", ex);
}
this.telemetryToServiceHelper
.GetCollectors(request)
.ForEach((collector) => collector.CollectOperationAndNetworkInfo(
() => new TelemetryInformation
{
RegionsContactedList = response.Diagnostics.GetContactedRegions(),
RequestLatency = response.Diagnostics.GetClientElapsedTime(),
StatusCode = response.StatusCode,
ResponseSizeInBytes = TelemetryHandler.GetPayloadSize(response),
ContainerId = request.ContainerId,
DatabaseId = request.DatabaseId,
OperationType = request.OperationType,
ResourceType = request.ResourceType,
ConsistencyLevel = request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
RequestCharge = response.Headers.RequestCharge,
SubStatusCode = response.Headers.SubStatusCode,
Trace = response.Trace,
MaxItemCount = request.Headers.PageSize,
ActualItemCount = response.Headers.ItemCount,
PartitionKeyRangeId = request.Headers.PartitionKeyRangeId
}));
}
catch (Exception ex)
{
DefaultTrace.TraceError("Error while collecting telemetry information : {0}", ex);
}
return response;
}

private bool IsAllowed(RequestMessage request)
{
return ClientTelemetryOptions.AllowedResourceTypes.Equals(request.ResourceType);
return response;
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/ClientCollectionCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ await this.storeModel.ProcessMessageAsync(request))
{
ContainerProperties containerProperties = CosmosResource.FromStream<ContainerProperties>(response);

this.telemetryToServiceHelper.GetCollector().CollectCacheInfo(
this.telemetryToServiceHelper.GetCollectors().ForEach((collector) => collector.CollectCacheInfo(
ClientCollectionCache.TelemetrySourceName,
() => new TelemetryInformation
{
Expand All @@ -227,7 +227,7 @@ await this.storeModel.ProcessMessageAsync(request))
ResourceType = request.ResourceType,
SubStatusCode = response.SubStatusCode,
CollectionLink = collectionLink
});
}));

return containerProperties;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Telemetry
{
using System;
using System.Collections.Generic;
using Microsoft.Azure.Cosmos.Telemetry.Collector;

/// <summary>
/// The OpenTelemetryMetricsCollector class is responsible for collecting and recording Cosmos DB operational metrics, such as item counts, request latency, request units, and regions contacted.
/// This data is captured using the OpenTelemetry metrics API, which allows tracking and analysis of Cosmos DB operations at a granular level.
/// </summary>
internal class OpenTelemetryMetricsCollector : ITelemetryCollector
{
private readonly string clientId;
private readonly string accountName;

/// <summary>
/// Initializes a new instance of the OpenTelemetryMetricsCollector class.
/// </summary>
/// <param name="clientId">A unique identifier for the Cosmos DB client instance</param>
/// <param name="accountName">The Cosmos DB account name.</param>
public OpenTelemetryMetricsCollector(string clientId, string accountName)
{
this.clientId = clientId;
this.accountName = accountName;
}

public void CollectCacheInfo(string cacheName, Func<TelemetryInformation> getTelemetryInformation)
{
// No OP
}

/// <summary>
/// Collects telemetry data related to operations and network information, including request performance, item counts, and regions contacted.
/// </summary>
/// <param name="getTelemetryInformation"> A function that provides telemetry details such as operation type, status code, consistency level, and request charge.</param>
/// <remarks>This method gathers telemetry information, including details such as the database, container, operation type, status code, consistency level, and partition key range ID. It uses these dimensions to push metrics to OpenTelemetry, enabling tracking of performance metrics such as request latency, request charge, and item counts.</remarks>
public void CollectOperationAndNetworkInfo(Func<TelemetryInformation> getTelemetryInformation)
{
TelemetryInformation telemetryInformation = getTelemetryInformation();

KeyValuePair<string, object>[] dimensions = new[]
{
new KeyValuePair<string, object>("Container", $"{this.accountName}/{telemetryInformation.DatabaseId}/{telemetryInformation.ContainerId}"),
new KeyValuePair<string, object>("Operation", telemetryInformation.OperationType),
new KeyValuePair<string, object>("OperationStatusCode", telemetryInformation.StatusCode),
new KeyValuePair<string, object>("ClientCorrelationId", this.clientId),
new KeyValuePair<string, object>("ConsistencyLevel", telemetryInformation.ConsistencyLevel),
new KeyValuePair<string, object>("PartitionKeyRangeId", telemetryInformation.PartitionKeyRangeId),
};

PushOperationLevelMetrics(telemetryInformation, dimensions);
}

/// <summary>
/// Pushes various operation-level metrics to OpenTelemetry.
/// </summary>
/// <param name="telemetryInformation">Contains telemetry data related to the operation, such as item counts, request charge, and latency.</param>
/// <param name="dimensions">Key-value pairs representing various metadata about the operation (e.g., container, operation type, consistency level).</param>
private static void PushOperationLevelMetrics(TelemetryInformation telemetryInformation, KeyValuePair<string, object>[] dimensions)
{
OpenTelemetryMetrics.MaxItemCounter.Add(Convert.ToInt32(telemetryInformation.MaxItemCount), dimensions);
OpenTelemetryMetrics.ActualItemCounter.Add(Convert.ToInt32(telemetryInformation.ActualItemCount), dimensions);
OpenTelemetryMetrics.RegionsContactedCounter.Add(telemetryInformation.RegionsContactedList.Count, dimensions);
OpenTelemetryMetrics.RequestUnitsHistogram.Record(telemetryInformation.RequestCharge, dimensions);
OpenTelemetryMetrics.RequestLatencyHistogram.Record(telemetryInformation.RequestLatency.Value.Milliseconds, dimensions);
OpenTelemetryMetrics.NumberOfOperationCallCounter.Add(1, dimensions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ internal class TelemetryInformation
internal double RequestCharge { get; set; } // Required only for operation level telemetry
internal string CollectionLink { get; set; } = null; // Required only for collection cache telemetry
internal ITrace Trace { get; set; } // Required to fetch network level telemetry out of the trace object

internal string MaxItemCount { get; set; }
internal string ActualItemCount { get; set; }
internal string PartitionKeyRangeId { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Text;

/// <summary>
/// The OpenTelemetryMetrics class contains internal static members to create and record Cosmos DB SDK metrics using OpenTelemetry. These metrics allow you to monitor the performance and resource consumption of Cosmos DB operations.
/// </summary>
internal static class OpenTelemetryMetrics
{
private static readonly Meter Meter = new Meter("Azure.Cosmos.SDK.Metrics");

internal static readonly Counter<int> NumberOfOperationCallCounter =
Meter.CreateCounter<int>("cosmos.client.op.calls", "#", "Number of operation calls");

internal static readonly Histogram<double> RequestLatencyHistogram =
Meter.CreateHistogram<double>("cosmos.client.op.latency", "#", "Total end-to-end duration of the operation");

internal static readonly Histogram<double> RequestUnitsHistogram =
Meter.CreateHistogram<double>("cosmos.client.op.RUs", "#", "Total request units per operation (sum of RUs for all requested needed when processing an operation)");

internal static readonly Counter<int> MaxItemCounter =
Meter.CreateCounter<int>("cosmos.client.op.maxItemCount", "#", "For feed operations (query, readAll, readMany, change feed) and batch operations this meter capture the requested maxItemCount per page/request");

internal static readonly Counter<int> ActualItemCounter =
Meter.CreateCounter<int>("cosmos.client.op.actualItemCount", "#", "For feed operations (query, readAll, readMany, change feed) batch operations this meter capture the actual item count in responses from the service");

internal static readonly Counter<int> RegionsContactedCounter =
Meter.CreateCounter<int>("cosmos.client.op.regionsContacted", "#", "Number of regions contacted when executing an operation");
}
}
42 changes: 31 additions & 11 deletions Microsoft.Azure.Cosmos/src/Telemetry/TelemetryToServiceHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.Telemetry
{
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -19,24 +20,25 @@ namespace Microsoft.Azure.Cosmos.Telemetry

internal class TelemetryToServiceHelper : IDisposable
{
private readonly OpenTelemetryMetricsCollector openTelemetryCollector = null;
private ITelemetryCollector collector = new TelemetryCollectorNoOp();

internal static TimeSpan DefaultBackgroundRefreshClientConfigTimeInterval
= TimeSpan.FromMinutes(10);

private readonly AuthorizationTokenProvider cosmosAuthorization;
private readonly CosmosHttpClient httpClient;
private readonly Uri serviceEnpoint;
private readonly Uri serviceEndpoint;
private readonly ConnectionPolicy connectionPolicy;
private readonly string clientId;
private readonly GlobalEndpointManager globalEndpointManager;
private readonly CancellationTokenSource cancellationTokenSource;

private ClientTelemetry clientTelemetry = null;

private TelemetryToServiceHelper()
private TelemetryToServiceHelper(string clientId, string accountName)
{
//NoOpConstructor
this.openTelemetryCollector = new OpenTelemetryMetricsCollector(clientId: clientId, accountName: accountName);
}

private TelemetryToServiceHelper(
Expand All @@ -52,9 +54,11 @@ private TelemetryToServiceHelper(
this.cosmosAuthorization = cosmosAuthorization;
this.httpClient = httpClient;
this.connectionPolicy = connectionPolicy;
this.serviceEnpoint = serviceEndpoint;
this.serviceEndpoint = serviceEndpoint;
this.globalEndpointManager = globalEndpointManager;
this.cancellationTokenSource = cancellationTokenSource;

this.openTelemetryCollector = new OpenTelemetryMetricsCollector(clientId: clientId, accountName: serviceEndpoint.Host);
}

public static TelemetryToServiceHelper CreateAndInitializeClientConfigAndTelemetryJob(string clientId,
Expand All @@ -66,11 +70,11 @@ public static TelemetryToServiceHelper CreateAndInitializeClientConfigAndTelemet
CancellationTokenSource cancellationTokenSource)
{
#if INTERNAL
return new TelemetryToServiceHelper();
return new TelemetryToServiceHelper(clientId: clientId, accountName: serviceEndpoint.Host);
#else
if (connectionPolicy.CosmosClientTelemetryOptions.DisableSendingMetricsToService)
{
return new TelemetryToServiceHelper();
return new TelemetryToServiceHelper(clientId: clientId, accountName: serviceEndpoint.Host);
}

TelemetryToServiceHelper helper = new TelemetryToServiceHelper(
Expand All @@ -82,7 +86,7 @@ public static TelemetryToServiceHelper CreateAndInitializeClientConfigAndTelemet
globalEndpointManager: globalEndpointManager,
cancellationTokenSource: cancellationTokenSource);

_ = helper.RetrieveConfigAndInitiateTelemetryAsync(); // Let it run in backgroud
_ = helper.RetrieveConfigAndInitiateTelemetryAsync(); // Let it run in background

return helper;
#endif
Expand All @@ -92,7 +96,7 @@ private async Task RetrieveConfigAndInitiateTelemetryAsync()
{
try
{
Uri serviceEndpointWithPath = new Uri(this.serviceEnpoint + Paths.ClientConfigPathSegment);
Uri serviceEndpointWithPath = new Uri(this.serviceEndpoint + Paths.ClientConfigPathSegment);
while (!this.cancellationTokenSource.IsCancellationRequested)
{
TryCatch<AccountClientConfiguration> databaseAccountClientConfigs = await this.GetDatabaseAccountClientConfigAsync(
Expand Down Expand Up @@ -172,9 +176,25 @@ await cosmosAuthorization.AddAuthorizationHeaderAsync(
}
}

public ITelemetryCollector GetCollector()
public List<ITelemetryCollector> GetCollectors(RequestMessage request = null)
{
List<ITelemetryCollector> collectors = new List<ITelemetryCollector>(2);
if (request is null || IsAllowed(request))
{
collectors.Add(this.collector);
}

if (this.openTelemetryCollector != null)
{
collectors.Add(this.openTelemetryCollector);
}

return collectors;
}

private static bool IsAllowed(RequestMessage request)
{
return this.collector;
return ClientTelemetryOptions.AllowedResourceTypes.Equals(request.ResourceType);
}

public bool IsClientTelemetryJobRunning()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Moq.Protected;
using OpenTelemetry.Metrics;
using OpenTelemetry;
using System.Diagnostics;

[TestClass]
public class ClientCreateAndInitializeTest : BaseCosmosClientHelper
Expand Down Expand Up @@ -54,6 +57,14 @@ public async Task Cleanup()
[TestMethod]
public async Task CreateAndInitializeTest()
{
var histogramBuckets = new double[] { 0, 5, 10, 25, 50, 75, 100, 250, 500 };
MeterProvider meterProvider = Sdk
.CreateMeterProviderBuilder()
.AddMeter("*")
.AddView("cosmos.client.op.RUs", new ExplicitBucketHistogramConfiguration { Boundaries = histogramBuckets })
.AddConsoleExporter()
.Build();

int httpCallsMade = 0;
HttpClientHandlerHelper httpClientHandlerHelper = new HttpClientHandlerHelper
{
Expand All @@ -74,15 +85,35 @@ public async Task CreateAndInitializeTest()
};

CosmosClient cosmosClient = await CosmosClient.CreateAndInitializeAsync(endpoint, authKey, containers, cosmosClientOptions);
Assert.IsNotNull(cosmosClient);
// Assert.IsNotNull(cosmosClient);
int httpCallsMadeAfterCreation = httpCallsMade;

ContainerInternal container = (ContainerInternal)cosmosClient.GetContainer(this.database.Id, "ClientCreateAndInitializeContainer");
ItemResponse<ToDoActivity> readResponse = await container.ReadItemAsync<ToDoActivity>("1", new Cosmos.PartitionKey("Status1"));
string diagnostics = readResponse.Diagnostics.ToString();
Assert.IsTrue(diagnostics.Contains("\"ConnectionMode\":\"Direct\""));
Assert.AreEqual(httpCallsMade, httpCallsMadeAfterCreation);

await Task.Delay(1000);

Stopwatch sw = Stopwatch.StartNew();
sw.Start();
while(true)
{
ItemResponse<ToDoActivity> readResponse = await container.ReadItemAsync<ToDoActivity>("1", new Cosmos.PartitionKey("Status1"));
string diagnostics = readResponse.Diagnostics.ToString();
if(sw.ElapsedMilliseconds > 2000)
{
break;
}
}
sw.Stop();

await Task.Delay(1000);

/* Assert.IsTrue(diagnostics.Contains("\"ConnectionMode\":\"Direct\""));
Assert.AreEqual(httpCallsMade, httpCallsMadeAfterCreation);*/
cosmosClient.Dispose();

meterProvider.Dispose();

await Task.Delay(1000);
}

[TestMethod]
Expand Down
Loading

0 comments on commit 6fc4195

Please sign in to comment.