diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 32935f2c2d..6bfc65a005 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -13,7 +13,7 @@ namespace Microsoft.Azure.Cosmos using System.Net.Http; using System.Net.Security; using System.Security.Cryptography.X509Certificates; - using Microsoft.Azure.Cosmos.Fluent; + using Microsoft.Azure.Cosmos.Fluent; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client; using Newtonsoft.Json; @@ -650,7 +650,12 @@ public Func HttpClientFactory this.httpClientFactory = value; } - } + } + + /// + /// Availability Strategy to be used for periods of high latency + /// + internal AvailabilityStrategy AvailabilityStrategy { get; set; } /// /// Enable partition key level failover diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index ce1c9a43c2..946fa4cb08 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -114,6 +114,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private readonly bool IsLocalQuorumConsistency = false; private readonly bool isReplicaAddressValidationEnabled; + private readonly AvailabilityStrategy availabilityStrategy; //Fault Injection private readonly IChaosInterceptorFactory chaosInterceptorFactory; @@ -439,6 +440,7 @@ internal DocumentClient(Uri serviceEndpoint, /// /// This delegate responsible for validating the third party certificate. /// This is distributed tracing flag + /// This is the availability strategy for the client" /// This is the chaos interceptor used for fault injection /// /// The service endpoint can be obtained from the Azure Management Portal. @@ -468,6 +470,7 @@ internal DocumentClient(Uri serviceEndpoint, string cosmosClientId = null, RemoteCertificateValidationCallback remoteCertificateValidationCallback = null, CosmosClientTelemetryOptions cosmosClientTelemetryOptions = null, + AvailabilityStrategy availabilityStrategy = null, IChaosInterceptorFactory chaosInterceptorFactory = null) { if (sendingRequestEventArgs != null) @@ -491,6 +494,7 @@ internal DocumentClient(Uri serviceEndpoint, this.transportClientHandlerFactory = transportClientHandlerFactory; this.IsLocalQuorumConsistency = isLocalQuorumConsistency; this.initTaskCache = new AsyncCacheNonBlocking(cancellationToken: this.cancellationTokenSource.Token); + this.availabilityStrategy = availabilityStrategy; this.chaosInterceptorFactory = chaosInterceptorFactory; this.chaosInterceptor = chaosInterceptorFactory?.CreateInterceptor(this); diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index c56fe7feda..43232aa422 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -12,7 +12,7 @@ namespace Microsoft.Azure.Cosmos.Fluent using System.Threading.Tasks; using global::Azure; using global::Azure.Core; - using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client; @@ -683,6 +683,17 @@ internal CosmosClientBuilder WithApiType(ApiType apiType) { this.clientOptions.ApiType = apiType; return this; + } + + /// + /// Availability Stragey to be used for periods of high latency + /// + /// + /// The CosmosClientBuilder + internal CosmosClientBuilder WithAvailibilityStrategy(AvailabilityStrategy strategy) + { + this.clientOptions.AvailabilityStrategy = strategy; + return this; } /// diff --git a/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs b/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs index 74888b5316..60e10152b9 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.Handlers using System.Net.Http; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Diagnostics; using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; @@ -38,6 +39,7 @@ public RequestInvokerHandler( Cosmos.PriorityLevel? requestedClientPriorityLevel) { this.client = client; + this.RequestedClientConsistencyLevel = requestedClientConsistencyLevel; this.RequestedClientPriorityLevel = requestedClientPriorityLevel; } @@ -52,11 +54,8 @@ public override async Task SendAsync( } RequestOptions promotedRequestOptions = request.RequestOptions; - if (promotedRequestOptions != null) - { - // Fill request options - promotedRequestOptions.PopulateRequestOptions(request); - } + // Fill request options + promotedRequestOptions?.PopulateRequestOptions(request); // Adds the NoContent header if not already added based on Client Level flag if (RequestInvokerHandler.ShouldSetNoContentResponseHeaders( @@ -79,7 +78,44 @@ public override async Task SendAsync( await request.AssertPartitioningDetailsAsync(this.client, cancellationToken, request.Trace); this.FillMultiMasterContext(request); - return await base.SendAsync(request, cancellationToken); + + AvailabilityStrategy strategy = this.AvailabilityStrategy(request); + + if (strategy != null && strategy.Enabled()) + { + return await strategy.ExecuteAvailabilityStrategyAsync( + this.BaseSendAsync, + this.client, + request, + cancellationToken); + } + + return await this.BaseSendAsync(request, cancellationToken); + } + + /// + /// This method determines if there is an availability strategy that the request can use. + /// Note that the request level availability strategy options override the client level options. + /// + /// + /// whether the request should be a parallel hedging request. + public AvailabilityStrategy AvailabilityStrategy(RequestMessage request) + { + return request.RequestOptions?.AvailabilityStrategy + ?? this.client.ClientOptions.AvailabilityStrategy; + } + + public virtual async Task BaseSendAsync( + RequestMessage request, + CancellationToken cancellationToken) + { + ResponseMessage response = await base.SendAsync(request, cancellationToken); + if (request.RequestOptions?.ExcludeRegions != null) + { + ((CosmosTraceDiagnostics)response.Diagnostics).Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions); + } + + return response; } public virtual async Task SendAsync( diff --git a/Microsoft.Azure.Cosmos/src/Handler/RequestMessage.cs b/Microsoft.Azure.Cosmos/src/Handler/RequestMessage.cs index 67d3dbdc27..aa6d796df2 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/RequestMessage.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/RequestMessage.cs @@ -5,10 +5,10 @@ namespace Microsoft.Azure.Cosmos { using System; - using System.Collections.Generic; + using System.Collections.Generic; using System.Diagnostics; using System.Globalization; - using System.IO; + using System.IO; using System.Net; using System.Net.Http; using System.Threading; @@ -62,6 +62,28 @@ internal RequestMessage( this.Method = method; this.RequestUriString = requestUriString; this.Trace = trace ?? throw new ArgumentNullException(nameof(trace)); + } + + /// + /// Create a , used for Clone() method. + /// + /// The http method + /// The requested URI + /// The trace node to append traces to. + /// The headers to use. + /// The properties to use. + private RequestMessage( + HttpMethod method, + string requestUriString, + ITrace trace, + Headers headers, + Dictionary properties) + { + this.Method = method; + this.RequestUriString = requestUriString; + this.Trace = trace ?? throw new ArgumentNullException(nameof(trace)); + this.headers = new Lazy(() => headers); + this.properties = new Lazy>(() => properties); } /// @@ -286,6 +308,47 @@ internal DocumentServiceRequest ToDocumentServiceRequest() this.DocumentServiceRequest.RequestContext.ExcludeRegions = this.RequestOptions?.ExcludeRegions; this.OnBeforeRequestHandler(this.DocumentServiceRequest); return this.DocumentServiceRequest; + } + + /// + /// Clone the request message + /// + /// a cloned copy of the RequestMessage + internal RequestMessage Clone(ITrace newTrace, CloneableStream cloneContent) + { + RequestMessage clone = new RequestMessage( + this.Method, + this.RequestUriString, + newTrace, + this.Headers.Clone(), + new Dictionary(this.Properties)); + + if (this.Content != null && cloneContent != null) + { + clone.Content = cloneContent.Clone(); + } + + if (this.RequestOptions != null) + { + clone.RequestOptions = this.RequestOptions.ShallowCopy(); + } + + clone.ResourceType = this.ResourceType; + + clone.OperationType = this.OperationType; + + if (this.PartitionKeyRangeId != null) + { + clone.PartitionKeyRangeId = string.IsNullOrEmpty(this.PartitionKeyRangeId.CollectionRid) + ? new PartitionKeyRangeIdentity(this.PartitionKeyRangeId.PartitionKeyRangeId) + : new PartitionKeyRangeIdentity(this.PartitionKeyRangeId.CollectionRid, this.PartitionKeyRangeId.PartitionKeyRangeId); + } + + clone.UseGatewayMode = this.UseGatewayMode; + clone.ContainerId = this.ContainerId; + clone.DatabaseId = this.DatabaseId; + + return clone; } private static Dictionary CreateDictionary() diff --git a/Microsoft.Azure.Cosmos/src/Headers/Headers.cs b/Microsoft.Azure.Cosmos/src/Headers/Headers.cs index 40f90c423a..f371383465 100644 --- a/Microsoft.Azure.Cosmos/src/Headers/Headers.cs +++ b/Microsoft.Azure.Cosmos/src/Headers/Headers.cs @@ -6,7 +6,7 @@ namespace Microsoft.Azure.Cosmos { using System; using System.Collections; - using System.Collections.Generic; + using System.Collections.Generic; using System.Globalization; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Collections; @@ -285,6 +285,11 @@ internal Headers(INameValueCollection nameValueCollection) HttpResponseHeadersWrapper httpResponseHeaders => httpResponseHeaders, _ => new NameValueResponseHeaders(nameValueCollection), }; + } + + internal Headers(CosmosMessageHeadersInternal cosmosMessageHeaders) + { + this.CosmosMessageHeaders = cosmosMessageHeaders; } /// @@ -404,6 +409,21 @@ public virtual T GetHeaderValue(string headerName) { return this.CosmosMessageHeaders.GetHeaderValue(headerName); } + + /// + /// Clones the current . + /// + /// a cloned copy of the current + internal Headers Clone() + { + Headers clone = new Headers(); + foreach (string key in this.CosmosMessageHeaders.AllKeys()) + { + clone.Add(key, this.CosmosMessageHeaders.Get(key)); + } + + return clone; + } /// /// Enumerates all the HTTP headers names in the . diff --git a/Microsoft.Azure.Cosmos/src/RequestOptions/RequestOptions.cs b/Microsoft.Azure.Cosmos/src/RequestOptions/RequestOptions.cs index dffda96fab..14b6c812d0 100644 --- a/Microsoft.Azure.Cosmos/src/RequestOptions/RequestOptions.cs +++ b/Microsoft.Azure.Cosmos/src/RequestOptions/RequestOptions.cs @@ -5,9 +5,8 @@ namespace Microsoft.Azure.Cosmos { using System; - using System.Collections.Generic; + using System.Collections.Generic; using Microsoft.Azure.Documents; - using Telemetry; /// /// The default cosmos request options @@ -69,7 +68,15 @@ public class RequestOptions /// This can be used to route a request to a specific region by excluding all other regions. /// If all regions are excluded, then the request will be routed to the primary/hub region. /// - public List ExcludeRegions { get; set; } + public List ExcludeRegions { get; set; } + + /// + /// Cosmos availability strategy. + /// Availability strategy allows the SDK to send out additional cross region requests to help + /// reduce latency and increase availability. Currently there is one type of availability strategy, parallel request hedging. + /// If there is a globally enabled availability strategy, setting one in the request options will override the global one. + /// + internal AvailabilityStrategy AvailabilityStrategy { get; set; } /// /// Gets or sets the boolean to use effective partition key routing in the cosmos db request. diff --git a/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs b/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs index abe57bbcd2..a8cc0d59dc 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs @@ -83,6 +83,7 @@ internal static CosmosClientContext Create( cosmosClientId: cosmosClient.Id, remoteCertificateValidationCallback: ClientContextCore.SslCustomValidationCallBack(clientOptions.GetServerCertificateCustomValidationCallback()), cosmosClientTelemetryOptions: clientOptions.CosmosClientTelemetryOptions, + availabilityStrategy: clientOptions.AvailabilityStrategy, chaosInterceptorFactory: clientOptions.ChaosInterceptorFactory); return ClientContextCore.Create( diff --git a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/AvailabilityStrategy.cs b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/AvailabilityStrategy.cs new file mode 100644 index 0000000000..994449996b --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/AvailabilityStrategy.cs @@ -0,0 +1,32 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Handlers; + + /// + /// Types of availability strategies supported + /// + internal abstract class AvailabilityStrategy + { + /// + /// Execute the availability strategy + /// + /// + /// + /// + /// + /// The response from the service after the availability strategy is executed + public abstract Task ExecuteAvailabilityStrategyAsync( + Func> sender, + CosmosClient client, + RequestMessage requestMessage, + CancellationToken cancellationToken); + + internal abstract bool Enabled(); + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionParallelHedgingAvailabilityStrategy.cs b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionParallelHedgingAvailabilityStrategy.cs new file mode 100644 index 0000000000..696251d0d8 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionParallelHedgingAvailabilityStrategy.cs @@ -0,0 +1,323 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Cosmos.Diagnostics; + using Microsoft.Azure.Documents; + + /// + /// Parallel hedging availability strategy. Once threshold time is reached, + /// the SDK will send out an additional request to a remote region in parallel + /// if the first parallel request or the original has not returned after the step time, + /// additional parallel requests will be sent out there is a response or all regions are exausted. + /// + internal class CrossRegionParallelHedgingAvailabilityStrategy : AvailabilityStrategy + { + private const string HedgeRegions = "Hedge Regions"; + private const string HedgeContext = "Hedge Context"; + private const string HedgeContextOriginalRequest = "Original Request"; + private const string HedgeContextHedgedRequest = "Hedged Request"; + + /// + /// Latency threshold which activates the first region hedging + /// + public TimeSpan Threshold { get; private set; } + + /// + /// When the SDK will send out additional hedging requests after the initial hedging request + /// + public TimeSpan ThresholdStep { get; private set; } + + /// + /// Constructor for parallel hedging availability strategy + /// + /// + /// + public CrossRegionParallelHedgingAvailabilityStrategy( + TimeSpan threshold, + TimeSpan? thresholdStep) + { + if (threshold <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(threshold)); + } + + if (thresholdStep <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(thresholdStep)); + } + + this.Threshold = threshold; + this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1); + } + + internal override bool Enabled() + { + return true; + } + + /// + /// This method determines if the request should be sent with a parallel hedging availability strategy. + /// This availability strategy can only be used if the request is a read-only request on a document request. + /// + /// + /// whether the request should be a parallel hedging request. + internal bool ShouldHedge(RequestMessage request) + { + //Only use availability strategy for document point operations + if (request.ResourceType != ResourceType.Document) + { + return false; + } + + //check to see if it is a not a read-only request + if (!OperationTypeExtensions.IsReadOperation(request.OperationType)) + { + return false; + } + + return true; + } + + /// + /// Execute the parallel hedging availability strategy + /// + /// + /// + /// + /// + /// The response after executing cross region hedging + public override async Task ExecuteAvailabilityStrategyAsync( + Func> sender, + CosmosClient client, + RequestMessage request, + CancellationToken cancellationToken) + { + if (!this.ShouldHedge(request)) + { + return await sender(request, cancellationToken); + } + + using (CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + { + using (CloneableStream clonedBody = (CloneableStream)(request.Content == null + ? null//new CloneableStream(new MemoryStream()) + : await StreamExtension.AsClonableStreamAsync(request.Content))) + { + IReadOnlyCollection hedgeRegions = client.DocumentClient.GlobalEndpointManager + .GetApplicableRegions( + request.RequestOptions?.ExcludeRegions, + OperationTypeExtensions.IsReadOperation(request.OperationType)); + + List requestTasks = new List(hedgeRegions.Count + 1); + + Task<(bool, ResponseMessage)> primaryRequest = null; + + ResponseMessage responseMessage = null; + + //Send out hedged requests + for (int requestNumber = 0; requestNumber < hedgeRegions.Count; requestNumber++) + { + TimeSpan awaitTime = requestNumber == 0 ? this.Threshold : this.ThresholdStep; + + using (CancellationTokenSource timerTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + { + CancellationToken timerToken = timerTokenSource.Token; + using (Task hedgeTimer = Task.Delay(awaitTime, timerToken)) + { + if (requestNumber == 0) + { + primaryRequest = this.RequestSenderAndResultCheckAsync( + sender, + request, + cancellationToken, + cancellationTokenSource); + + requestTasks.Add(primaryRequest); + } + else + { + Task<(bool, ResponseMessage)> requestTask = this.CloneAndSendAsync( + sender: sender, + request: request, + clonedBody: clonedBody, + hedgeRegions: hedgeRegions, + requestNumber: requestNumber, + cancellationToken: cancellationToken, + cancellationTokenSource: cancellationTokenSource); + + requestTasks.Add(requestTask); + } + + requestTasks.Add(hedgeTimer); + + Task completedTask = await Task.WhenAny(requestTasks); + requestTasks.Remove(completedTask); + + if (completedTask == hedgeTimer) + { + continue; + } + + timerTokenSource.Cancel(); + requestTasks.Remove(hedgeTimer); + + if (completedTask.IsFaulted) + { + AggregateException innerExceptions = completedTask.Exception.Flatten(); + } + + (bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask; + if (isNonTransient) + { + cancellationTokenSource.Cancel(); + ((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum( + HedgeRegions, + HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions())); + ((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum( + HedgeContext, + object.ReferenceEquals(primaryRequest, completedTask) + ? HedgeContextOriginalRequest + : HedgeContextHedgedRequest); + return responseMessage; + } + } + } + } + + //Wait for a good response from the hedged requests/primary request + Exception lastException = null; + while (requestTasks.Any()) + { + Task completedTask = await Task.WhenAny(requestTasks); + requestTasks.Remove(completedTask); + if (completedTask.IsFaulted) + { + AggregateException innerExceptions = completedTask.Exception.Flatten(); + lastException = innerExceptions.InnerExceptions.FirstOrDefault(); + } + + (bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask; + if (isNonTransient || requestTasks.Count == 0) + { + cancellationTokenSource.Cancel(); + ((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum( + HedgeRegions, + HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions())); + ((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum( + HedgeContext, + object.ReferenceEquals(primaryRequest, completedTask) + ? HedgeContextOriginalRequest + : HedgeContextHedgedRequest); + return responseMessage; + } + } + + if (lastException != null) + { + throw lastException; + } + + Debug.Assert(responseMessage != null); + return responseMessage; + } + } + } + + private async Task<(bool, ResponseMessage)> CloneAndSendAsync( + Func> sender, + RequestMessage request, + CloneableStream clonedBody, + IReadOnlyCollection hedgeRegions, + int requestNumber, + CancellationToken cancellationToken, + CancellationTokenSource cancellationTokenSource) + { + RequestMessage clonedRequest; + using (clonedRequest = request.Clone(request.Trace.Parent, clonedBody)) + { + clonedRequest.RequestOptions ??= new RequestOptions(); + + List excludeRegions = new List(hedgeRegions); + excludeRegions.RemoveAt(requestNumber); + clonedRequest.RequestOptions.ExcludeRegions = excludeRegions; + + return await this.RequestSenderAndResultCheckAsync( + sender, + clonedRequest, + cancellationToken, + cancellationTokenSource); + } + } + + private async Task<(bool, ResponseMessage)> RequestSenderAndResultCheckAsync( + Func> sender, + RequestMessage request, + CancellationToken cancellationToken, + CancellationTokenSource cancellationTokenSource) + { + try + { + ResponseMessage response = await sender.Invoke(request, cancellationToken); + if (IsFinalResult((int)response.StatusCode, (int)response.Headers.SubStatusCode)) + { + if (!cancellationToken.IsCancellationRequested) + { + cancellationTokenSource.Cancel(); + } + return (true, response); + } + + return (false, response); + } + catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested) + { + return (false, null); + } + catch (Exception ex) + { + DefaultTrace.TraceError("Exception thrown while executing cross region hedging availability strategy: {0}", ex); + throw ex; + } + } + + private static bool IsFinalResult(int statusCode, int subStatusCode) + { + //All 1xx, 2xx, and 3xx status codes should be treated as final results + if (statusCode < (int)HttpStatusCode.BadRequest) + { + return true; + } + + //Status codes that indicate non-transient timeouts + if (statusCode == (int)HttpStatusCode.BadRequest + || statusCode == (int)HttpStatusCode.Conflict + || statusCode == (int)HttpStatusCode.MethodNotAllowed + || statusCode == (int)HttpStatusCode.PreconditionFailed + || statusCode == (int)HttpStatusCode.RequestEntityTooLarge + || statusCode == (int)HttpStatusCode.Unauthorized) + { + return true; + } + + //404 - Not found is a final result as the document was not yet available + //after enforcing the consistency model + //All other errors should be treated as possibly transient errors + return statusCode == (int)HttpStatusCode.NotFound && subStatusCode == (int)SubStatusCodes.Unknown; + } + + private static string HedgeRegionsToString(IReadOnlyList<(string, Uri)> hedgeRegions) + { + return string.Join(",", hedgeRegions); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/DisabledAvailabilityStrategy.cs b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/DisabledAvailabilityStrategy.cs new file mode 100644 index 0000000000..defa3a61c8 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/DisabledAvailabilityStrategy.cs @@ -0,0 +1,39 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Threading; + using System.Threading.Tasks; + + /// + /// A Disabled availability strategy that does not do anything. Used for overriding the default global availability strategy. + /// + internal class DisabledAvailabilityStrategy : AvailabilityStrategy + { + internal override bool Enabled() + { + return false; + } + + /// + /// This method is not implemented and will throw an exception if called. + /// + /// + /// + /// + /// + /// nothing, this will throw. + public override Task ExecuteAvailabilityStrategyAsync( + Func> sender, + CosmosClient client, + RequestMessage requestMessage, + CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs index 2d60e57adb..4e5184177b 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs @@ -104,8 +104,8 @@ public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request) public Uri GetHubUri() { - return this.locationCache.GetHubUri(); - } + return this.locationCache.GetHubUri(); + } /// /// This will get the account information. @@ -447,6 +447,11 @@ public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest req { return this.locationCache.GetApplicableEndpoints(request, isReadRequest); } + + public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest) + { + return this.locationCache.GetApplicableRegions(excludeRegions, isReadRequest); + } public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName) { diff --git a/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs b/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs index b6b2c3927d..c95223f044 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs @@ -257,6 +257,11 @@ public Uri GetHubUri() string writeLocation = currentLocationInfo.AvailableWriteLocations[0]; Uri locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation]; return locationEndpointToRoute; + } + + public ReadOnlyCollection GetAvailableReadLocations() + { + return this.locationInfo.AvailableReadLocations; } /// @@ -315,7 +320,10 @@ public Uri ResolveServiceEndpoint(DocumentServiceRequest request) public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest) { - ReadOnlyCollection endpoints = isReadRequest ? this.ReadEndpoints : this.WriteEndpoints; + ReadOnlyCollection endpoints = + isReadRequest + ? this.ReadEndpoints + : this.WriteEndpoints; if (request.RequestContext.ExcludeRegions == null || request.RequestContext.ExcludeRegions.Count == 0) { @@ -323,52 +331,107 @@ public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest req } return this.GetApplicableEndpoints( - endpoints, isReadRequest ? this.locationInfo.AvailableReadEndpointByLocation : this.locationInfo.AvailableWriteEndpointByLocation, this.defaultEndpoint, request.RequestContext.ExcludeRegions); + } + + public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest) + { + return this.GetApplicableRegions( + isReadRequest ? this.locationInfo.AvailableReadLocations : this.locationInfo.AvailableWriteLocations, + this.locationInfo.PreferredLocations[0], + excludeRegions); } /// /// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint /// - /// /// /// /// /// a list of applicable endpoints for a request private ReadOnlyCollection GetApplicableEndpoints( - IReadOnlyList endpoints, ReadOnlyDictionary regionNameByEndpoint, Uri fallbackEndpoint, - IReadOnlyList excludeRegions) + IEnumerable excludeRegions) + { + List applicableEndpoints = new List(regionNameByEndpoint.Count); + HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions); + + if (excludeRegions != null) + { + foreach (string region in this.locationInfo.PreferredLocations) + { + if (!excludeRegionsHash.Contains(region) + && regionNameByEndpoint.TryGetValue(region, out Uri endpoint)) + { + applicableEndpoints.Add(endpoint); + } + } + } + else + { + foreach (string region in this.locationInfo.PreferredLocations) + { + if (regionNameByEndpoint.TryGetValue(region, out Uri endpoint)) + { + applicableEndpoints.Add(endpoint); + } + } + } + + if (applicableEndpoints.Count == 0) + { + applicableEndpoints.Add(fallbackEndpoint); + } + + return new ReadOnlyCollection(applicableEndpoints); + } + + /// + /// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint + /// + /// + /// + /// + /// a list of applicable endpoints for a request + private ReadOnlyCollection GetApplicableRegions( + ReadOnlyCollection regionNameByEndpoint, + string fallbackRegion, + IEnumerable excludeRegions) { - List applicableEndpoints = new List(endpoints.Count); - HashSet excludeUris = new HashSet(); - - foreach (string region in excludeRegions) - { - string normalizedRegionName = this.regionNameMapper.GetCosmosDBRegionName(region); - if (regionNameByEndpoint.ContainsKey(normalizedRegionName)) - { - excludeUris.Add(regionNameByEndpoint[normalizedRegionName]); - } - } - - foreach (Uri endpoint in endpoints) - { - if (!excludeUris.Contains(endpoint)) - { - applicableEndpoints.Add(endpoint); - } + List applicableRegions = new List(regionNameByEndpoint.Count); + HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions); + + if (excludeRegions != null) + { + foreach (string region in this.locationInfo.PreferredLocations) + { + if (regionNameByEndpoint.Contains(region) + && !excludeRegionsHash.Contains(region)) + { + applicableRegions.Add(region); + } + } + } + else + { + foreach (string region in this.locationInfo.PreferredLocations) + { + if (regionNameByEndpoint.Contains(region)) + { + applicableRegions.Add(region); + } + } } - if (applicableEndpoints.Count == 0) + if (applicableRegions.Count == 0) { - applicableEndpoints.Add(fallbackEndpoint); + applicableRegions.Add(fallbackRegion); } - return new ReadOnlyCollection(applicableEndpoints); + return new ReadOnlyCollection(applicableRegions); } public bool ShouldRefreshEndpoints(out bool canRefreshInBackground) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs new file mode 100644 index 0000000000..97bf5af234 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs @@ -0,0 +1,891 @@ + +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests +{ + using System; + using System.Collections.Generic; + using System.Data; + using System.IO; + using System.Net.Http; + using System.Text.Json; + using System.Text.Json.Serialization; + using System.Threading; + using System.Threading.Tasks; + using global::Azure.Core.Serialization; + using Microsoft.Azure.Cosmos; + using Microsoft.Azure.Cosmos.Diagnostics; + using Microsoft.Azure.Cosmos.FaultInjection; + using Microsoft.Azure.Documents; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Database = Database; + using PartitionKey = PartitionKey; + + [TestClass] + public class CosmosAvailabilityStrategyTests + { + + private CosmosClient client; + private Database database; + private Container container; + private Container changeFeedContainer; + private CosmosSystemTextJsonSerializer cosmosSystemTextJsonSerializer; + private string connectionString; + private string dbName; + private string containerName; + private string changeFeedContainerName; + + [TestCleanup] + public async Task TestCleanup() + { + await this.database?.DeleteAsync(); + this.client?.Dispose(); + } + + private static readonly FaultInjectionCondition readConditon = new FaultInjectionConditionBuilder() + .WithRegion("Central US") + .WithOperationType(FaultInjectionOperationType.ReadItem) + .Build(); + private static readonly FaultInjectionCondition queryConditon = new FaultInjectionConditionBuilder() + .WithRegion("Central US") + .WithOperationType(FaultInjectionOperationType.QueryItem) + .Build(); + private static readonly FaultInjectionCondition readManyCondition = new FaultInjectionConditionBuilder() + .WithRegion("Central US") + .WithOperationType(FaultInjectionOperationType.QueryItem) + .Build(); + private static readonly FaultInjectionCondition changeFeedCondtion = new FaultInjectionConditionBuilder() + .WithRegion("Central US") + .WithOperationType(FaultInjectionOperationType.All) + .Build(); + + private static readonly FaultInjectionCondition readConditonStep = new FaultInjectionConditionBuilder() + .WithRegion("North Central US") + .WithOperationType(FaultInjectionOperationType.ReadItem) + .Build(); + private static readonly FaultInjectionCondition queryConditonStep = new FaultInjectionConditionBuilder() + .WithRegion("North Central US") + .WithOperationType(FaultInjectionOperationType.QueryItem) + .Build(); + private static readonly FaultInjectionCondition readManyConditionStep = new FaultInjectionConditionBuilder() + .WithRegion("North Central US") + .WithOperationType(FaultInjectionOperationType.QueryItem) + .Build(); + private static readonly FaultInjectionCondition changeFeedCondtionStep = new FaultInjectionConditionBuilder() + .WithRegion("North Central US") + .WithOperationType(FaultInjectionOperationType.ReadFeed) + .Build(); + + private static readonly IFaultInjectionResult goneResult = FaultInjectionResultBuilder + .GetResultBuilder(FaultInjectionServerErrorType.Gone) + .Build(); + private static readonly IFaultInjectionResult retryWithResult = FaultInjectionResultBuilder + .GetResultBuilder(FaultInjectionServerErrorType.RetryWith) + .Build(); + private static readonly IFaultInjectionResult internalServerErrorResult = FaultInjectionResultBuilder + .GetResultBuilder(FaultInjectionServerErrorType.InternalServerEror) + .Build(); + private static readonly IFaultInjectionResult readSessionNotAvailableResult = FaultInjectionResultBuilder + .GetResultBuilder(FaultInjectionServerErrorType.ReadSessionNotAvailable) + .Build(); + private static readonly IFaultInjectionResult timeoutResult = FaultInjectionResultBuilder + .GetResultBuilder(FaultInjectionServerErrorType.Timeout) + .Build(); + private static readonly IFaultInjectionResult partitionIsSplittingResult = FaultInjectionResultBuilder + .GetResultBuilder(FaultInjectionServerErrorType.PartitionIsSplitting) + .Build(); + private static readonly IFaultInjectionResult partitionIsMigratingResult = FaultInjectionResultBuilder + .GetResultBuilder(FaultInjectionServerErrorType.PartitionIsMigrating) + .Build(); + private static readonly IFaultInjectionResult serviceUnavailableResult = FaultInjectionResultBuilder + .GetResultBuilder(FaultInjectionServerErrorType.ServiceUnavailable) + .Build(); + private static readonly IFaultInjectionResult responseDelayResult = FaultInjectionResultBuilder + .GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay) + .WithDelay(TimeSpan.FromMilliseconds(4000)) + .Build(); + + private readonly Dictionary conditions = new Dictionary() + { + { "Read", readConditon }, + { "Query", queryConditon }, + { "ReadMany", readManyCondition }, + { "ChangeFeed", changeFeedCondtion }, + { "ReadStep", readConditonStep }, + { "QueryStep", queryConditonStep }, + { "ReadManyStep", readManyConditionStep }, + { "ChangeFeedStep", changeFeedCondtionStep} + }; + + private readonly Dictionary results = new Dictionary() + { + { "Gone", goneResult }, + { "RetryWith", retryWithResult }, + { "InternalServerError", internalServerErrorResult }, + { "ReadSessionNotAvailable", readSessionNotAvailableResult }, + { "Timeout", timeoutResult }, + { "PartitionIsSplitting", partitionIsSplittingResult }, + { "PartitionIsMigrating", partitionIsMigratingResult }, + { "ServiceUnavailable", serviceUnavailableResult }, + { "ResponseDelay", responseDelayResult } + }; + + [TestInitialize] + public async Task TestInitAsync() + { + this.connectionString = ConfigurationManager.GetEnvironmentVariable("COSMOSDB_MULTI_REGION", null); + + JsonSerializerOptions jsonSerializerOptions = new JsonSerializerOptions() + { + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }; + this.cosmosSystemTextJsonSerializer = new CosmosSystemTextJsonSerializer(jsonSerializerOptions); + + if (string.IsNullOrEmpty(this.connectionString)) + { + Assert.Fail("Set environment variable COSMOSDB_MULTI_REGION to run the tests"); + } + this.client = new CosmosClient( + this.connectionString, + new CosmosClientOptions() + { + Serializer = this.cosmosSystemTextJsonSerializer, + }); + + this.dbName = Guid.NewGuid().ToString(); + this.containerName = Guid.NewGuid().ToString(); + this.changeFeedContainerName = Guid.NewGuid().ToString(); + this.database = await this.client.CreateDatabaseIfNotExistsAsync(this.dbName); + this.container = await this.database.CreateContainerIfNotExistsAsync(this.containerName, "/pk"); + this.changeFeedContainer = await this.database.CreateContainerIfNotExistsAsync(this.changeFeedContainerName, "/partitionKey"); + + await this.container.CreateItemAsync(new AvailabilityStrategyTestObject { Id = "testId", Pk = "pk" }); + await this.container.CreateItemAsync(new AvailabilityStrategyTestObject { Id = "testId2", Pk = "pk2" }); + await this.container.CreateItemAsync(new AvailabilityStrategyTestObject { Id = "testId3", Pk = "pk3" }); + await this.container.CreateItemAsync(new AvailabilityStrategyTestObject { Id = "testId4", Pk = "pk4" }); + + //Must Ensure the data is replicated to all regions + await Task.Delay(60000); + } + + [TestMethod] + [TestCategory("MultiRegion")] + public async Task AvailabilityStrategyNoTriggerTest() + { + FaultInjectionRule responseDelay = new FaultInjectionRuleBuilder( + id: "responseDely", + condition: + new FaultInjectionConditionBuilder() + .WithRegion("Central US") + .WithOperationType(FaultInjectionOperationType.ReadItem) + .Build(), + result: + FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay) + .WithDelay(TimeSpan.FromMilliseconds(300)) + .Build()) + .WithDuration(TimeSpan.FromMinutes(90)) + .Build(); + + List rules = new List() { responseDelay }; + FaultInjector faultInjector = new FaultInjector(rules); + + responseDelay.Disable(); + + CosmosClientOptions clientOptions = new CosmosClientOptions() + { + ConnectionMode = ConnectionMode.Direct, + ApplicationPreferredRegions = new List() { "Central US", "North Central US" }, + AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy( + threshold: TimeSpan.FromMilliseconds(1500), + thresholdStep: TimeSpan.FromMilliseconds(50)), + Serializer = this.cosmosSystemTextJsonSerializer + }; + + CosmosClient faultInjectionClient = new CosmosClient( + connectionString: this.connectionString, + clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions)); + + Database database = faultInjectionClient.GetDatabase(this.dbName); + Container container = database.GetContainer(this.containerName); + + responseDelay.Enable(); + ItemResponse ir = await container.ReadItemAsync("testId", new PartitionKey("pk")); + + CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Original Request", (string)hedgeContext); + + faultInjectionClient.Dispose(); + } + + [TestMethod] + [TestCategory("MultiRegion")] + public async Task AvailabilityStrategyRequestOptionsTriggerTest() + { + FaultInjectionRule responseDelay = new FaultInjectionRuleBuilder( + id: "responseDely", + condition: + new FaultInjectionConditionBuilder() + .WithRegion("Central US") + .WithOperationType(FaultInjectionOperationType.ReadItem) + .Build(), + result: + FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay) + .WithDelay(TimeSpan.FromMilliseconds(4000)) + .Build()) + .WithDuration(TimeSpan.FromMinutes(90)) + .Build(); + + List rules = new List() { responseDelay }; + FaultInjector faultInjector = new FaultInjector(rules); + + responseDelay.Disable(); + + CosmosClientOptions clientOptions = new CosmosClientOptions() + { + ConnectionMode = ConnectionMode.Direct, + ApplicationPreferredRegions = new List() { "Central US", "North Central US" }, + Serializer = this.cosmosSystemTextJsonSerializer + }; + + CosmosClient faultInjectionClient = new CosmosClient( + connectionString: this.connectionString, + clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions)); + + Database database = faultInjectionClient.GetDatabase(this.dbName); + Container container = database.GetContainer(this.containerName); + + responseDelay.Enable(); + + ItemRequestOptions requestOptions = new ItemRequestOptions + { + AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy( + threshold: TimeSpan.FromMilliseconds(100), + thresholdStep: TimeSpan.FromMilliseconds(50)) + }; + ItemResponse ir = await container.ReadItemAsync( + "testId", + new PartitionKey("pk"), + requestOptions); + + CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject); + Assert.IsNotNull(excludeRegionsObject); + List excludeRegionsList = excludeRegionsObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + faultInjectionClient.Dispose(); + } + + [TestMethod] + [TestCategory("MultiRegion")] + public async Task AvailabilityStrategyDisableOverideTest() + { + FaultInjectionRule responseDelay = new FaultInjectionRuleBuilder( + id: "responseDely", + condition: + new FaultInjectionConditionBuilder() + .WithRegion("Central US") + .WithOperationType(FaultInjectionOperationType.ReadItem) + .Build(), + result: + FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay) + .WithDelay(TimeSpan.FromMilliseconds(6000)) + .Build()) + .WithDuration(TimeSpan.FromMinutes(90)) + .WithHitLimit(2) + .Build(); + + List rules = new List() { responseDelay }; + FaultInjector faultInjector = new FaultInjector(rules); + + responseDelay.Disable(); + + CosmosClientOptions clientOptions = new CosmosClientOptions() + { + ConnectionMode = ConnectionMode.Direct, + ApplicationPreferredRegions = new List() { "Central US", "North Central US" }, + AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy( + threshold: TimeSpan.FromMilliseconds(100), + thresholdStep: TimeSpan.FromMilliseconds(50)), + Serializer = this.cosmosSystemTextJsonSerializer + }; + + CosmosClient faultInjectionClient = new CosmosClient( + connectionString: this.connectionString, + clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions)); + + Database database = faultInjectionClient.GetDatabase(this.dbName); + Container container = database.GetContainer(this.containerName); + + responseDelay.Enable(); + ItemRequestOptions requestOptions = new ItemRequestOptions + { + AvailabilityStrategy = new DisabledAvailabilityStrategy() + }; + + ItemResponse ir = await container.ReadItemAsync( + "testId", + new PartitionKey("pk"), + requestOptions); + + CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + Assert.IsFalse(traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out _)); + + faultInjectionClient.Dispose(); + } + + [DataTestMethod] + [TestCategory("MultiRegion")] + [DataRow("Read", "Read", "Gone", DisplayName = "Read | Gone")] + [DataRow("Read", "Read", "RetryWith", DisplayName = "Read | RetryWith")] + [DataRow("Read", "Read", "InternalServerError", DisplayName = "Read | InternalServerError")] + [DataRow("Read", "Read", "ReadSessionNotAvailable", DisplayName = "Read | ReadSessionNotAvailable")] + [DataRow("Read", "Read", "Timeout", DisplayName = "Read | Timeout")] + [DataRow("Read", "Read", "PartitionIsSplitting", DisplayName = "Read | PartitionIsSplitting")] + [DataRow("Read", "Read", "PartitionIsMigrating", DisplayName = "Read | PartitionIsMigrating")] + [DataRow("Read", "Read", "ServiceUnavailable", DisplayName = "Read | ServiceUnavailable")] + [DataRow("Read", "Read", "ResponseDelay", DisplayName = "Read | ResponseDelay")] + [DataRow("SinglePartitionQuery", "Query", "Gone", DisplayName = "SinglePartitionQuery | Gone")] + [DataRow("SinglePartitionQuery", "Query", "RetryWith", DisplayName = "SinglePartitionQuery | RetryWith")] + [DataRow("SinglePartitionQuery", "Query", "InternalServerError", DisplayName = "SinglePartitionQuery | InternalServerError")] + [DataRow("SinglePartitionQuery", "Query", "ReadSessionNotAvailable", DisplayName = "SinglePartitionQuery | ReadSessionNotAvailable")] + [DataRow("SinglePartitionQuery", "Query", "Timeout", DisplayName = "SinglePartitionQuery | Timeout")] + [DataRow("SinglePartitionQuery", "Query", "PartitionIsSplitting", DisplayName = "SinglePartitionQuery | PartitionIsSplitting")] + [DataRow("SinglePartitionQuery", "Query", "PartitionIsMigrating", DisplayName = "SinglePartitionQuery | PartitionIsMigrating")] + [DataRow("SinglePartitionQuery", "Query", "ServiceUnavailable", DisplayName = "SinglePartitionQuery | ServiceUnavailable")] + [DataRow("SinglePartitionQuery", "Query", "ResponseDelay", DisplayName = "SinglePartitionQuery | ResponseDelay")] + [DataRow("CrossPartitionQuery", "Query", "Gone", DisplayName = "CrossPartitionQuery | Gone")] + [DataRow("CrossPartitionQuery", "Query", "RetryWith", DisplayName = "CrossPartitionQuery | RetryWith")] + [DataRow("CrossPartitionQuery", "Query", "InternalServerError", DisplayName = "CrossPartitionQuery | InternalServerError")] + [DataRow("CrossPartitionQuery", "Query", "ReadSessionNotAvailable", DisplayName = "CrossPartitionQuery | ReadSessionNotAvailable")] + [DataRow("CrossPartitionQuery", "Query", "Timeout", DisplayName = "CrossPartitionQuery | Timeout")] + [DataRow("CrossPartitionQuery", "Query", "PartitionIsSplitting", DisplayName = "CrossPartitionQuery | PartitionIsSplitting")] + [DataRow("CrossPartitionQuery", "Query", "PartitionIsMigrating", DisplayName = "CrossPartitionQuery | PartitionIsMigrating")] + [DataRow("CrossPartitionQuery", "Query", "ServiceUnavailable", DisplayName = "CrossPartitionQuery | ServiceUnavailable")] + [DataRow("CrossPartitionQuery", "Query", "ResponseDelay", DisplayName = "CrossPartitionQuery | ResponseDelay")] + [DataRow("ReadMany", "ReadMany", "Gone", DisplayName = "ReadMany | Gone")] + [DataRow("ReadMany", "ReadMany", "RetryWith", DisplayName = "ReadMany | RetryWith")] + [DataRow("ReadMany", "ReadMany", "InternalServerError", DisplayName = "ReadMany | InternalServerError")] + [DataRow("ReadMany", "ReadMany", "ReadSessionNotAvailable", DisplayName = "ReadMany | ReadSessionNotAvailable")] + [DataRow("ReadMany", "ReadMany", "Timeout", DisplayName = "ReadMany | Timeout")] + [DataRow("ReadMany", "ReadMany", "PartitionIsSplitting", DisplayName = "ReadMany | PartitionIsSplitting")] + [DataRow("ReadMany", "ReadMany", "PartitionIsMigrating", DisplayName = "ReadMany | PartitionIsMigrating")] + [DataRow("ReadMany", "ReadMany", "ServiceUnavailable", DisplayName = "ReadMany | ServiceUnavailable")] + [DataRow("ReadMany", "ReadMany", "ResponseDelay", DisplayName = "ReadMany | ResponseDelay")] + [DataRow("ChangeFeed", "ChangeFeed", "Gone", DisplayName = "ChangeFeed | Gone")] + [DataRow("ChangeFeed", "ChangeFeed", "RetryWith", DisplayName = "ChangeFeed | RetryWith")] + [DataRow("ChangeFeed", "ChangeFeed", "InternalServerError", DisplayName = "ChangeFeed | InternalServerError")] + [DataRow("ChangeFeed", "ChangeFeed", "ReadSessionNotAvailable", DisplayName = "ChangeFeed | ReadSessionNotAvailable")] + [DataRow("ChangeFeed", "ChangeFeed", "Timeout", DisplayName = "ChangeFeed | Timeout")] + [DataRow("ChangeFeed", "ChangeFeed", "PartitionIsSplitting", DisplayName = "ChangeFeed | PartitionIsSplitting")] + [DataRow("ChangeFeed", "ChangeFeed", "PartitionIsMigrating", DisplayName = "ChangeFeed | PartitionIsMigrating")] + [DataRow("ChangeFeed", "ChangeFeed", "ServiceUnavailable", DisplayName = "ChangeFeed | ServiceUnavailable")] + [DataRow("ChangeFeed", "ChangeFeed", "ResponseDelay", DisplayName = "ChangeFeed | ResponseDelay")] + public async Task AvailabilityStrategyAllFaultsTests(string operation, string conditonName, string resultName) + { + FaultInjectionCondition conditon = this.conditions[conditonName]; + IFaultInjectionResult result = this.results[resultName]; + + FaultInjectionRule rule = new FaultInjectionRuleBuilder( + id: operation, + condition: conditon, + result: result) + .WithDuration(TimeSpan.FromMinutes(90)) + .Build(); + + List rules = new List() { rule }; + FaultInjector faultInjector = new FaultInjector(rules); + + rule.Disable(); + + CosmosClientOptions clientOptions = new CosmosClientOptions() + { + ConnectionMode = ConnectionMode.Direct, + ApplicationPreferredRegions = new List() { "Central US", "North Central US" }, + AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy( + threshold: TimeSpan.FromMilliseconds(100), + thresholdStep: TimeSpan.FromMilliseconds(50)), + Serializer = this.cosmosSystemTextJsonSerializer + }; + + CosmosClient faultInjectionClient = new CosmosClient( + connectionString: this.connectionString, + clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions)); + + Database database = faultInjectionClient.GetDatabase(this.dbName); + Container container = database.GetContainer(this.containerName); + + CosmosTraceDiagnostics traceDiagnostic; + object hedgeContext; + List excludeRegionsList; + switch (operation) + { + case "Read": + rule.Enable(); + + ItemResponse ir = await container.ReadItemAsync( + "testId", + new PartitionKey("pk")); + + Assert.IsTrue(rule.GetHitCount() > 0); + traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject); + excludeRegionsList = excludeRegionsObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + + break; + + case "SinglePartitionQuery": + string queryString = "SELECT * FROM c"; + + QueryRequestOptions requestOptions = new QueryRequestOptions() + { + PartitionKey = new PartitionKey("pk"), + }; + + FeedIterator queryIterator = container.GetItemQueryIterator( + new QueryDefinition(queryString), + requestOptions: requestOptions); + + rule.Enable(); + + while (queryIterator.HasMoreResults) + { + FeedResponse feedResponse = await queryIterator.ReadNextAsync(); + + Assert.IsTrue(rule.GetHitCount() > 0); + traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsQueryObject); + excludeRegionsList = excludeRegionsQueryObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + } + + break; + + case "CrossPartitionQuery": + string crossPartitionQueryString = "SELECT * FROM c"; + FeedIterator crossPartitionQueryIterator = container.GetItemQueryIterator( + new QueryDefinition(crossPartitionQueryString)); + + rule.Enable(); + + while (crossPartitionQueryIterator.HasMoreResults) + { + FeedResponse feedResponse = await crossPartitionQueryIterator.ReadNextAsync(); + + Assert.IsTrue(rule.GetHitCount() > 0); + traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsQueryObject); + excludeRegionsList = excludeRegionsQueryObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + } + + break; + + case "ReadMany": + rule.Enable(); + + FeedResponse readManyResponse = await container.ReadManyItemsAsync( + new List<(string, PartitionKey)>() + { + ("testId", new PartitionKey("pk")), + ("testId2", new PartitionKey("pk2")), + ("testId3", new PartitionKey("pk3")), + ("testId4", new PartitionKey("pk4")) + }); + + Assert.IsTrue(rule.GetHitCount() > 0); + traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsReadManyObject); + excludeRegionsList = excludeRegionsReadManyObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + + break; + + case "ChangeFeed": + Container leaseContainer = database.GetContainer(this.changeFeedContainerName); + ChangeFeedProcessor changeFeedProcessor = container.GetChangeFeedProcessorBuilder( + processorName: "AvialabilityStrategyTest", + onChangesDelegate: HandleChangesAsync) + .WithInstanceName("test") + .WithLeaseContainer(leaseContainer) + .Build(); + await changeFeedProcessor.StartAsync(); + await Task.Delay(1000); + AvailabilityStrategyTestObject testObject = new AvailabilityStrategyTestObject + { + Id = "testId5", + Pk = "pk5", + Other = "other" + }; + await container.CreateItemAsync(testObject); + + rule.Enable(); + + await Task.Delay(5000); + + Assert.IsTrue(rule.GetHitCount() > 0); + + break; + + default: + + Assert.Fail("Invalid operation"); + break; + } + + rule.Disable(); + + faultInjectionClient.Dispose(); + } + + [DataTestMethod] + [TestCategory("MultiRegion")] + [DataRow("Read", "Read", "ReadStep", DisplayName = "Read | ReadStep")] + [DataRow("SinglePartitionQuery", "Query", "QueryStep", DisplayName = "Query | SinglePartitionQueryStep")] + [DataRow("CrossPartitionQuery", "Query", "QueryStep", DisplayName = "Query | CrossPartitionQueryStep")] + [DataRow("ReadMany", "ReadMany", "ReadManyStep", DisplayName = "ReadMany | ReadManyStep")] + [DataRow("ChangeFeed", "ChangeFeed", "ChangeFeedStep", DisplayName = "ChangeFeed | ChangeFeedStep")] + public async Task AvailabilityStrategyStepTests(string operation, string conditonName1, string conditionName2) + { + FaultInjectionCondition conditon1 = this.conditions[conditonName1]; + FaultInjectionCondition conditon2 = this.conditions[conditionName2]; + IFaultInjectionResult result = responseDelayResult; + + FaultInjectionRule rule1 = new FaultInjectionRuleBuilder( + id: operation, + condition: conditon1, + result: result) + .WithDuration(TimeSpan.FromMinutes(90)) + .Build(); + + FaultInjectionRule rule2 = new FaultInjectionRuleBuilder( + id: operation, + condition: conditon2, + result: result) + .WithDuration(TimeSpan.FromMinutes(90)) + .Build(); + + List rules = new List() { rule1, rule2 }; + FaultInjector faultInjector = new FaultInjector(rules); + + rule1.Disable(); + rule2.Disable(); + + CosmosClientOptions clientOptions = new CosmosClientOptions() + { + ConnectionMode = ConnectionMode.Direct, + ApplicationPreferredRegions = new List() { "Central US", "North Central US", "East US" }, + AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy( + threshold: TimeSpan.FromMilliseconds(100), + thresholdStep: TimeSpan.FromMilliseconds(50)), + Serializer = this.cosmosSystemTextJsonSerializer + }; + + CosmosClient faultInjectionClient = new CosmosClient( + connectionString: this.connectionString, + clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions)); + + Database database = faultInjectionClient.GetDatabase(this.dbName); + Container container = database.GetContainer(this.containerName); + + CosmosTraceDiagnostics traceDiagnostic; + object hedgeContext; + List excludeRegionsList; + switch (operation) + { + case "Read": + rule1.Enable(); + rule2.Enable(); + + ItemResponse ir = await container.ReadItemAsync( + "testId", + new PartitionKey("pk")); + + traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject); + excludeRegionsList = excludeRegionsObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + Assert.IsTrue(excludeRegionsList.Contains("North Central US")); + + break; + + case "SinglePartitionQuery": + string queryString = "SELECT * FROM c"; + + QueryRequestOptions requestOptions = new QueryRequestOptions() + { + PartitionKey = new PartitionKey("pk"), + }; + + FeedIterator queryIterator = container.GetItemQueryIterator( + new QueryDefinition(queryString), + requestOptions: requestOptions); + + rule1.Enable(); + rule2.Enable(); + + while (queryIterator.HasMoreResults) + { + FeedResponse feedResponse = await queryIterator.ReadNextAsync(); + + traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsQueryObject); + excludeRegionsList = excludeRegionsQueryObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + } + + break; + + case "CrossPartitionQuery": + string crossPartitionQueryString = "SELECT * FROM c"; + FeedIterator crossPartitionQueryIterator = container.GetItemQueryIterator( + new QueryDefinition(crossPartitionQueryString)); + + rule1.Enable(); + rule2.Enable(); + + while (crossPartitionQueryIterator.HasMoreResults) + { + FeedResponse feedResponse = await crossPartitionQueryIterator.ReadNextAsync(); + + traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsQueryObject); + excludeRegionsList = excludeRegionsQueryObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + } + + break; + + case "ReadMany": + rule1.Enable(); + rule2.Enable(); + + FeedResponse readManyResponse = await container.ReadManyItemsAsync( + new List<(string, PartitionKey)>() + { + ("testId", new PartitionKey("pk")), + ("testId2", new PartitionKey("pk2")), + ("testId3", new PartitionKey("pk3")), + ("testId4", new PartitionKey("pk4")) + }); + + traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsReadManyObject); + excludeRegionsList = excludeRegionsReadManyObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + Assert.IsTrue(excludeRegionsList.Contains("North Central US")); + + break; + + case "ChangeFeed": + Container leaseContainer = database.GetContainer(this.changeFeedContainerName); + ChangeFeedProcessor changeFeedProcessor = container.GetChangeFeedProcessorBuilder( + processorName: "AvialabilityStrategyTest", + onChangesDelegate: HandleChangesStepAsync) + .WithInstanceName("test") + .WithLeaseContainer(leaseContainer) + .Build(); + await changeFeedProcessor.StartAsync(); + await Task.Delay(1000); + AvailabilityStrategyTestObject testObject = new AvailabilityStrategyTestObject + { + Id = "testId5", + Pk = "pk5", + Other = "other" + }; + await container.CreateItemAsync(testObject); + + rule1.Enable(); + rule2.Enable(); + + await Task.Delay(5000); + + break; + + default: + Assert.Fail("Invalid operation"); + break; + } + + rule1.Disable(); + rule2.Disable(); + + faultInjectionClient.Dispose(); + } + + [TestMethod] + [TestCategory("MultiRegion")] + public async Task RequestMessageCloneTests() + { + RequestMessage httpRequest = new RequestMessage( + HttpMethod.Get, + new Uri("/dbs/testdb/colls/testcontainer/docs/testId", UriKind.Relative)); + + string key = Guid.NewGuid().ToString(); + Dictionary properties = new Dictionary() + { + { key, Guid.NewGuid() } + }; + + RequestOptions requestOptions = new RequestOptions() + { + Properties = properties + }; + + httpRequest.RequestOptions = requestOptions; + httpRequest.ResourceType = ResourceType.Document; + httpRequest.OperationType = OperationType.Read; + httpRequest.Headers.CorrelatedActivityId = Guid.NewGuid().ToString(); + httpRequest.PartitionKeyRangeId = new PartitionKeyRangeIdentity("0", "1"); + httpRequest.UseGatewayMode = true; + httpRequest.ContainerId = "testcontainer"; + httpRequest.DatabaseId = "testdb"; + httpRequest.Content = Stream.Null; + + using (CloneableStream clonedBody = await StreamExtension.AsClonableStreamAsync(httpRequest.Content)) + { + RequestMessage clone = httpRequest.Clone(httpRequest.Trace, clonedBody); + + Assert.AreEqual(httpRequest.RequestOptions.Properties, clone.RequestOptions.Properties); + Assert.AreEqual(httpRequest.ResourceType, clone.ResourceType); + Assert.AreEqual(httpRequest.OperationType, clone.OperationType); + Assert.AreEqual(httpRequest.Headers.CorrelatedActivityId, clone.Headers.CorrelatedActivityId); + Assert.AreEqual(httpRequest.PartitionKeyRangeId, clone.PartitionKeyRangeId); + Assert.AreEqual(httpRequest.UseGatewayMode, clone.UseGatewayMode); + Assert.AreEqual(httpRequest.ContainerId, clone.ContainerId); + Assert.AreEqual(httpRequest.DatabaseId, clone.DatabaseId); + } + } + + private static async Task HandleChangesAsync( + ChangeFeedProcessorContext context, + IReadOnlyCollection changes, + CancellationToken cancellationToken) + { + if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1)) + { + Assert.Fail("Change Feed Processor took too long"); + } + + CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject); + List excludeRegionsList = excludeRegionsObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + await Task.Delay(1); + } + + private static async Task HandleChangesStepAsync( + ChangeFeedProcessorContext context, + IReadOnlyCollection changes, + CancellationToken cancellationToken) + { + if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1)) + { + Assert.Fail("Change Feed Processor took too long"); + } + + CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.AreEqual("Hedged Request", (string)hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject); + List excludeRegionsList = excludeRegionsObject as List; + Assert.IsTrue(excludeRegionsList.Contains("Central US")); + Assert.IsTrue(excludeRegionsList.Contains("North Central US")); + await Task.Delay(1); + } + + private class AvailabilityStrategyTestObject + { + + [JsonPropertyName("id")] + public string Id { get; set; } + + [JsonPropertyName("pk")] + public string Pk { get; set; } + + [JsonPropertyName("other")] + public string Other { get; set; } + } + + private class CosmosSystemTextJsonSerializer : CosmosSerializer + { + private readonly JsonObjectSerializer systemTextJsonSerializer; + + public CosmosSystemTextJsonSerializer(JsonSerializerOptions jsonSerializerOptions) + { + this.systemTextJsonSerializer = new JsonObjectSerializer(jsonSerializerOptions); + } + + public override T FromStream(Stream stream) + { + using (stream) + { + if (stream.CanSeek + && stream.Length == 0) + { + return default; + } + + if (typeof(Stream).IsAssignableFrom(typeof(T))) + { + return (T)(object)stream; + } + + return (T)this.systemTextJsonSerializer.Deserialize(stream, typeof(T), default); + } + } + + public override Stream ToStream(T input) + { + MemoryStream streamPayload = new MemoryStream(); + this.systemTextJsonSerializer.Serialize(streamPayload, input, input.GetType(), default); + streamPayload.Position = 0; + return streamPayload; + } + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj index ede84818e9..813b5453fe 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj @@ -371,6 +371,9 @@ PreserveNewest + + + true diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/LocationCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/LocationCacheTests.cs index 06ed0d4026..f5070672d5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/LocationCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/LocationCacheTests.cs @@ -847,7 +847,7 @@ public void VerifyRegionExcludedTest( { request.RequestContext.ExcludeRegions = excludeRegions; - ReadOnlyCollection applicableEndpoints = endpointManager.GetApplicableEndpoints(request, isReadRequest); + ReadOnlyCollection applicableEndpoints = this.cache.GetApplicableEndpoints(request, isReadRequest); Uri endpoint = endpointManager.ResolveServiceEndpoint(request); ReadOnlyCollection applicableRegions = this.GetApplicableRegions(isReadRequest, useMultipleWriteLocations, usesPreferredLocations, excludeRegions);