Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Request Hedging: Refactors HedgeRegion diagnostics field to only show successful region #4625

Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Dynamic;
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
using System.Linq;
using System.Net;
using System.Threading;
Expand All @@ -22,10 +24,8 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal class CrossRegionParallelHedgingAvailabilityStrategy : AvailabilityStrategy
{
private const string HedgeRegions = "Hedge Regions";
private const string HedgeContext = "Hedge Context";
private const string HedgeContextOriginalRequest = "Original Request";
private const string HedgeContextHedgedRequest = "Hedged Request";
private const string ResponseRegion = "Response Region";

/// <summary>
/// Latency threshold which activates the first region hedging
Expand Down Expand Up @@ -120,9 +120,8 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(

List<Task> requestTasks = new List<Task>(hedgeRegions.Count + 1);

Task<(bool, ResponseMessage)> primaryRequest = null;

ResponseMessage responseMessage = null;
Task<HedgingResponse> primaryRequest = null;
HedgingResponse hedgeResponse = null;

//Send out hedged requests
for (int requestNumber = 0; requestNumber < hedgeRegions.Count; requestNumber++)
Expand All @@ -139,14 +138,15 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
primaryRequest = this.RequestSenderAndResultCheckAsync(
sender,
request,
hedgeRegions.ElementAt(requestNumber),
cancellationToken,
cancellationTokenSource);

requestTasks.Add(primaryRequest);
}
else
{
Task<(bool, ResponseMessage)> requestTask = this.CloneAndSendAsync(
Task<HedgingResponse> requestTask = this.CloneAndSendAsync(
sender: sender,
request: request,
clonedBody: clonedBody,
Expand Down Expand Up @@ -176,19 +176,17 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
AggregateException innerExceptions = completedTask.Exception.Flatten();
}

(bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask;
if (isNonTransient)
hedgeResponse = await (Task<HedgingResponse>)completedTask;
if (hedgeResponse.IsNonTransient)
{
cancellationTokenSource.Cancel();
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeRegions,
HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions()));
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
object.ReferenceEquals(primaryRequest, completedTask)
? HedgeContextOriginalRequest
: HedgeContextHedgedRequest);
return responseMessage;
hedgeRegions.Take(requestNumber + 1));
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
ResponseRegion,
hedgeResponse.ResponseRegion);
return hedgeResponse.ResponseMessage;
}
}
}
Expand All @@ -206,19 +204,17 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
lastException = innerExceptions.InnerExceptions.FirstOrDefault();
}

(bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask;
if (isNonTransient || requestTasks.Count == 0)
hedgeResponse = await (Task<HedgingResponse>)completedTask;
if (hedgeResponse.IsNonTransient || requestTasks.Count == 0)
{
cancellationTokenSource.Cancel();
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeRegions,
HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions()));
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
object.ReferenceEquals(primaryRequest, completedTask)
? HedgeContextOriginalRequest
: HedgeContextHedgedRequest);
return responseMessage;
hedgeRegions);
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
ResponseRegion,
hedgeResponse.ResponseRegion);
return hedgeResponse.ResponseMessage;
}
}

Expand All @@ -227,13 +223,13 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
throw lastException;
}

Debug.Assert(responseMessage != null);
return responseMessage;
Debug.Assert(hedgeResponse != null);
return hedgeResponse.ResponseMessage;
}
}
}

private async Task<(bool, ResponseMessage)> CloneAndSendAsync(
private async Task<HedgingResponse> CloneAndSendAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
RequestMessage request,
CloneableStream clonedBody,
Expand All @@ -248,20 +244,23 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
clonedRequest.RequestOptions ??= new RequestOptions();

List<string> excludeRegions = new List<string>(hedgeRegions);
string region = excludeRegions[requestNumber];
excludeRegions.RemoveAt(requestNumber);
clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;

return await this.RequestSenderAndResultCheckAsync(
sender,
clonedRequest,
region,
cancellationToken,
cancellationTokenSource);
}
}

private async Task<(bool, ResponseMessage)> RequestSenderAndResultCheckAsync(
private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
RequestMessage request,
string hedgedRegion,
CancellationToken cancellationToken,
CancellationTokenSource cancellationTokenSource)
{
Expand All @@ -274,14 +273,15 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
{
cancellationTokenSource.Cancel();
}
return (true, response);

return new HedgingResponse(true, response, hedgedRegion);
}

return (false, response);
return new HedgingResponse(false, response, hedgedRegion);
}
catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested)
{
return (false, null);
return new HedgingResponse(false, null, hedgedRegion);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -315,9 +315,18 @@ private static bool IsFinalResult(int statusCode, int subStatusCode)
return statusCode == (int)HttpStatusCode.NotFound && subStatusCode == (int)SubStatusCodes.Unknown;
}

private static string HedgeRegionsToString(IReadOnlyList<(string, Uri)> hedgeRegions)
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
private class HedgingResponse
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
{
return string.Join(",", hedgeRegions);
public bool IsNonTransient;
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
public ResponseMessage ResponseMessage;
public string ResponseRegion;

public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage, string responseRegion)
{
this.IsNonTransient = isNonTransient;
this.ResponseMessage = responseMessage;
this.ResponseRegion = responseRegion;
}
}
}
}
Loading
Loading