Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Resiliency: Adds Code to Enable Replica Validation Feature for Preview #3951

Merged
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<ClientOfficialVersion>3.35.1</ClientOfficialVersion>
<ClientPreviewVersion>3.35.1</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
<DirectVersion>3.31.2</DirectVersion>
<DirectVersion>3.31.3</DirectVersion>
<EncryptionOfficialVersion>2.0.2</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.0.2</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview</EncryptionPreviewSuffixVersion>
Expand Down
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,18 @@ public Func<HttpClient> HttpClientFactory
set;
}

/// <summary>
/// Gets or sets the boolean flag to enable replica validation.
/// </summary>
/// <value>
/// The default value for this parameter is false.
/// </value>
public bool EnableReplicaValidation
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
{
get;
set;
}

/// <summary>
/// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end.
/// </summary>
Expand Down
18 changes: 18 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,23 @@ public ConnectionMode ConnectionMode
/// <seealso cref="TransactionalBatchItemRequestOptions.EnableContentResponseOnWrite"/>
public bool? EnableContentResponseOnWrite { get; set; }

/// <summary>
/// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection
/// status, and based on status, it prioritizes the replicas which are connected to the backend, so that the requests can be sent
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
/// confidently to the particular replica. This helps the cosmos client to become more resilient and effective to any connection
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
/// timeouts. The default value for this parameter is false.
/// </summary>
/// <remarks>
/// <para>This is optimal for workloads where latency spikes are critical due to connection timeouts. Does not apply if <see cref="ConnectionMode.Gateway"/> is used.</para>
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
/// </remarks>
/// <seealso cref="CosmosClientBuilder.WithAdvancedReplicaSelectionEnabledForTcp()"/>
#if PREVIEW
public
#else
internal
#endif
bool EnableAdvancedReplicaSelectionForTcp { get; set; }
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// (Direct/TCP) Controls the amount of idle time after which unused connections are closed.
/// </summary>
Expand Down Expand Up @@ -758,6 +775,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
EnablePartitionLevelFailover = this.EnablePartitionLevelFailover,
PortReuseMode = this.portReuseMode,
EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery,
EnableReplicaValidation = this.EnableAdvancedReplicaSelectionForTcp,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback
};
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6700,7 +6700,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus)
this.ConnectionPolicy.EnableReadRequestsFallback ?? (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.BoundedStaleness),
!this.enableRntbdChannel,
this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong),
true);
true,
enableReplicaValidation: this.ConnectionPolicy.EnableReplicaValidation);

if (subscribeRntbdStatus)
{
Expand Down
18 changes: 18 additions & 0 deletions Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,24 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit
return this;
}

/// <summary>
/// Enables advanced replica selection on the client being built. The advanced replica selection logic keeps track of the
/// replica connection status and, based on that status, prioritizes the replicas which are connected to the backend, so that
/// requests can be sent confidently to a particular replica. This helps the cosmos client become more resilient and effective
/// in the presence of connection timeouts. Advanced replica selection is disabled by default; calling this method turns it on.
/// </summary>
/// <remarks>
/// This method is public only when the PREVIEW compilation symbol is defined; otherwise it is internal.
/// </remarks>
/// <returns>The <see cref="CosmosClientBuilder"/> object</returns>
#if PREVIEW
public
#else
internal
#endif
CosmosClientBuilder WithAdvancedReplicaSelectionEnabledForTcp()
{
// Record the opt-in on the client options held by this builder.
this.clientOptions.EnableAdvancedReplicaSelectionForTcp = true;
// Return the same builder instance so calls can be chained.
return this;
}

/// <summary>
/// The event handler to be invoked before the request is sent.
/// </summary>
Expand Down
27 changes: 15 additions & 12 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public GatewayAddressCache(
CosmosHttpClient httpClient,
IOpenConnectionsHandler openConnectionsHandler,
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
bool enableTcpConnectionEndpointRediscovery = false)
bool enableTcpConnectionEndpointRediscovery = false,
bool replicaAddressValidationEnabled = false)
{
this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment);
this.protocol = protocol;
Expand All @@ -90,9 +91,7 @@ public GatewayAddressCache(
GatewayAddressCache.ProtocolString(this.protocol));

this.openConnectionsHandler = openConnectionsHandler;
this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable<bool>(
name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled,
defaultValue: false);
this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled;
}

public Uri ServiceEndpoint => this.serviceEndpoint;
Expand Down Expand Up @@ -144,6 +143,7 @@ public async Task OpenConnectionsAsync(
Paths.CollectionsPathSegment,
Uri.EscapeUriString(collection.Id));

using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
using (DocumentServiceRequest request = DocumentServiceRequest.CreateFromName(
OperationType.Read,
collectionAltLink,
Expand All @@ -158,12 +158,11 @@ public async Task OpenConnectionsAsync(
collectionRid: collection.ResourceId,
partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId),
containerProperties: collection,
shouldOpenRntbdChannels: shouldOpenRntbdChannels));
shouldOpenRntbdChannels: shouldOpenRntbdChannels,
cancellationToken: linkedTokenSource.Token));
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
}
}

using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

// The `timeoutTask` is a background task which adds a delay for a period of WarmupCacheAndOpenConnectionTimeout. The task will
// be cancelled either by - a) when `linkedTokenSource` expires, which means the original `cancellationToken` expires or
// b) the `linkedTokenSource.Cancel()` is called.
Expand Down Expand Up @@ -348,12 +347,14 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
/// <param name="partitionKeyRangeIds">An instance of <see cref="IEnumerable{T}"/> containing the list of partition key range ids.</param>
/// <param name="containerProperties">An instance of <see cref="ContainerProperties"/> containing the collection properties.</param>
/// <param name="shouldOpenRntbdChannels">A boolean flag indicating whether Rntbd connections are required to be established to the backend replica nodes.</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
private async Task WarmupCachesAndOpenConnectionsAsync(
DocumentServiceRequest request,
string collectionRid,
IEnumerable<string> partitionKeyRangeIds,
ContainerProperties containerProperties,
bool shouldOpenRntbdChannels)
bool shouldOpenRntbdChannels,
CancellationToken cancellationToken)
{
TryCatch<DocumentServiceResponse> documentServiceResponseWrapper = await this.GetAddressesAsync(
request: request,
Expand Down Expand Up @@ -381,6 +382,11 @@ private async Task WarmupCachesAndOpenConnectionsAsync(
List<Task> openConnectionTasks = new ();
foreach (Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo in addressInfos)
{
if (cancellationToken.IsCancellationRequested)
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
{
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On-cancellation behavior: with the new changes, it's trying to complete gracefully, right?
Current existing pattern across is to throw exception

cancellationToken.ThrowIfCancellationRequested()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this used during the CreateAndInitialize call? If so, we should avoid throwing. CreateAndInitialize is a best effort operation, if we throw, it will probably break customer application initialization

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is used in the CreateAndInitializeAsync() flow, and throwing the exception might break it, because opening the connections is more of a best-effort operation and should continue even if some of the connection warm-ups fail.

}

this.serverPartitionAddressCache.Set(
new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);
Expand All @@ -398,10 +404,7 @@ private async Task WarmupCachesAndOpenConnectionsAsync(
}
}

if (openConnectionTasks.Any())
{
await Task.WhenAll(openConnectionTasks);
}
await Task.WhenAll(openConnectionTasks);
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
}
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly CosmosHttpClient httpClient;
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
private readonly bool replicaAddressValidationEnabled;
private IOpenConnectionsHandler openConnectionsHandler;

public GlobalAddressResolver(
Expand Down Expand Up @@ -66,6 +67,7 @@ public GlobalAddressResolver(
? GlobalAddressResolver.MaxBackupReadRegions : 0;

this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery;
this.replicaAddressValidationEnabled = connectionPolicy.EnableReplicaValidation;

this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)

Expand Down Expand Up @@ -281,7 +283,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.serviceConfigReader,
this.httpClient,
this.openConnectionsHandler,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery);
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
replicaAddressValidationEnabled: this.replicaAddressValidationEnabled);

string location = this.endpointManager.GetLocation(endpoint);
AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,57 @@ public async Task Cleanup()
}

[TestMethod]
public async Task ReadManyTypedTest()
[DataRow(true, DisplayName = "Validates Read Many scenario with advanced replica selection enabled.")]
[DataRow(false, DisplayName = "Validates Read Many scenario with advanced replica selection disabled.")]
public async Task ReadManyTypedTestWithAdvancedReplicaSelection(
bool advancedReplicaSelectionEnabled)
{
List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
for (int i=0; i<10; i++)
CosmosClient cosmosClient = advancedReplicaSelectionEnabled
? TestCommon.CreateCosmosClient(
customizeClientBuilder: (CosmosClientBuilder builder) => builder.WithAdvancedReplicaSelectionEnabledForTcp())
: TestCommon.CreateCosmosClient();

Database database = null;
try
{
itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString())));
}
database = await cosmosClient.CreateDatabaseAsync("ReadManyTypedTestScenarioDb");
Container container = await database.CreateContainerAsync("ReadManyTypedTestContainer", "/pk");

FeedResponse<ToDoActivity> feedResponse= await this.Container.ReadManyItemsAsync<ToDoActivity>(itemList);
Assert.IsNotNull(feedResponse);
Assert.AreEqual(feedResponse.Count, 10);
Assert.IsTrue(feedResponse.Headers.RequestCharge > 0);
Assert.IsNotNull(feedResponse.Diagnostics);
// Create items with different pk values
for (int i = 0; i < 500; i++)
{
ToDoActivity item = ToDoActivity.CreateRandomToDoActivity();
item.pk = "pk" + i.ToString();
item.id = i.ToString();
ItemResponse<ToDoActivity> itemResponse = await container.CreateItemAsync(item);
Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode);
}

List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
for (int i = 0; i < 20; i++)
{
itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString())));
}

FeedResponse<ToDoActivity> feedResponse = await container.ReadManyItemsAsync<ToDoActivity>(itemList);
Assert.IsNotNull(feedResponse);
Assert.AreEqual(20, feedResponse.Count);
Assert.IsTrue(feedResponse.Headers.RequestCharge > 0);
Assert.IsNotNull(feedResponse.Diagnostics);

int count = 0;
foreach (ToDoActivity item in feedResponse)
int count = 0;
foreach (ToDoActivity item in feedResponse)
{
count++;
Assert.IsNotNull(item);
}
Assert.AreEqual(20, count);
}
finally
{
count++;
Assert.IsNotNull(item);
await database.DeleteAsync();
cosmosClient.Dispose();
}
Assert.AreEqual(count, 10);
}

[TestMethod]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ private async Task ValidateConnectTimeoutTriggersClientRetryPolicy(
enableReadRequestsFallback: false,
useMultipleWriteLocations: useMultipleWriteLocations,
detectClientConnectivityIssues: true,
disableRetryWithRetryPolicy: false);
disableRetryWithRetryPolicy: false,
enableReplicaValidation: false);

// Reducing retry timeout to avoid long-running tests
replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsNull(clientOptions.HttpClientFactory);
Assert.AreNotEqual(consistencyLevel, clientOptions.ConsistencyLevel);
Assert.IsFalse(clientOptions.EnablePartitionLevelFailover);
Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp);

//Verify GetConnectionPolicy returns the correct values for default
ConnectionPolicy policy = clientOptions.GetConnectionPolicy(clientId: 0);
Expand All @@ -97,6 +98,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsNull(policy.HttpClientFactory);
Assert.AreNotEqual(Cosmos.ConsistencyLevel.Session, clientOptions.ConsistencyLevel);
Assert.IsFalse(policy.EnablePartitionLevelFailover);
Assert.IsFalse(policy.EnableReplicaValidation);

cosmosClientBuilder.WithApplicationRegion(region)
.WithConnectionModeGateway(maxConnections, webProxy)
Expand All @@ -108,7 +110,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
.WithBulkExecution(true)
.WithSerializerOptions(cosmosSerializerOptions)
.WithConsistencyLevel(consistencyLevel)
.WithPartitionLevelFailoverEnabled();
.WithPartitionLevelFailoverEnabled()
.WithAdvancedReplicaSelectionEnabledForTcp();

cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient());
clientOptions = cosmosClient.ClientOptions;
Expand All @@ -131,6 +134,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsTrue(clientOptions.AllowBulkExecution);
Assert.AreEqual(consistencyLevel, clientOptions.ConsistencyLevel);
Assert.IsTrue(clientOptions.EnablePartitionLevelFailover);
Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp);

//Verify GetConnectionPolicy returns the correct values
policy = clientOptions.GetConnectionPolicy(clientId: 0);
Expand All @@ -145,7 +149,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.AreEqual((int)maxRetryWaitTime.TotalSeconds, policy.RetryOptions.MaxRetryWaitTimeInSeconds);
Assert.AreEqual((Documents.ConsistencyLevel)consistencyLevel, clientOptions.GetDocumentsConsistencyLevel());
Assert.IsTrue(policy.EnablePartitionLevelFailover);

Assert.IsTrue(policy.EnableReplicaValidation);

IReadOnlyList<string> preferredLocations = new List<string>() { Regions.AustraliaCentral, Regions.AustraliaCentral2 };
//Verify Direct Mode settings
cosmosClientBuilder = new CosmosClientBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private StoreClient GetMockStoreClient()
TransportClient mockTransportClient = this.GetMockTransportClient();
ISessionContainer sessionContainer = new SessionContainer(string.Empty);

StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer);
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down
Loading