Trigger Failover On 3092 And 1022 #4670 (Closed)
@@ -92,7 +92,11 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(
ItemBatchOperationContext context = new ItemBatchOperationContext(
resolvedPartitionKeyRangeId,
trace,
BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions));
BatchAsyncContainerExecutor.GetRetryPolicy(
this.cosmosContainer,
this.cosmosClientContext?.DocumentClient?.GlobalEndpointManager,
operation.OperationType,
this.retryOptions));

if (itemRequestOptions != null && itemRequestOptions.AddRequestHeaders != null)
{
@@ -159,6 +163,7 @@ internal virtual async Task ValidateOperationAsync(

private static IDocumentClientRetryPolicy GetRetryPolicy(
ContainerInternal containerInternal,
GlobalEndpointManager endpointManager,
OperationType operationType,
RetryOptions retryOptions)
{
@@ -167,6 +172,7 @@ private static IDocumentClientRetryPolicy GetRetryPolicy(
operationType,
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager,
retryOptions.MaxRetryWaitTimeInSeconds));
}

5 changes: 3 additions & 2 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
@@ -47,8 +47,9 @@ public ClientRetryPolicy(
bool isPertitionLevelFailoverEnabled)
{
this.throttlingRetry = new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
retryOptions.MaxRetryWaitTimeInSeconds);
maxAttemptCount: retryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager: globalEndpointManager,
maxWaitTimeInSeconds: retryOptions.MaxRetryWaitTimeInSeconds);

this.globalEndpointManager = globalEndpointManager;
this.partitionKeyRangeLocationCache = partitionKeyRangeLocationCache;
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
@@ -985,6 +985,7 @@ internal virtual void Initialize(Uri serviceEndpoint,
() => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory),
new ResourceThrottleRetryPolicy(
this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests,
this.GlobalEndpointManager,
this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds));

// Create the task to start the initialize task
@@ -72,6 +72,7 @@ public MetadataRequestThrottleRetryPolicy(

this.throttlingRetryPolicy = new ResourceThrottleRetryPolicy(
maxRetryAttemptsOnThrottledRequests,
globalEndpointManager,
maxRetryWaitTimeInSeconds);

this.retryContext = new MetadataRetryContext
38 changes: 35 additions & 3 deletions Microsoft.Azure.Cosmos/src/ResourceThrottleRetryPolicy.cs
@@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

// Retry when we receive the throttling from server.
@@ -19,12 +20,15 @@ internal sealed class ResourceThrottleRetryPolicy : IDocumentClientRetryPolicy
private readonly uint backoffDelayFactor;
private readonly int maxAttemptCount;
private readonly TimeSpan maxWaitTimeInMilliseconds;
private readonly IGlobalEndpointManager globalEndpointManager;

private int currentAttemptCount;
private TimeSpan cumulativeRetryDelay;
private bool? isMultiMasterWriteRegion;

public ResourceThrottleRetryPolicy(
int maxAttemptCount,
IGlobalEndpointManager endpointManager,
int maxWaitTimeInSeconds = DefaultMaxWaitTimeInSeconds,
uint backoffDelayFactor = 1)
{
@@ -33,6 +37,7 @@ public ResourceThrottleRetryPolicy(
throw new ArgumentException("maxWaitTimeInSeconds", "maxWaitTimeInSeconds must be less than " + (int.MaxValue / 1000));
}

this.globalEndpointManager = endpointManager;
this.maxAttemptCount = maxAttemptCount;
this.backoffDelayFactor = backoffDelayFactor;
this.maxWaitTimeInMilliseconds = TimeSpan.FromSeconds(maxWaitTimeInSeconds);
@@ -59,7 +64,9 @@ public Task<ShouldRetryResult> ShouldRetryAsync(
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.ShouldRetryInternalAsync(dce.RetryAfter);
return this.ShouldRetryInternalAsync(
dce?.GetSubStatus(),
dce?.RetryAfter);
}

DefaultTrace.TraceError(
@@ -88,11 +95,34 @@ public Task<ShouldRetryResult> ShouldRetryAsync(
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.ShouldRetryInternalAsync(cosmosResponseMessage?.Headers.RetryAfter);
return this.ShouldRetryInternalAsync(
cosmosResponseMessage?.Headers.SubStatusCode,
cosmosResponseMessage?.Headers.RetryAfter,
cosmosResponseMessage?.CosmosException);
}

private Task<ShouldRetryResult> ShouldRetryInternalAsync(TimeSpan? retryAfter)
private Task<ShouldRetryResult> ShouldRetryInternalAsync(
SubStatusCodes? subStatusCode,
TimeSpan? retryAfter,
Exception exception = null)
{
if (this.isMultiMasterWriteRegion.HasValue
&& this.isMultiMasterWriteRegion.Value
&& subStatusCode != null
&& subStatusCode == SubStatusCodes.SystemResourceUnavailable)
{
DefaultTrace.TraceError(
"Operation will NOT be retried. Converting 429 to 503. Current attempt {0} sub status code: {1}.",
this.currentAttemptCount, SubStatusCodes.SystemResourceUnavailable);

var exceptionToThrow = ServiceUnavailableException.Create(
SubStatusCodes.SystemResourceUnavailable,
innerException: exception);

return Task.FromResult(
ShouldRetryResult.NoRetry(exceptionToThrow));
}

TimeSpan retryDelay = TimeSpan.Zero;
if (this.currentAttemptCount < this.maxAttemptCount &&
this.CheckIfRetryNeeded(retryAfter, out retryDelay))
@@ -133,6 +163,8 @@ private object GetExceptionMessage(Exception exception)
/// <param name="request">The request being sent to the service.</param>
public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.isMultiMasterWriteRegion = !request.IsReadOnlyRequest
&& (this.globalEndpointManager?.CanUseMultipleWriteLocations(request) ?? false);
}

Member: I think you are missing the condition that more than one write region needs to be available (honoring preferred regions, excludedRegions, etc.). This could be included in GlobalEndpointManager.CanUseMultipleWriteLocations, but glancing at the code I did not see this check.
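
A rough, self-contained sketch of the decision this change introduces, folding in the additional guard the reviewer asks about; the helper name and the availableWriteRegions parameter are hypothetical, not SDK API:

// Hypothetical illustration only: it mirrors the intent of the new
// OnBeforeSendRequest/ShouldRetryInternalAsync logic, where a 429 with
// sub status 3092 on a multi-master write is surfaced as a 503 instead
// of being retried in place.
static bool ShouldConvert429To503(
    bool isWriteRequest,
    bool canUseMultipleWriteLocations,
    int availableWriteRegions, // reviewer's suggested guard: more than one usable write region
    int? subStatusCode)
{
    const int SystemResourceUnavailable = 3092; // SubStatusCodes.SystemResourceUnavailable
    return isWriteRequest
        && canUseMultipleWriteLocations
        && availableWriteRegions > 1
        && subStatusCode == SystemResourceUnavailable;
}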

/// <summary>
@@ -9,6 +9,7 @@ namespace Microsoft.Azure.Documents
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents.Routing;

@@ -143,6 +144,7 @@ public bool TryHandleResponseSynchronously(DocumentServiceRequest request, TResp

bool isRetryWith = false;
if (!GoneAndRetryWithRequestRetryPolicy<TResponse>.IsBaseGone(response, exception) &&
!GoneAndRetryWithRequestRetryPolicy<TResponse>.IsGoneWithLeaseNotFound(response, exception) &&
!(exception is RetryWithException) &&
!(GoneAndRetryWithRequestRetryPolicy<TResponse>.IsPartitionIsMigrating(response, exception) && (request.ServiceIdentity == null || request.ServiceIdentity.IsMasterService)) &&
!(GoneAndRetryWithRequestRetryPolicy<TResponse>.IsInvalidPartition(response, exception) && (request.PartitionKeyRangeIdentity == null || request.PartitionKeyRangeIdentity.CollectionRid == null)) &&
@@ -170,6 +172,21 @@ public bool TryHandleResponseSynchronously(DocumentServiceRequest request, TResp
isRetryWith = true;
this.lastRetryWithException = exception as RetryWithException;
}
else if (GoneAndRetryWithRequestRetryPolicy<TResponse>.IsGoneWithLeaseNotFound(response, exception))
{
DefaultTrace.TraceWarning(
"The GoneAndRetryWithRequestRetryPolicy has hit 410 with lease not found. This is by design to do a cross regional failover to handle the exception: {0}",
new ErrorOrResponse(exception));

exceptionToThrow = ServiceUnavailableException.Create(
SubStatusCodes.LeaseNotFound,
innerException: exception);

this.durationTimer.Stop();

shouldRetryResult = ShouldRetryResult.NoRetry(exceptionToThrow);
return true;
}

int remainingMilliseconds;
if (isRetryWith)
@@ -428,6 +445,13 @@ private static bool IsPartitionKeyRangeGone(TResponse response, Exception except
|| (response?.StatusCode == HttpStatusCode.Gone && response?.SubStatusCode == SubStatusCodes.PartitionKeyRangeGone);
}

private static bool IsGoneWithLeaseNotFound(TResponse response, Exception exception)
{
return exception is LeaseLostException
|| (response?.StatusCode == HttpStatusCode.Gone &&
(response?.SubStatusCode == SubStatusCodes.LeaseNotFound));
}

Member: No, LeaseLostException has a completely different meaning - it is thrown when the Change Feed Processor loses the CFP lease.

Member (Author): Yes, this is taken care of in the original msdata PR: 1453704.

Member (Author): I have created a new exception type: LeaseNotFoundException. That PR has more context on it. We can follow up on that PR itself!

private static void ClearRequestContext(DocumentServiceRequest request)
{
request.RequestContext.TargetIdentity = null;
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos/src/direct/StatusCodes.cs
@@ -212,6 +212,7 @@ internal enum SubStatusCodes
GatewayThrottled = 3201,
StoredProcedureConcurrency = 3084,
ThottleDueToSplit = 3088,
SystemResourceUnavailable = 3092,

// Key Vault Access Client Error Code
AadClientCredentialsGrantFailure = 4000, // Indicated access to AAD failed to get a token
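
For context (not part of this diff), a hedged sketch of how application code might observe the failovers this PR is wiring up. The assumption is that the converted errors surface as 503s whose sub status carries the original reason: 3092 (SystemResourceUnavailable) for the throttling path and 1022 (lease not found, per the PR title) for the Gone path. Only the public CosmosException.StatusCode and CosmosException.SubStatusCode members are used; the container and item are placeholders:

using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hedged sketch, not part of this PR: recognize the 503s this change can produce.
// Sub status values are assumptions based on the PR title and the enum addition above.
static async Task<bool> UpsertAndDetectFailoverSignalAsync<T>(Container container, T item)
{
    try
    {
        await container.UpsertItemAsync(item);
        return false;
    }
    catch (CosmosException ce) when (ce.StatusCode == HttpStatusCode.ServiceUnavailable)
    {
        // 3092 = SystemResourceUnavailable (converted from 429);
        // 1022 = lease not found (converted from 410), per the PR title.
        return ce.SubStatusCode == 3092 || ce.SubStatusCode == 1022;
    }
}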
@@ -562,15 +562,18 @@ public async Task CannotAddToDispatchedBatch()
[TestMethod]
public async Task RetrierGetsCalledOnSplit()
{
Mock<IDocumentClientInternal> mockedClient = new Mock<IDocumentClientInternal>();
GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockedClient.Object, new ConnectionPolicy());

IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
@@ -591,15 +594,18 @@ public async Task RetrierGetsCalledOnCompletingSplit()
[TestMethod]
public async Task RetrierGetsCalledOnCompletingSplit()
{
Mock<IDocumentClientInternal> mockedClient = new Mock<IDocumentClientInternal>();
GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockedClient.Object, new ConnectionPolicy());

IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
@@ -620,15 +626,18 @@ public async Task RetrierGetsCalledOnCompletingPartitionMigration()
[TestMethod]
public async Task RetrierGetsCalledOnCompletingPartitionMigration()
{
Mock<IDocumentClientInternal> mockedClient = new Mock<IDocumentClientInternal>();
GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockedClient.Object, new ConnectionPolicy());

IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
@@ -669,20 +678,23 @@ public async Task RetrierGetsCalledOnOverFlow()
[TestMethod]
public async Task RetrierGetsCalledOn413_3402()
{
Mock<IDocumentClientInternal> mockedClient = new Mock<IDocumentClientInternal>();
GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockedClient.Object, new ConnectionPolicy());

IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

IDocumentClientRetryPolicy retryPolicy3 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Create,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
@@ -707,20 +719,23 @@ public async Task RetrierGetsCalledOn413_NoSubstatus()
[TestMethod]
public async Task RetrierGetsCalledOn413_NoSubstatus()
{
Mock<IDocumentClientInternal> mockedClient = new Mock<IDocumentClientInternal>();
GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockedClient.Object, new ConnectionPolicy());

IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

IDocumentClientRetryPolicy retryPolicy3 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Create,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, endpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();