Skip to content

Commit

Permalink
Routing: Adds Parallel Request Hedging (#4198)
Browse files Browse the repository at this point in the history
* initial commit

* fix

* document client restore

* document client changes

* clientContextCore fix

* global endpoint manager fix

* pre test changes

* start of tests

* added dispose for cancellation token source

* test changes

* working test

* more testing

* removed unneeded changes

* revert changes to global endpoint manager (unneeded)

* requested changes

* requested changes

* moves logic into availability strategy

* adds disableStrategy type

* fixed test

* refactor should hedge

* refactor should hedge/adds can use availability strat

* fixed mocking test

* Update Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs

Co-authored-by: Matias Quaranta <[email protected]>

* requested changes

* fix enabled

* added preview flag

* fixed XML

* fixed preview flags

* fixed bugs

* nit

* changed preview flags + update contracts

* revert file for whitespace

* changed file back after update contract

* removed using

* requested changes

* fixed small bug

* fixed bug

* removed options from client builder

* removed usings

* constructor check

* fixed test

* added exclude region to logs

* lazy task create

* rename + lazy

* improvements and fixes

* Added XML comments

* fixed xml comments

* small tweaks

* added fixes + tests

* added item check to tests

* changed test regions to match with CI accounts

* query test

* update test for multiregion CI pipelines

* environment var null check

* null checks

* perf tests

* revert benchmark project

* possible memory saving

* Tests/improvements

* extensive testing improvements + bug fixes

* removed unneeded changes

* nits

* fix hedge regions

* update contracts

* fix updatecontract

* test fix

* fixed areequal asserts

* ALTERNATE METHOD

* added readfeed FI operation type to tests

* requested changes and improvements

* list optimization

* fixed edge case diagnostics

* small fix

* small fixes

* refactor code

* fixed null issues

* null reference

* bug fixes

* changed header clone to internal

* fixed API doc + test change

* removed unused method

* changed to internal

* fixed internal

* removed contract changes

* updated abstract class

* suggested changes

* small bug fixes/improvements

* nits and fixes

* removed unused method

* test fix + applicable region fix + error handling

* fixed test

* fixed test

* location cache change

* requested changes

* fixed test

* nits

* headers change

* Update Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs

Co-authored-by: Matias Quaranta <[email protected]>

* simplifed method

* nit

* fixed change

* remove linq

* fixed faulty change

* reverted accidental test change

* removed tooManyRequests test - unrelated bug

* removed tooManyRequests test - unrelated bug

---------

Co-authored-by: Matias Quaranta <[email protected]>
  • Loading branch information
NaluTripician and ealsur authored Jul 12, 2024
1 parent b478595 commit 3fd2ce6
Show file tree
Hide file tree
Showing 16 changed files with 1,547 additions and 44 deletions.
9 changes: 7 additions & 2 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Microsoft.Azure.Cosmos
using System.Net.Http;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
Expand Down Expand Up @@ -650,7 +650,12 @@ public Func<HttpClient> HttpClientFactory

this.httpClientFactory = value;
}
}
}

/// <summary>
/// Gets or sets the client-level availability strategy to be used for periods of high latency.
/// </summary>
/// <remarks>
/// An availability strategy set on an individual request's options takes precedence over this value.
/// </remarks>
internal AvailabilityStrategy AvailabilityStrategy { get; set; }

/// <summary>
/// Enable partition key level failover
Expand Down
4 changes: 4 additions & 0 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider

private readonly bool IsLocalQuorumConsistency = false;
private readonly bool isReplicaAddressValidationEnabled;
private readonly AvailabilityStrategy availabilityStrategy;

//Fault Injection
private readonly IChaosInterceptorFactory chaosInterceptorFactory;
Expand Down Expand Up @@ -439,6 +440,7 @@ internal DocumentClient(Uri serviceEndpoint,
/// <param name="cosmosClientId"></param>
/// <param name="remoteCertificateValidationCallback">This delegate responsible for validating the third party certificate. </param>
/// <param name="cosmosClientTelemetryOptions">This is distributed tracing flag</param>
/// <param name="availabilityStrategy">This is the availability strategy for the client</param>
/// <param name="chaosInterceptorFactory">This is the chaos interceptor used for fault injection</param>
/// <remarks>
/// The service endpoint can be obtained from the Azure Management Portal.
Expand Down Expand Up @@ -468,6 +470,7 @@ internal DocumentClient(Uri serviceEndpoint,
string cosmosClientId = null,
RemoteCertificateValidationCallback remoteCertificateValidationCallback = null,
CosmosClientTelemetryOptions cosmosClientTelemetryOptions = null,
AvailabilityStrategy availabilityStrategy = null,
IChaosInterceptorFactory chaosInterceptorFactory = null)
{
if (sendingRequestEventArgs != null)
Expand All @@ -491,6 +494,7 @@ internal DocumentClient(Uri serviceEndpoint,
this.transportClientHandlerFactory = transportClientHandlerFactory;
this.IsLocalQuorumConsistency = isLocalQuorumConsistency;
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
this.availabilityStrategy = availabilityStrategy;
this.chaosInterceptorFactory = chaosInterceptorFactory;
this.chaosInterceptor = chaosInterceptorFactory?.CreateInterceptor(this);

Expand Down
13 changes: 12 additions & 1 deletion Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Microsoft.Azure.Cosmos.Fluent
using System.Threading.Tasks;
using global::Azure;
using global::Azure.Core;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

Expand Down Expand Up @@ -683,6 +683,17 @@ internal CosmosClientBuilder WithApiType(ApiType apiType)
{
this.clientOptions.ApiType = apiType;
return this;
}

/// <summary>
/// Sets the availability strategy to be used for periods of high latency.
/// </summary>
/// <param name="strategy">The <see cref="AvailabilityStrategy"/> to store on the client options.</param>
/// <returns>The CosmosClientBuilder</returns>
// NOTE(review): the method name is spelled "Availibility"; it is left unchanged here because callers outside this view may reference it.
internal CosmosClientBuilder WithAvailibilityStrategy(AvailabilityStrategy strategy)
{
this.clientOptions.AvailabilityStrategy = strategy;
return this;
}

/// <summary>
Expand Down
48 changes: 42 additions & 6 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.Handlers
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
Expand All @@ -38,6 +39,7 @@ public RequestInvokerHandler(
Cosmos.PriorityLevel? requestedClientPriorityLevel)
{
this.client = client;

this.RequestedClientConsistencyLevel = requestedClientConsistencyLevel;
this.RequestedClientPriorityLevel = requestedClientPriorityLevel;
}
Expand All @@ -52,11 +54,8 @@ public override async Task<ResponseMessage> SendAsync(
}

RequestOptions promotedRequestOptions = request.RequestOptions;
if (promotedRequestOptions != null)
{
// Fill request options
promotedRequestOptions.PopulateRequestOptions(request);
}
// Fill request options
promotedRequestOptions?.PopulateRequestOptions(request);

// Adds the NoContent header if not already added based on Client Level flag
if (RequestInvokerHandler.ShouldSetNoContentResponseHeaders(
Expand All @@ -79,7 +78,44 @@ public override async Task<ResponseMessage> SendAsync(

await request.AssertPartitioningDetailsAsync(this.client, cancellationToken, request.Trace);
this.FillMultiMasterContext(request);
return await base.SendAsync(request, cancellationToken);

AvailabilityStrategy strategy = this.AvailabilityStrategy(request);

if (strategy != null && strategy.Enabled())
{
return await strategy.ExecuteAvailabilityStrategyAsync(
this.BaseSendAsync,
this.client,
request,
cancellationToken);
}

return await this.BaseSendAsync(request, cancellationToken);
}

/// <summary>
/// Resolves the availability strategy that applies to the given request.
/// A strategy set on the request options wins; when none is set there, the
/// strategy configured on the client options is used instead.
/// </summary>
/// <param name="request">The request whose options are inspected.</param>
/// <returns>
/// The request-level strategy when one is set; otherwise the client-level strategy,
/// which may be null when no strategy is configured.
/// </returns>
public AvailabilityStrategy AvailabilityStrategy(RequestMessage request)
{
    AvailabilityStrategy requestLevelStrategy = request.RequestOptions?.AvailabilityStrategy;
    if (requestLevelStrategy != null)
    {
        return requestLevelStrategy;
    }

    return this.client.ClientOptions.AvailabilityStrategy;
}

/// <summary>
/// Sends the request through the base handler and, when the request options specify
/// regions to exclude, records those regions on the response diagnostics.
/// </summary>
/// <param name="request">The request to send.</param>
/// <param name="cancellationToken">Token forwarded to the base handler.</param>
/// <returns>The response returned by the base handler.</returns>
public virtual async Task<ResponseMessage> BaseSendAsync(
    RequestMessage request,
    CancellationToken cancellationToken)
{
    ResponseMessage response = await base.SendAsync(request, cancellationToken);

    // Recording the excluded regions is a best-effort diagnostics annotation: only do it when
    // the diagnostics object really is a CosmosTraceDiagnostics, so a null or differently-typed
    // Diagnostics cannot turn an otherwise valid response into an exception.
    if (request.RequestOptions?.ExcludeRegions != null
        && response.Diagnostics is CosmosTraceDiagnostics traceDiagnostics)
    {
        traceDiagnostics.Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions);
    }

    return response;
}

public virtual async Task<T> SendAsync<T>(
Expand Down
67 changes: 65 additions & 2 deletions Microsoft.Azure.Cosmos/src/Handler/RequestMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
Expand Down Expand Up @@ -62,6 +62,28 @@ internal RequestMessage(
this.Method = method;
this.RequestUriString = requestUriString;
this.Trace = trace ?? throw new ArgumentNullException(nameof(trace));
}

/// <summary>
/// Create a <see cref="RequestMessage"/>, used for Clone() method.
/// Unlike a lazily created header/property set, this constructor pre-seeds both with the supplied instances.
/// </summary>
/// <param name="method">The http method</param>
/// <param name="requestUriString">The requested URI</param>
/// <param name="trace">The trace node to append traces to.</param>
/// <param name="headers">The headers to use.</param>
/// <param name="properties">The properties to use.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="trace"/> is null.</exception>
private RequestMessage(
HttpMethod method,
string requestUriString,
ITrace trace,
Headers headers,
Dictionary<string, object> properties)
{
this.Method = method;
this.RequestUriString = requestUriString;
this.Trace = trace ?? throw new ArgumentNullException(nameof(trace));
// Wrap the supplied instances so the Lazy-backed fields hand back exactly these objects.
this.headers = new Lazy<Headers>(() => headers);
this.properties = new Lazy<Dictionary<string, object>>(() => properties);
}

/// <summary>
Expand Down Expand Up @@ -286,6 +308,47 @@ internal DocumentServiceRequest ToDocumentServiceRequest()
this.DocumentServiceRequest.RequestContext.ExcludeRegions = this.RequestOptions?.ExcludeRegions;
this.OnBeforeRequestHandler(this.DocumentServiceRequest);
return this.DocumentServiceRequest;
}

/// <summary>
/// Builds a new <see cref="RequestMessage"/> mirroring this one: same method and URI,
/// a cloned header set, a new dictionary holding the same property entries, and a
/// shallow copy of the request options.
/// </summary>
/// <param name="newTrace">The trace node the copy appends traces to; must not be null.</param>
/// <param name="cloneContent">
/// Source used to produce the copy's content stream. The copy only receives content when
/// both this request's content and this argument are non-null.
/// </param>
/// <returns>a cloned copy of the RequestMessage</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="newTrace"/> is null.</exception>
internal RequestMessage Clone(ITrace newTrace, CloneableStream cloneContent)
{
    Headers headersCopy = this.Headers.Clone();
    Dictionary<string, object> propertiesCopy = new Dictionary<string, object>(this.Properties);

    RequestMessage copy = new RequestMessage(
        this.Method,
        this.RequestUriString,
        newTrace,
        headersCopy,
        propertiesCopy);

    // Content is copied from the supplied cloneable stream, never from this.Content directly.
    bool hasContentToCopy = this.Content != null && cloneContent != null;
    if (hasContentToCopy)
    {
        copy.Content = cloneContent.Clone();
    }

    RequestOptions sourceOptions = this.RequestOptions;
    if (sourceOptions != null)
    {
        copy.RequestOptions = sourceOptions.ShallowCopy();
    }

    copy.ResourceType = this.ResourceType;
    copy.OperationType = this.OperationType;

    PartitionKeyRangeIdentity sourceRange = this.PartitionKeyRangeId;
    if (sourceRange != null)
    {
        // Use the single-argument constructor when there is no collection rid to carry over.
        if (string.IsNullOrEmpty(sourceRange.CollectionRid))
        {
            copy.PartitionKeyRangeId = new PartitionKeyRangeIdentity(sourceRange.PartitionKeyRangeId);
        }
        else
        {
            copy.PartitionKeyRangeId = new PartitionKeyRangeIdentity(sourceRange.CollectionRid, sourceRange.PartitionKeyRangeId);
        }
    }

    copy.UseGatewayMode = this.UseGatewayMode;
    copy.ContainerId = this.ContainerId;
    copy.DatabaseId = this.DatabaseId;

    return copy;
}

private static Dictionary<string, object> CreateDictionary()
Expand Down
22 changes: 21 additions & 1 deletion Microsoft.Azure.Cosmos/src/Headers/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Globalization;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
Expand Down Expand Up @@ -285,6 +285,11 @@ internal Headers(INameValueCollection nameValueCollection)
HttpResponseHeadersWrapper httpResponseHeaders => httpResponseHeaders,
_ => new NameValueResponseHeaders(nameValueCollection),
};
}

/// <summary>
/// Create a new instance of <see cref="Headers"/> backed directly by the supplied header store.
/// </summary>
/// <param name="cosmosMessageHeaders">The header store to use; it is assigned as-is, not copied.</param>
internal Headers(CosmosMessageHeadersInternal cosmosMessageHeaders)
{
this.CosmosMessageHeaders = cosmosMessageHeaders;
}

/// <summary>
Expand Down Expand Up @@ -404,6 +409,21 @@ public virtual T GetHeaderValue<T>(string headerName)
{
return this.CosmosMessageHeaders.GetHeaderValue<T>(headerName);
}

/// <summary>
/// Produces a new <see cref="Headers"/> instance containing every header
/// name/value pair currently held by this instance.
/// </summary>
/// <returns>A separate <see cref="Headers"/> populated with the same entries.</returns>
internal Headers Clone()
{
    Headers copy = new Headers();

    // Re-add each entry by name so the copy gets its own header collection.
    foreach (string headerName in this.CosmosMessageHeaders.AllKeys())
    {
        copy.Add(headerName, this.CosmosMessageHeaders.Get(headerName));
    }

    return copy;
}

/// <summary>
/// Enumerates all the HTTP headers names in the <see cref="Headers"/>.
Expand Down
13 changes: 10 additions & 3 deletions Microsoft.Azure.Cosmos/src/RequestOptions/RequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Telemetry;

/// <summary>
/// The default cosmos request options
Expand Down Expand Up @@ -69,7 +68,15 @@ public class RequestOptions
/// This can be used to route a request to a specific region by excluding all other regions.
/// If all regions are excluded, then the request will be routed to the primary/hub region.
/// </summary>
public List<string> ExcludeRegions { get; set; }
public List<string> ExcludeRegions { get; set; }

/// <summary>
/// Gets or sets the Cosmos availability strategy for this request.
/// Availability strategy allows the SDK to send out additional cross region requests to help
/// reduce latency and increase availability. Currently there is one type of availability strategy, parallel request hedging.
/// If there is a globally enabled availability strategy, setting one in the request options will override the global one.
/// </summary>
internal AvailabilityStrategy AvailabilityStrategy { get; set; }

/// <summary>
/// Gets or sets the boolean to use effective partition key routing in the cosmos db request.
Expand Down
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ internal static CosmosClientContext Create(
cosmosClientId: cosmosClient.Id,
remoteCertificateValidationCallback: ClientContextCore.SslCustomValidationCallBack(clientOptions.GetServerCertificateCustomValidationCallback()),
cosmosClientTelemetryOptions: clientOptions.CosmosClientTelemetryOptions,
availabilityStrategy: clientOptions.AvailabilityStrategy,
chaosInterceptorFactory: clientOptions.ChaosInterceptorFactory);

return ClientContextCore.Create(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Handlers;

/// <summary>
/// Base class for the availability strategies supported by the SDK.
/// A concrete strategy decides how a request is executed via <see cref="ExecuteAvailabilityStrategyAsync"/>.
/// </summary>
internal abstract class AvailabilityStrategy
{
/// <summary>
/// Execute the availability strategy
/// </summary>
/// <param name="sender">Delegate that sends a request message and returns its response.</param>
/// <param name="client">The <see cref="CosmosClient"/> the request is issued through.</param>
/// <param name="requestMessage">The request to execute under this strategy.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>The response from the service after the availability strategy is executed</returns>
public abstract Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
CosmosClient client,
RequestMessage requestMessage,
CancellationToken cancellationToken);

/// <summary>
/// Indicates whether this strategy is enabled.
/// The request invoker handler only executes a strategy when this returns true.
/// </summary>
/// <returns>true when the strategy should be applied; otherwise false.</returns>
internal abstract bool Enabled();
}
}
Loading

0 comments on commit 3fd2ce6

Please sign in to comment.