diff --git a/Directory.Build.props b/Directory.Build.props index 55a67ffec4..71b854a5c7 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -3,7 +3,7 @@ 3.35.1 3.35.1 preview - 3.31.2 + 3.31.3 2.0.2 2.0.2 preview diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 875fbba29a..0baea086de 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -347,21 +347,21 @@ public ConnectionMode ConnectionMode public bool? EnableContentResponseOnWrite { get; set; } /// - /// Gets or sets the prioritize healthy replicas flag. - /// Prioritizing healthy replicas helps the cosmos client to become more - /// resilient to connection timeouts, by choosing a healthy replica over an - /// unhealthy one. The default value for this parameter is false. + /// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection + /// status, and based on status, it prioritizes the replicas which are connected to the backend, so that the requests can be sent + /// confidently to the particular replica. This helps the cosmos client to become more resilient and effictive to any connection + /// timeouts. The default value for this parameter is false. /// /// - /// This is optimal for workloads where latency spikes are critical during upgrades. + /// This is optimal for workloads where latency spikes are critical due to connection timeouts. Does not apply if is used. /// - /// + /// #if PREVIEW public #else internal #endif - bool PrioritizeHealthyReplicas { get; set; } + bool EnableAdvancedReplicaSelectionForTcp { get; set; } /// /// (Direct/TCP) Controls the amount of idle time after which unused connections are closed. @@ -775,6 +775,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId) EnablePartitionLevelFailover = this.EnablePartitionLevelFailover, PortReuseMode = this.portReuseMode, EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery, + EnableReplicaValidation = this.EnableAdvancedReplicaSelectionForTcp, HttpClientFactory = this.httpClientFactory, ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback }; diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 015959ba50..ca423eee00 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -6701,7 +6701,7 @@ private void CreateStoreModel(bool subscribeRntbdStatus) !this.enableRntbdChannel, this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), true, - this.ConnectionPolicy.EnableReplicaValidation); + enableReplicaValidation: this.ConnectionPolicy.EnableReplicaValidation); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index d2c3546204..35970bcca0 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -622,22 +622,20 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit } /// - /// Gets or sets the prioritize healthy replicas flag. - /// Prioritizing healthy replicas helps the cosmos client to become more - /// resilient to connection timeouts, by choosing a healthy replica over an - /// unhealthy one. The default value for this parameter is false. + /// Enables the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection status, + /// and based on status, it prioritizes the replicas which are connected to the backend, so that the requests can be sent + /// confidently to the particular replica. This helps the cosmos client to become more resilient and effictive to any connection + /// timeouts. The default value for this parameter is false. /// - /// a boolean flag indicating if the feature will be enabled. /// The object #if PREVIEW public #else internal #endif - CosmosClientBuilder WithPrioritizeHealthyReplicas( - bool replicaValidationEnabled) + CosmosClientBuilder WithAdvancedReplicaSelectionEnabledForTcp() { - this.clientOptions.PrioritizeHealthyReplicas = replicaValidationEnabled; + this.clientOptions.EnableAdvancedReplicaSelectionForTcp = true; return this; } diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 5c0719bf88..f47f1496b9 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -143,6 +143,7 @@ public async Task OpenConnectionsAsync( Paths.CollectionsPathSegment, Uri.EscapeUriString(collection.Id)); + using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); using (DocumentServiceRequest request = DocumentServiceRequest.CreateFromName( OperationType.Read, collectionAltLink, @@ -157,12 +158,11 @@ public async Task OpenConnectionsAsync( collectionRid: collection.ResourceId, partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId), containerProperties: collection, - shouldOpenRntbdChannels: shouldOpenRntbdChannels)); + shouldOpenRntbdChannels: shouldOpenRntbdChannels, + cancellationToken: linkedTokenSource.Token)); } } - using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - // The `timeoutTask` is a background task which adds a delay for a period of WarmupCacheAndOpenConnectionTimeout. The task will // be cancelled either by - a) when `linkedTokenSource` expires, which means the original `cancellationToken` expires or // b) the the `linkedTokenSource.Cancel()` is called. @@ -347,12 +347,14 @@ public async Task TryGetAddressesAsync( /// An instance of containing the list of partition key range ids. /// An instance of containing the collection properties. /// A boolean flag indicating whether Rntbd connections are required to be established to the backend replica nodes. + /// An instance of . private async Task WarmupCachesAndOpenConnectionsAsync( DocumentServiceRequest request, string collectionRid, IEnumerable partitionKeyRangeIds, ContainerProperties containerProperties, - bool shouldOpenRntbdChannels) + bool shouldOpenRntbdChannels, + CancellationToken cancellationToken) { TryCatch documentServiceResponseWrapper = await this.GetAddressesAsync( request: request, @@ -380,6 +382,11 @@ private async Task WarmupCachesAndOpenConnectionsAsync( List openConnectionTasks = new (); foreach (Tuple addressInfo in addressInfos) { + if (cancellationToken.IsCancellationRequested) + { + break; + } + this.serverPartitionAddressCache.Set( new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId), addressInfo.Item2); @@ -397,10 +404,7 @@ private async Task WarmupCachesAndOpenConnectionsAsync( } } - if (openConnectionTasks.Any()) - { - await Task.WhenAll(openConnectionTasks); - } + await Task.WhenAll(openConnectionTasks); } } catch (Exception ex) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs index a41f08afa1..0851952f40 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs @@ -209,7 +209,8 @@ private async Task ValidateConnectTimeoutTriggersClientRetryPolicy( enableReadRequestsFallback: false, useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: true, - disableRetryWithRetryPolicy: false); + disableRetryWithRetryPolicy: false, + enableReplicaValidation: false); // Reducing retry timeout to avoid long-running tests replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs index efb06d7cc5..e6c5ea6f92 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs @@ -81,6 +81,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsNull(clientOptions.HttpClientFactory); Assert.AreNotEqual(consistencyLevel, clientOptions.ConsistencyLevel); Assert.IsFalse(clientOptions.EnablePartitionLevelFailover); + Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp); //Verify GetConnectionPolicy returns the correct values for default ConnectionPolicy policy = clientOptions.GetConnectionPolicy(clientId: 0); @@ -97,6 +98,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsNull(policy.HttpClientFactory); Assert.AreNotEqual(Cosmos.ConsistencyLevel.Session, clientOptions.ConsistencyLevel); Assert.IsFalse(policy.EnablePartitionLevelFailover); + Assert.IsFalse(policy.EnableReplicaValidation); cosmosClientBuilder.WithApplicationRegion(region) .WithConnectionModeGateway(maxConnections, webProxy) @@ -108,7 +110,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() .WithBulkExecution(true) .WithSerializerOptions(cosmosSerializerOptions) .WithConsistencyLevel(consistencyLevel) - .WithPartitionLevelFailoverEnabled(); + .WithPartitionLevelFailoverEnabled() + .WithAdvancedReplicaSelectionEnabledForTcp(); cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient()); clientOptions = cosmosClient.ClientOptions; @@ -131,6 +134,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsTrue(clientOptions.AllowBulkExecution); Assert.AreEqual(consistencyLevel, clientOptions.ConsistencyLevel); Assert.IsTrue(clientOptions.EnablePartitionLevelFailover); + Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp); //Verify GetConnectionPolicy returns the correct values policy = clientOptions.GetConnectionPolicy(clientId: 0); @@ -145,7 +149,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.AreEqual((int)maxRetryWaitTime.TotalSeconds, policy.RetryOptions.MaxRetryWaitTimeInSeconds); Assert.AreEqual((Documents.ConsistencyLevel)consistencyLevel, clientOptions.GetDocumentsConsistencyLevel()); Assert.IsTrue(policy.EnablePartitionLevelFailover); - + Assert.IsTrue(policy.EnableReplicaValidation); + IReadOnlyList preferredLocations = new List() { Regions.AustraliaCentral, Regions.AustraliaCentral2 }; //Verify Direct Mode settings cosmosClientBuilder = new CosmosClientBuilder( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs index f5602a1f29..9a19fb4a90 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs @@ -82,7 +82,7 @@ private StoreClient GetMockStoreClient() TransportClient mockTransportClient = this.GetMockTransportClient(); ISessionContainer sessionContainer = new SessionContainer(string.Empty); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs index b92ca3c804..831e32041a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs @@ -536,7 +536,8 @@ public void StoreReaderBarrierTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + enableReplicaValidation: false); // reads always go to read quorum (2) replicas int replicaCountToRead = 2; @@ -611,14 +612,14 @@ public void GlobalStrongConsistentWriteMockTest() for (int i = 0; i < addressInformation.Length; i++) { TransportClient mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, false); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); - ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); + ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); //globalCommittedLsn never catches up in this case mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, true, false, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); try { response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; @@ -629,17 +630,17 @@ public void GlobalStrongConsistentWriteMockTest() } mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); } @@ -703,7 +704,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -746,7 +748,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -798,7 +801,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(