From 946bc4a9c7ae8eb90e773050eb5b7e78f23b9f0d Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Tue, 20 Jun 2023 16:02:04 -0700 Subject: [PATCH 1/7] Code changes to open connections in parallel. Added replica validation in contract. --- .../src/ConnectionPolicy.cs | 13 ++++++++++ .../src/CosmosClientOptions.cs | 13 ++++++++++ Microsoft.Azure.Cosmos/src/DocumentClient.cs | 3 ++- .../src/Fluent/CosmosClientBuilder.cs | 21 ++++++++++++++++ .../src/Routing/GatewayAddressCache.cs | 20 ++++++++++------ .../src/Routing/GlobalAddressResolver.cs | 5 +++- .../src/direct/ConsistencyReader.cs | 5 ++-- .../src/direct/ConsistencyWriter.cs | 6 +++-- .../src/direct/Constants.cs | 1 - .../src/direct/IStoreClientFactory.cs | 3 ++- .../src/direct/ReplicatedResourceClient.cs | 7 ++++-- .../src/direct/StoreClient.cs | 4 +++- .../src/direct/StoreClientFactory.cs | 9 ++++--- .../src/direct/StoreReader.cs | 7 +++--- .../ClientRetryPolicyTests.cs | 3 ++- .../RequestEventHandlerTests.cs | 2 +- .../StoreReaderTest.cs | 24 +++++++++++-------- 17 files changed, 109 insertions(+), 37 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs index 7abfd76deb..341de350d9 100644 --- a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs @@ -50,6 +50,7 @@ public ConnectionPolicy() this.EnableReadRequestsFallback = null; this.EnableClientTelemetry = ClientTelemetryOptions.IsClientTelemetryEnabled(); this.ServerCertificateCustomValidationCallback = null; + this.EnableReplicaValidation = false; } /// @@ -459,6 +460,18 @@ public Func HttpClientFactory set; } + /// + /// Gets or sets the flag to enable replica validation. + /// + /// + /// The default value is false + /// + public bool EnableReplicaValidation + { + get; + set; + } + /// /// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end. /// diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 1fce195915..436100dc72 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -346,6 +346,18 @@ public ConnectionMode ConnectionMode /// public bool? EnableContentResponseOnWrite { get; set; } + /// + /// Gets or sets the replica validation flag. + /// Enabling replica validation helps the cosmos client to become more + /// resilient to service upgrades by choosing a healthy replica over the + /// one undergoing an upgrade. The default value for this parameter is false. + /// + /// + /// This is optimal for workloads where latency spikes are critical during upgrades. + /// + /// + public bool EnableReplicaValidation { get; set; } = false; + /// /// (Direct/TCP) Controls the amount of idle time after which unused connections are closed. /// @@ -758,6 +770,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId) EnablePartitionLevelFailover = this.EnablePartitionLevelFailover, PortReuseMode = this.portReuseMode, EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery, + EnableReplicaValidation = this.EnableReplicaValidation, HttpClientFactory = this.httpClientFactory, ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback }; diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 068bb5b857..015959ba50 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -6700,7 +6700,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus) this.ConnectionPolicy.EnableReadRequestsFallback ?? (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.BoundedStaleness), !this.enableRntbdChannel, this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), - true); + true, + this.ConnectionPolicy.EnableReplicaValidation); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index 8b72bfffa4..c7c05aae68 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -621,6 +621,27 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit return this; } + /// + /// Gets or sets the boolean to only return the headers and status code in + /// the Cosmos DB response for write item operation like Create, Upsert, Patch and Replace. + /// Setting the option to false will cause the response to have a null resource. This reduces networking and CPU load by not sending + /// the resource back over the network and serializing it on the client. + /// + /// a boolean indicating whether payload will be included in the response or not. + /// + /// + /// This option can be overriden by similar property in ItemRequestOptions and TransactionalBatchItemRequestOptions + /// + /// + /// The object + /// + /// + public CosmosClientBuilder WithReplicaValidation(bool replicaValidationEnabled) + { + this.clientOptions.EnableReplicaValidation = replicaValidationEnabled; + return this; + } + /// /// The event handler to be invoked before the request is sent. /// diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 14b641959a..068e79a679 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -61,7 +61,8 @@ public GatewayAddressCache( CosmosHttpClient httpClient, IOpenConnectionsHandler openConnectionsHandler, long suboptimalPartitionForceRefreshIntervalInSeconds = 600, - bool enableTcpConnectionEndpointRediscovery = false) + bool enableTcpConnectionEndpointRediscovery = false, + bool replicaAddressValidationEnabled = false) { this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment); this.protocol = protocol; @@ -85,9 +86,7 @@ public GatewayAddressCache( GatewayAddressCache.ProtocolString(this.protocol)); this.openConnectionsHandler = openConnectionsHandler; - this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable( - name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled, - defaultValue: false); + this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled; } public Uri ServiceEndpoint => this.serviceEndpoint; @@ -173,6 +172,7 @@ public async Task OpenConnectionsAsync( .GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal) .Select(group => this.ToPartitionAddressAndRange(collection.ResourceId, @group.ToList(), inNetworkRequest)); + List openConnectionTasks = new (); foreach (Tuple addressInfo in addressInfos) { this.serverPartitionAddressCache.Set( @@ -185,11 +185,17 @@ public async Task OpenConnectionsAsync( // other flow, the flag should be passed as `false`. if (this.openConnectionsHandler != null && shouldOpenRntbdChannels) { - await this.openConnectionsHandler - .TryOpenRntbdChannelsAsync( - addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris); + openConnectionTasks + .Add(this.openConnectionsHandler + .TryOpenRntbdChannelsAsync( + addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris)); } } + + if (openConnectionTasks.Any()) + { + await Task.WhenAll(openConnectionTasks); + } } } } diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs index 344f994395..4e4118965b 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos private readonly CosmosHttpClient httpClient; private readonly ConcurrentDictionary addressCacheByEndpoint; private readonly bool enableTcpConnectionEndpointRediscovery; + private readonly bool replicaAddressValidationEnabled; private IOpenConnectionsHandler openConnectionsHandler; public GlobalAddressResolver( @@ -66,6 +67,7 @@ public GlobalAddressResolver( ? GlobalAddressResolver.MaxBackupReadRegions : 0; this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery; + this.replicaAddressValidationEnabled = connectionPolicy.EnableReplicaValidation; this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover) @@ -281,7 +283,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint) this.serviceConfigReader, this.httpClient, this.openConnectionsHandler, - enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery); + enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery, + replicaAddressValidationEnabled: this.replicaAddressValidationEnabled); string location = this.endpointManager.GetLocation(endpoint); AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location); diff --git a/Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs b/Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs index fd92ee456c..e9db63f927 100644 --- a/Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs +++ b/Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs @@ -144,12 +144,13 @@ public ConsistencyReader( ISessionContainer sessionContainer, TransportClient transportClient, IServiceConfigurationReader serviceConfigReader, - IAuthorizationTokenProvider authorizationTokenProvider) + IAuthorizationTokenProvider authorizationTokenProvider, + bool enableReplicaValidation) { this.addressSelector = addressSelector; this.serviceConfigReader = serviceConfigReader; this.authorizationTokenProvider = authorizationTokenProvider; - this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer); + this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer, enableReplicaValidation); this.quorumReader = new QuorumReader(transportClient, addressSelector, this.storeReader, serviceConfigReader, authorizationTokenProvider); } diff --git a/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs b/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs index 2f10eafe82..e614b51133 100644 --- a/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs +++ b/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs @@ -59,7 +59,8 @@ public ConsistencyWriter( TransportClient transportClient, IServiceConfigurationReader serviceConfigReader, IAuthorizationTokenProvider authorizationTokenProvider, - bool useMultipleWriteLocations) + bool useMultipleWriteLocations, + bool enableReplicaValidation) { this.transportClient = transportClient; this.addressSelector = addressSelector; @@ -71,7 +72,8 @@ public ConsistencyWriter( transportClient, addressSelector, new AddressEnumerator(), - sessionContainer: null); //we need store reader only for global strong, no session is needed*/ + sessionContainer: null, + enableReplicaValidation); //we need store reader only for global strong, no session is needed*/ } // Test hook diff --git a/Microsoft.Azure.Cosmos/src/direct/Constants.cs b/Microsoft.Azure.Cosmos/src/direct/Constants.cs index 68a5c15afe..5e39d91a09 100644 --- a/Microsoft.Azure.Cosmos/src/direct/Constants.cs +++ b/Microsoft.Azure.Cosmos/src/direct/Constants.cs @@ -2134,7 +2134,6 @@ public static class EnvironmentVariables { public const string SocketOptionTcpKeepAliveIntervalName = "AZURE_COSMOS_TCP_KEEPALIVE_INTERVAL_SECONDS"; public const string SocketOptionTcpKeepAliveTimeName = "AZURE_COSMOS_TCP_KEEPALIVE_TIME_SECONDS"; - public const string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; public const string AggressiveTimeoutDetectionEnabled = "AZURE_COSMOS_AGGRESSIVE_TIMEOUT_DETECTION_ENABLED"; public const string TimeoutDetectionTimeLimit = "AZURE_COSMOS_TIMEOUT_DETECTION_TIME_LIMIT_IN_SECONDS"; public const string TimeoutDetectionOnWriteThreshold = "AZURE_COSMOS_TIMEOUT_DETECTION_ON_WRITE_THRESHOLD"; diff --git a/Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs b/Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs index 8dd2180d12..0ccb0f525a 100644 --- a/Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs +++ b/Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs @@ -17,6 +17,7 @@ StoreClient CreateStoreClient( bool enableReadRequestsFallback = false, bool useFallbackClient = true, bool useMultipleWriteLocations = false, - bool detectClientConnectivityIssues = false); + bool detectClientConnectivityIssues = false, + bool enableReplicaValidation = false); } } diff --git a/Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs b/Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs index f3b17e7fc6..1489231323 100644 --- a/Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs +++ b/Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs @@ -68,6 +68,7 @@ public ReplicatedResourceClient( bool useMultipleWriteLocations, bool detectClientConnectivityIssues, bool disableRetryWithRetryPolicy, + bool enableReplicaValidation, RetryWithConfiguration retryWithConfiguration = null) { this.addressResolver = addressResolver; @@ -86,14 +87,16 @@ public ReplicatedResourceClient( sessionContainer, transportClient, serviceConfigReader, - authorizationTokenProvider); + authorizationTokenProvider, + enableReplicaValidation); this.consistencyWriter = new ConsistencyWriter( this.addressSelector, sessionContainer, transportClient, serviceConfigReader, authorizationTokenProvider, - useMultipleWriteLocations); + useMultipleWriteLocations, + enableReplicaValidation); this.enableReadRequestsFallback = enableReadRequestsFallback; this.useMultipleWriteLocations = useMultipleWriteLocations; this.detectClientConnectivityIssues = detectClientConnectivityIssues; diff --git a/Microsoft.Azure.Cosmos/src/direct/StoreClient.cs b/Microsoft.Azure.Cosmos/src/direct/StoreClient.cs index fb390a314c..6610ebe338 100644 --- a/Microsoft.Azure.Cosmos/src/direct/StoreClient.cs +++ b/Microsoft.Azure.Cosmos/src/direct/StoreClient.cs @@ -41,6 +41,7 @@ public StoreClient( bool useMultipleWriteLocations = false, bool detectClientConnectivityIssues = false, bool disableRetryWithRetryPolicy = false, + bool enableReplicaValidation = false, RetryWithConfiguration retryWithConfiguration = null) { this.transportClient = transportClient; @@ -68,7 +69,8 @@ public StoreClient( useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: detectClientConnectivityIssues, disableRetryWithRetryPolicy: disableRetryWithRetryPolicy, - retryWithConfiguration: retryWithConfiguration); + retryWithConfiguration: retryWithConfiguration, + enableReplicaValidation: enableReplicaValidation); } internal JsonSerializerSettings SerializerSettings { get; set; } diff --git a/Microsoft.Azure.Cosmos/src/direct/StoreClientFactory.cs b/Microsoft.Azure.Cosmos/src/direct/StoreClientFactory.cs index ae790bb843..dc206ad642 100644 --- a/Microsoft.Azure.Cosmos/src/direct/StoreClientFactory.cs +++ b/Microsoft.Azure.Cosmos/src/direct/StoreClientFactory.cs @@ -308,7 +308,8 @@ public StoreClient CreateStoreClient( bool enableReadRequestsFallback = false, bool useFallbackClient = true, bool useMultipleWriteLocations = false, - bool detectClientConnectivityIssues = false) + bool detectClientConnectivityIssues = false, + bool enableReplicaValidation = false) { this.ThrowIfDisposed(); if (useFallbackClient && this.fallbackTransportClient != null) @@ -326,7 +327,8 @@ public StoreClient CreateStoreClient( useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: detectClientConnectivityIssues, disableRetryWithRetryPolicy: this.disableRetryWithRetryPolicy, - retryWithConfiguration: this.retryWithConfiguration); + retryWithConfiguration: this.retryWithConfiguration, + enableReplicaValidation: enableReplicaValidation); } return new StoreClient( @@ -341,7 +343,8 @@ public StoreClient CreateStoreClient( useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: detectClientConnectivityIssues, disableRetryWithRetryPolicy: this.disableRetryWithRetryPolicy, - retryWithConfiguration: this.retryWithConfiguration); + retryWithConfiguration: this.retryWithConfiguration, + enableReplicaValidation: enableReplicaValidation); } #region IDisposable diff --git a/Microsoft.Azure.Cosmos/src/direct/StoreReader.cs b/Microsoft.Azure.Cosmos/src/direct/StoreReader.cs index 6ef9b0a4c9..962be405c8 100644 --- a/Microsoft.Azure.Cosmos/src/direct/StoreReader.cs +++ b/Microsoft.Azure.Cosmos/src/direct/StoreReader.cs @@ -25,16 +25,15 @@ public StoreReader( TransportClient transportClient, AddressSelector addressSelector, IAddressEnumerator addressEnumerator, - ISessionContainer sessionContainer) + ISessionContainer sessionContainer, + bool enableReplicaValidation) { this.transportClient = transportClient; this.addressSelector = addressSelector; this.addressEnumerator = addressEnumerator ?? throw new ArgumentNullException(nameof(addressEnumerator)); this.sessionContainer = sessionContainer; this.canUseLocalLSNBasedHeaders = VersionUtility.IsLaterThan(HttpConstants.Versions.CurrentVersion, HttpConstants.Versions.v2018_06_18); - this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable( - name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled, - defaultValue: false); + this.isReplicaAddressValidationEnabled = enableReplicaValidation; } // Test hook diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs index a41f08afa1..0851952f40 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs @@ -209,7 +209,8 @@ private async Task ValidateConnectTimeoutTriggersClientRetryPolicy( enableReadRequestsFallback: false, useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: true, - disableRetryWithRetryPolicy: false); + disableRetryWithRetryPolicy: false, + enableReplicaValidation: false); // Reducing retry timeout to avoid long-running tests replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs index f5602a1f29..9a19fb4a90 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs @@ -82,7 +82,7 @@ private StoreClient GetMockStoreClient() TransportClient mockTransportClient = this.GetMockTransportClient(); ISessionContainer sessionContainer = new SessionContainer(string.Empty); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs index 609817f3de..ac9d3f95f2 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs @@ -536,7 +536,8 @@ public void StoreReaderBarrierTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + enableReplicaValidation: false); // reads always go to read quorum (2) replicas int replicaCountToRead = 2; @@ -611,14 +612,14 @@ public void GlobalStrongConsistentWriteMockTest() for (int i = 0; i < addressInformation.Length; i++) { TransportClient mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, false); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); - ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); + ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); //globalCommittedLsn never catches up in this case mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, true, false, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); try { response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; @@ -629,17 +630,17 @@ public void GlobalStrongConsistentWriteMockTest() } mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); } @@ -703,7 +704,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -746,7 +748,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -798,7 +801,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( From ad5f6f01973c21dd9e1372629b57b36b03152304 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Wed, 21 Jun 2023 12:13:55 -0700 Subject: [PATCH 2/7] Code changes to add global concurrency control from Gateway Address Cache. --- .../src/Routing/GatewayAddressCache.cs | 27 +++++++++++++-- .../src/direct/IOpenConnectionsHandler.cs | 8 ++++- .../src/direct/RntbdOpenConnectionHandler.cs | 33 +++++-------------- .../GatewayAddressCacheTests.cs | 4 ++- 4 files changed, 44 insertions(+), 28 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 068e79a679..ea57dd6606 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -48,6 +48,18 @@ internal class GatewayAddressCache : IAddressCache, IDisposable private readonly CosmosHttpClient httpClient; private readonly bool isReplicaAddressValidationEnabled; + /// + /// A read-only instance of for + /// concurrency control. + /// + private readonly SemaphoreSlim semaphore; + + /// + /// A read-only TimeSpan indicating the semephore timeout in minutes. + /// The default timeout is 10 minutes. + /// + private static readonly TimeSpan SemaphoreAcquireTimeout = TimeSpan.FromMinutes(10); + private Tuple masterPartitionAddressCache; private DateTime suboptimalMasterPartitionTimestamp; private bool disposedValue; @@ -85,6 +97,12 @@ public GatewayAddressCache( Constants.Properties.Protocol, GatewayAddressCache.ProtocolString(this.protocol)); + // The semaphore arguments `initialCount` and `maxCount` are set to match the number of cpu cores, to keep the + // implementation similar to the Java counterpart. + this.semaphore = new SemaphoreSlim( + initialCount: Environment.ProcessorCount, + maxCount: Environment.ProcessorCount); + this.openConnectionsHandler = openConnectionsHandler; this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled; } @@ -188,7 +206,9 @@ public async Task OpenConnectionsAsync( openConnectionTasks .Add(this.openConnectionsHandler .TryOpenRntbdChannelsAsync( - addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris)); + addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris, + semaphore: this.semaphore, + semaphoreAcquireTimeout: GatewayAddressCache.SemaphoreAcquireTimeout)); } } @@ -872,7 +892,9 @@ private void ValidateReplicaAddresses( if (addressesNeedToValidateStatus.Any()) { Task openConnectionsInBackgroundTask = Task.Run(async () => await this.openConnectionsHandler.TryOpenRntbdChannelsAsync( - addresses: addressesNeedToValidateStatus)); + addresses: addressesNeedToValidateStatus, + semaphore: this.semaphore, + semaphoreAcquireTimeout: GatewayAddressCache.SemaphoreAcquireTimeout)); } } @@ -975,6 +997,7 @@ protected virtual void Dispose(bool disposing) if (disposing) { this.serverPartitionAddressCache?.Dispose(); + this.semaphore.Dispose(); } this.disposedValue = true; diff --git a/Microsoft.Azure.Cosmos/src/direct/IOpenConnectionsHandler.cs b/Microsoft.Azure.Cosmos/src/direct/IOpenConnectionsHandler.cs index 85d72edc49..38652fc020 100644 --- a/Microsoft.Azure.Cosmos/src/direct/IOpenConnectionsHandler.cs +++ b/Microsoft.Azure.Cosmos/src/direct/IOpenConnectionsHandler.cs @@ -4,7 +4,9 @@ namespace Microsoft.Azure.Documents { + using System; using System.Collections.Generic; + using System.Threading; using System.Threading.Tasks; /// @@ -18,7 +20,11 @@ internal interface IOpenConnectionsHandler /// /// An enumerable of /// containing the backend replica addresses. + /// The semaphore. + /// The semaphore connection timeout. Task TryOpenRntbdChannelsAsync( - IEnumerable addresses); + IEnumerable addresses, + SemaphoreSlim semaphore, + TimeSpan semaphoreAcquireTimeout); } } diff --git a/Microsoft.Azure.Cosmos/src/direct/RntbdOpenConnectionHandler.cs b/Microsoft.Azure.Cosmos/src/direct/RntbdOpenConnectionHandler.cs index 84a3b1ee97..ae596d481e 100644 --- a/Microsoft.Azure.Cosmos/src/direct/RntbdOpenConnectionHandler.cs +++ b/Microsoft.Azure.Cosmos/src/direct/RntbdOpenConnectionHandler.cs @@ -25,18 +25,6 @@ internal sealed class RntbdOpenConnectionHandler : IOpenConnectionsHandler, IDis /// private readonly TransportClient transportClient; - /// - /// A read-only instance of for - /// concurrency control. - /// - private readonly SemaphoreSlim semaphore; - - /// - /// A read-only TimeSpan indicating the semephore timeout in minutes. - /// The default timeout is 10 minutes. - /// - private static readonly TimeSpan SemaphoreAcquireTimeout = TimeSpan.FromMinutes(10); - /// /// A booolean flag indicating if the current instance of RntbdOpenConnectionHandler /// has been disposed. @@ -52,16 +40,13 @@ public RntbdOpenConnectionHandler( { this.disposed = false; this.transportClient = transportClient ?? throw new ArgumentNullException(nameof(transportClient), $"Argument {nameof(transportClient)} can not be null"); - - // The semaphore arguments `initialCount` and `maxCount` are set to match the number of cpu cores, to keep the - // implementation similar to the Java counterpart. - this.semaphore = new SemaphoreSlim( - initialCount: Environment.ProcessorCount, - maxCount: Environment.ProcessorCount); } /// - public async Task TryOpenRntbdChannelsAsync(IEnumerable addresses) + public async Task TryOpenRntbdChannelsAsync( + IEnumerable addresses, + SemaphoreSlim semaphore, + TimeSpan semaphoreAcquireTimeout) { foreach (TransportAddressUri address in addresses) { @@ -71,8 +56,8 @@ public async Task TryOpenRntbdChannelsAsync(IEnumerable add Trace.CorrelationManager.ActivityId); try { - slimAcquired = await this.semaphore - .WaitAsync(RntbdOpenConnectionHandler.SemaphoreAcquireTimeout) + slimAcquired = await semaphore + .WaitAsync(semaphoreAcquireTimeout) .ConfigureAwait(false); if (slimAcquired) @@ -86,7 +71,7 @@ await this.transportClient.OpenConnectionAsync( DefaultTrace.TraceWarning("Failed to open Rntbd connection to backend uri: {0} because" + "the semaphore couldn't be acquired within the given timeout: {1} minutes. '{2}'", address.Uri, - RntbdOpenConnectionHandler.SemaphoreAcquireTimeout.TotalMinutes, + semaphoreAcquireTimeout.TotalMinutes, Trace.CorrelationManager.ActivityId); } } @@ -102,7 +87,7 @@ await this.transportClient.OpenConnectionAsync( { if (slimAcquired) { - this.semaphore.Release(); + semaphore.Release(); } } } @@ -113,7 +98,7 @@ public void Dispose() { if (!this.disposed) { - this.semaphore.Dispose(); + this.transportClient.Dispose(); this.disposed = true; } else diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index 5116eb22de..7962573baa 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -1555,7 +1555,9 @@ public int GetTotalMethodInvocationCount() } Task IOpenConnectionsHandler.TryOpenRntbdChannelsAsync( - IEnumerable addresses) + IEnumerable addresses, + SemaphoreSlim semaphore, + TimeSpan semaphoreAcquireTimeout) { int idx = 0; this.totalReceivedAddressesCounter += addresses.Count(); From b813526cc9d4483fc9955e305ab7f57f69da1442 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Wed, 21 Jun 2023 14:08:22 -0700 Subject: [PATCH 3/7] Revert "Code changes to add global concurrency control from Gateway Address Cache." This reverts commit ad5f6f01973c21dd9e1372629b57b36b03152304. --- .../src/Routing/GatewayAddressCache.cs | 27 ++------------- .../src/direct/IOpenConnectionsHandler.cs | 8 +---- .../src/direct/RntbdOpenConnectionHandler.cs | 33 ++++++++++++++----- .../GatewayAddressCacheTests.cs | 4 +-- 4 files changed, 28 insertions(+), 44 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index ea57dd6606..068e79a679 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -48,18 +48,6 @@ internal class GatewayAddressCache : IAddressCache, IDisposable private readonly CosmosHttpClient httpClient; private readonly bool isReplicaAddressValidationEnabled; - /// - /// A read-only instance of for - /// concurrency control. - /// - private readonly SemaphoreSlim semaphore; - - /// - /// A read-only TimeSpan indicating the semephore timeout in minutes. - /// The default timeout is 10 minutes. - /// - private static readonly TimeSpan SemaphoreAcquireTimeout = TimeSpan.FromMinutes(10); - private Tuple masterPartitionAddressCache; private DateTime suboptimalMasterPartitionTimestamp; private bool disposedValue; @@ -97,12 +85,6 @@ public GatewayAddressCache( Constants.Properties.Protocol, GatewayAddressCache.ProtocolString(this.protocol)); - // The semaphore arguments `initialCount` and `maxCount` are set to match the number of cpu cores, to keep the - // implementation similar to the Java counterpart. - this.semaphore = new SemaphoreSlim( - initialCount: Environment.ProcessorCount, - maxCount: Environment.ProcessorCount); - this.openConnectionsHandler = openConnectionsHandler; this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled; } @@ -206,9 +188,7 @@ public async Task OpenConnectionsAsync( openConnectionTasks .Add(this.openConnectionsHandler .TryOpenRntbdChannelsAsync( - addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris, - semaphore: this.semaphore, - semaphoreAcquireTimeout: GatewayAddressCache.SemaphoreAcquireTimeout)); + addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris)); } } @@ -892,9 +872,7 @@ private void ValidateReplicaAddresses( if (addressesNeedToValidateStatus.Any()) { Task openConnectionsInBackgroundTask = Task.Run(async () => await this.openConnectionsHandler.TryOpenRntbdChannelsAsync( - addresses: addressesNeedToValidateStatus, - semaphore: this.semaphore, - semaphoreAcquireTimeout: GatewayAddressCache.SemaphoreAcquireTimeout)); + addresses: addressesNeedToValidateStatus)); } } @@ -997,7 +975,6 @@ protected virtual void Dispose(bool disposing) if (disposing) { this.serverPartitionAddressCache?.Dispose(); - this.semaphore.Dispose(); } this.disposedValue = true; diff --git a/Microsoft.Azure.Cosmos/src/direct/IOpenConnectionsHandler.cs b/Microsoft.Azure.Cosmos/src/direct/IOpenConnectionsHandler.cs index 38652fc020..85d72edc49 100644 --- a/Microsoft.Azure.Cosmos/src/direct/IOpenConnectionsHandler.cs +++ b/Microsoft.Azure.Cosmos/src/direct/IOpenConnectionsHandler.cs @@ -4,9 +4,7 @@ namespace Microsoft.Azure.Documents { - using System; using System.Collections.Generic; - using System.Threading; using System.Threading.Tasks; /// @@ -20,11 +18,7 @@ internal interface IOpenConnectionsHandler /// /// An enumerable of /// containing the backend replica addresses. - /// The semaphore. - /// The semaphore connection timeout. Task TryOpenRntbdChannelsAsync( - IEnumerable addresses, - SemaphoreSlim semaphore, - TimeSpan semaphoreAcquireTimeout); + IEnumerable addresses); } } diff --git a/Microsoft.Azure.Cosmos/src/direct/RntbdOpenConnectionHandler.cs b/Microsoft.Azure.Cosmos/src/direct/RntbdOpenConnectionHandler.cs index ae596d481e..84a3b1ee97 100644 --- a/Microsoft.Azure.Cosmos/src/direct/RntbdOpenConnectionHandler.cs +++ b/Microsoft.Azure.Cosmos/src/direct/RntbdOpenConnectionHandler.cs @@ -25,6 +25,18 @@ internal sealed class RntbdOpenConnectionHandler : IOpenConnectionsHandler, IDis /// private readonly TransportClient transportClient; + /// + /// A read-only instance of for + /// concurrency control. + /// + private readonly SemaphoreSlim semaphore; + + /// + /// A read-only TimeSpan indicating the semephore timeout in minutes. + /// The default timeout is 10 minutes. + /// + private static readonly TimeSpan SemaphoreAcquireTimeout = TimeSpan.FromMinutes(10); + /// /// A booolean flag indicating if the current instance of RntbdOpenConnectionHandler /// has been disposed. @@ -40,13 +52,16 @@ public RntbdOpenConnectionHandler( { this.disposed = false; this.transportClient = transportClient ?? throw new ArgumentNullException(nameof(transportClient), $"Argument {nameof(transportClient)} can not be null"); + + // The semaphore arguments `initialCount` and `maxCount` are set to match the number of cpu cores, to keep the + // implementation similar to the Java counterpart. + this.semaphore = new SemaphoreSlim( + initialCount: Environment.ProcessorCount, + maxCount: Environment.ProcessorCount); } /// - public async Task TryOpenRntbdChannelsAsync( - IEnumerable addresses, - SemaphoreSlim semaphore, - TimeSpan semaphoreAcquireTimeout) + public async Task TryOpenRntbdChannelsAsync(IEnumerable addresses) { foreach (TransportAddressUri address in addresses) { @@ -56,8 +71,8 @@ public async Task TryOpenRntbdChannelsAsync( Trace.CorrelationManager.ActivityId); try { - slimAcquired = await semaphore - .WaitAsync(semaphoreAcquireTimeout) + slimAcquired = await this.semaphore + .WaitAsync(RntbdOpenConnectionHandler.SemaphoreAcquireTimeout) .ConfigureAwait(false); if (slimAcquired) @@ -71,7 +86,7 @@ await this.transportClient.OpenConnectionAsync( DefaultTrace.TraceWarning("Failed to open Rntbd connection to backend uri: {0} because" + "the semaphore couldn't be acquired within the given timeout: {1} minutes. '{2}'", address.Uri, - semaphoreAcquireTimeout.TotalMinutes, + RntbdOpenConnectionHandler.SemaphoreAcquireTimeout.TotalMinutes, Trace.CorrelationManager.ActivityId); } } @@ -87,7 +102,7 @@ await this.transportClient.OpenConnectionAsync( { if (slimAcquired) { - semaphore.Release(); + this.semaphore.Release(); } } } @@ -98,7 +113,7 @@ public void Dispose() { if (!this.disposed) { - this.transportClient.Dispose(); + this.semaphore.Dispose(); this.disposed = true; } else diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index 7962573baa..5116eb22de 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -1555,9 +1555,7 @@ public int GetTotalMethodInvocationCount() } Task IOpenConnectionsHandler.TryOpenRntbdChannelsAsync( - IEnumerable addresses, - SemaphoreSlim semaphore, - TimeSpan semaphoreAcquireTimeout) + IEnumerable addresses) { int idx = 0; this.totalReceivedAddressesCounter += addresses.Count(); From fb3d4df3827b80b702b3db66dd52c43410623772 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Thu, 22 Jun 2023 00:25:47 -0700 Subject: [PATCH 4/7] Code changes to make parallel continue calls to open connection handler. Added cancellation token wire in. --- .../src/Routing/GatewayAddressCache.cs | 109 ++++++++++-------- 1 file changed, 59 insertions(+), 50 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 068e79a679..2e1121a66e 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -112,7 +112,7 @@ public async Task OpenConnectionsAsync( bool shouldOpenRntbdChannels, CancellationToken cancellationToken) { - List>> tasks = new (); + List tasks = new (); int batchSize = GatewayAddressCache.DefaultBatchSize; #if !(NETSTANDARD15 || NETSTANDARD16) @@ -146,58 +146,19 @@ public async Task OpenConnectionsAsync( { for (int i = 0; i < partitionKeyRangeIdentities.Count; i += batchSize) { - tasks - .Add(this.GetAddressesAsync( - request: request, - collectionRid: collection.ResourceId, - partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId))); + tasks.Add( + this.GetAddressesAsync( + request: request, + collectionRid: collection.ResourceId, + partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId)) + .ContinueWith(response => this.WarmupCachesAndOpenConnectionsAsync( + containerProperties: collection, + documentServiceResponseWrapper: response.Result, + shouldOpenRntbdChannels: shouldOpenRntbdChannels))); } } - foreach (TryCatch task in await Task.WhenAll(tasks)) - { - if (task.Failed) - { - continue; - } - - using (DocumentServiceResponse response = task.Result) - { - FeedResource
addressFeed = response.GetResource>(); - - bool inNetworkRequest = this.IsInNetworkRequest(response); - - IEnumerable> addressInfos = - addressFeed.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol) - .GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal) - .Select(group => this.ToPartitionAddressAndRange(collection.ResourceId, @group.ToList(), inNetworkRequest)); - - List openConnectionTasks = new (); - foreach (Tuple addressInfo in addressInfos) - { - this.serverPartitionAddressCache.Set( - new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId), - addressInfo.Item2); - - // The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the - // backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as - // `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any - // other flow, the flag should be passed as `false`. - if (this.openConnectionsHandler != null && shouldOpenRntbdChannels) - { - openConnectionTasks - .Add(this.openConnectionsHandler - .TryOpenRntbdChannelsAsync( - addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris)); - } - } - - if (openConnectionTasks.Any()) - { - await Task.WhenAll(openConnectionTasks); - } - } - } + await Task.Run(() => Task.WhenAll(tasks), cancellationToken); } /// @@ -356,6 +317,54 @@ public async Task TryGetAddressesAsync( } } + private async Task WarmupCachesAndOpenConnectionsAsync( + ContainerProperties containerProperties, + TryCatch documentServiceResponseWrapper, + bool shouldOpenRntbdChannels) + { + if (documentServiceResponseWrapper.Failed) + { + return; + } + + using (DocumentServiceResponse response = documentServiceResponseWrapper.Result) + { + FeedResource
addressFeed = response.GetResource>(); + + bool inNetworkRequest = this.IsInNetworkRequest(response); + + IEnumerable> addressInfos = + addressFeed.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol) + .GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal) + .Select(group => this.ToPartitionAddressAndRange(containerProperties.ResourceId, @group.ToList(), inNetworkRequest)); + + List openConnectionTasks = new (); + foreach (Tuple addressInfo in addressInfos) + { + this.serverPartitionAddressCache.Set( + new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId), + addressInfo.Item2); + + // The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the + // backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as + // `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any + // other flow, the flag should be passed as `false`. + if (this.openConnectionsHandler != null && shouldOpenRntbdChannels) + { + openConnectionTasks + .Add(this.openConnectionsHandler + .TryOpenRntbdChannelsAsync( + addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris)); + } + } + + if (openConnectionTasks.Any()) + { + await Task.WhenAll(openConnectionTasks); + } + } + } + private static void SetTransportAddressUrisToUnhealthy( PartitionAddressInformation stalePartitionAddressInformation, Lazy> failedEndpoints) From 08bc68a673c684641770c8a0d9d0dce58b1ea3e6 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Thu, 22 Jun 2023 11:46:14 -0700 Subject: [PATCH 5/7] Code changes to update public contracts. --- .../src/CosmosClientOptions.cs | 15 +++++++---- .../src/Fluent/CosmosClientBuilder.cs | 27 +++++++++---------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 436100dc72..4eaf02f3e3 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -347,16 +347,21 @@ public ConnectionMode ConnectionMode public bool? EnableContentResponseOnWrite { get; set; } /// - /// Gets or sets the replica validation flag. - /// Enabling replica validation helps the cosmos client to become more + /// Gets or sets the prioritize healthy replicas flag. + /// Prioritizing healthy replicas helps the cosmos client to become more /// resilient to service upgrades by choosing a healthy replica over the /// one undergoing an upgrade. The default value for this parameter is false. /// /// /// This is optimal for workloads where latency spikes are critical during upgrades. /// - /// - public bool EnableReplicaValidation { get; set; } = false; + /// +#if PREVIEW + public +#else + internal +#endif + bool PrioritizeHealthyReplicas { get; set; } /// /// (Direct/TCP) Controls the amount of idle time after which unused connections are closed. @@ -770,7 +775,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId) EnablePartitionLevelFailover = this.EnablePartitionLevelFailover, PortReuseMode = this.portReuseMode, EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery, - EnableReplicaValidation = this.EnableReplicaValidation, + EnableReplicaValidation = this.PrioritizeHealthyReplicas, HttpClientFactory = this.httpClientFactory, ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback }; diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index c7c05aae68..d759f7fcde 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -622,23 +622,22 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit } /// - /// Gets or sets the boolean to only return the headers and status code in - /// the Cosmos DB response for write item operation like Create, Upsert, Patch and Replace. - /// Setting the option to false will cause the response to have a null resource. This reduces networking and CPU load by not sending - /// the resource back over the network and serializing it on the client. + /// Gets or sets the prioritize healthy replicas flag. + /// Prioritizing healthy replicas helps the cosmos client to become more + /// resilient to service upgrades by choosing a healthy replica over the + /// one undergoing an upgrade. The default value for this parameter is false. /// - /// a boolean indicating whether payload will be included in the response or not. - /// - /// - /// This option can be overriden by similar property in ItemRequestOptions and TransactionalBatchItemRequestOptions - /// - /// + /// a boolean flag indicating if the feature will be enabled. /// The object - /// - /// - public CosmosClientBuilder WithReplicaValidation(bool replicaValidationEnabled) +#if PREVIEW + public +#else + internal +#endif + CosmosClientBuilder WithPrioritizeHealthyReplicas( + bool replicaValidationEnabled) { - this.clientOptions.EnableReplicaValidation = replicaValidationEnabled; + this.clientOptions.PrioritizeHealthyReplicas = replicaValidationEnabled; return this; } From 4980a872b44330492745e65b641900c1aa677b14 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Thu, 22 Jun 2023 14:33:47 -0700 Subject: [PATCH 6/7] Code changes to address review comments. --- .../src/CosmosClientOptions.cs | 4 +- .../src/Fluent/CosmosClientBuilder.cs | 4 +- .../src/Routing/GatewayAddressCache.cs | 94 ++++++++++++------- 3 files changed, 63 insertions(+), 39 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 4eaf02f3e3..abb2547d5a 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -349,8 +349,8 @@ public ConnectionMode ConnectionMode /// /// Gets or sets the prioritize healthy replicas flag. /// Prioritizing healthy replicas helps the cosmos client to become more - /// resilient to service upgrades by choosing a healthy replica over the - /// one undergoing an upgrade. The default value for this parameter is false. + /// resilient to connection timeouts, by choosing a healthy replica over an + /// unhealthy one. The default value for this parameter is false. /// /// /// This is optimal for workloads where latency spikes are critical during upgrades. diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index d759f7fcde..d2c3546204 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -624,8 +624,8 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit /// /// Gets or sets the prioritize healthy replicas flag. /// Prioritizing healthy replicas helps the cosmos client to become more - /// resilient to service upgrades by choosing a healthy replica over the - /// one undergoing an upgrade. The default value for this parameter is false. + /// resilient to connection timeouts, by choosing a healthy replica over an + /// unhealthy one. The default value for this parameter is false. /// /// a boolean flag indicating if the feature will be enabled. /// The object diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 2e1121a66e..7e0cead1d9 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -47,6 +47,7 @@ internal class GatewayAddressCache : IAddressCache, IDisposable private readonly CosmosHttpClient httpClient; private readonly bool isReplicaAddressValidationEnabled; + private static readonly TimeSpan WarmupCacheAndOpenConnectionDelayInMinutes = TimeSpan.FromMinutes(40); private Tuple masterPartitionAddressCache; private DateTime suboptimalMasterPartitionTimestamp; @@ -147,18 +148,24 @@ public async Task OpenConnectionsAsync( for (int i = 0; i < partitionKeyRangeIdentities.Count; i += batchSize) { tasks.Add( - this.GetAddressesAsync( + this.WarmupCachesAndOpenConnectionsAsync( request: request, collectionRid: collection.ResourceId, - partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId)) - .ContinueWith(response => this.WarmupCachesAndOpenConnectionsAsync( - containerProperties: collection, - documentServiceResponseWrapper: response.Result, - shouldOpenRntbdChannels: shouldOpenRntbdChannels))); + partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId), + containerProperties: collection, + shouldOpenRntbdChannels: shouldOpenRntbdChannels)); } } - await Task.Run(() => Task.WhenAll(tasks), cancellationToken); + Task timeoutTask = Task.Delay(GatewayAddressCache.WarmupCacheAndOpenConnectionDelayInMinutes, cancellationToken); + Task resultTask = await Task.WhenAny(Task.WhenAll(tasks), timeoutTask); + + if (resultTask == timeoutTask) + { + // Operation cancelled. + DefaultTrace.TraceWarning("The open connection task was cancelled because the cancellation token was expired. '{0}'", + System.Diagnostics.Trace.CorrelationManager.ActivityId); + } } /// @@ -318,51 +325,68 @@ public async Task TryGetAddressesAsync( } private async Task WarmupCachesAndOpenConnectionsAsync( + DocumentServiceRequest request, + string collectionRid, + IEnumerable partitionKeyRangeIds, ContainerProperties containerProperties, - TryCatch documentServiceResponseWrapper, bool shouldOpenRntbdChannels) { + TryCatch documentServiceResponseWrapper = await this.GetAddressesAsync( + request: request, + collectionRid: collectionRid, + partitionKeyRangeIds: partitionKeyRangeIds); + if (documentServiceResponseWrapper.Failed) { return; } - using (DocumentServiceResponse response = documentServiceResponseWrapper.Result) + try { - FeedResource
addressFeed = response.GetResource>(); + using (DocumentServiceResponse response = documentServiceResponseWrapper.Result) + { + FeedResource
addressFeed = response.GetResource>(); - bool inNetworkRequest = this.IsInNetworkRequest(response); + bool inNetworkRequest = this.IsInNetworkRequest(response); - IEnumerable> addressInfos = - addressFeed.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol) - .GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal) - .Select(group => this.ToPartitionAddressAndRange(containerProperties.ResourceId, @group.ToList(), inNetworkRequest)); + IEnumerable> addressInfos = + addressFeed.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol) + .GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal) + .Select(group => this.ToPartitionAddressAndRange(containerProperties.ResourceId, @group.ToList(), inNetworkRequest)); - List openConnectionTasks = new (); - foreach (Tuple addressInfo in addressInfos) - { - this.serverPartitionAddressCache.Set( - new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId), - addressInfo.Item2); - - // The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the - // backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as - // `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any - // other flow, the flag should be passed as `false`. - if (this.openConnectionsHandler != null && shouldOpenRntbdChannels) + List openConnectionTasks = new (); + foreach (Tuple addressInfo in addressInfos) { - openConnectionTasks - .Add(this.openConnectionsHandler - .TryOpenRntbdChannelsAsync( - addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris)); + this.serverPartitionAddressCache.Set( + new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId), + addressInfo.Item2); + + // The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the + // backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as + // `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any + // other flow, the flag should be passed as `false`. + if (this.openConnectionsHandler != null && shouldOpenRntbdChannels) + { + openConnectionTasks + .Add(this.openConnectionsHandler + .TryOpenRntbdChannelsAsync( + addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris)); + } } - } - if (openConnectionTasks.Any()) - { - await Task.WhenAll(openConnectionTasks); + if (openConnectionTasks.Any()) + { + await Task.WhenAll(openConnectionTasks); + } } } + catch (Exception ex) + { + DefaultTrace.TraceWarning("Failed to open connections warm-up cache for the server addresses: {0} with exception: {1}. '{2}'", + collectionRid, + ex, + System.Diagnostics.Trace.CorrelationManager.ActivityId); + } } private static void SetTransportAddressUrisToUnhealthy( From 8d94c211ce7a31ec4638ed4fdb728d56e8a25d86 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Thu, 22 Jun 2023 16:01:21 -0700 Subject: [PATCH 7/7] Code changes to add cancellation token source. --- .../src/Routing/GatewayAddressCache.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 7e0cead1d9..b03b2e877c 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -47,7 +47,7 @@ internal class GatewayAddressCache : IAddressCache, IDisposable private readonly CosmosHttpClient httpClient; private readonly bool isReplicaAddressValidationEnabled; - private static readonly TimeSpan WarmupCacheAndOpenConnectionDelayInMinutes = TimeSpan.FromMinutes(40); + private static readonly TimeSpan WarmupCacheAndOpenConnectionTimeout = TimeSpan.FromMinutes(40); private Tuple masterPartitionAddressCache; private DateTime suboptimalMasterPartitionTimestamp; @@ -157,7 +157,8 @@ public async Task OpenConnectionsAsync( } } - Task timeoutTask = Task.Delay(GatewayAddressCache.WarmupCacheAndOpenConnectionDelayInMinutes, cancellationToken); + using CancellationTokenSource linkedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + Task timeoutTask = Task.Delay(GatewayAddressCache.WarmupCacheAndOpenConnectionTimeout, linkedToken.Token); Task resultTask = await Task.WhenAny(Task.WhenAll(tasks), timeoutTask); if (resultTask == timeoutTask) @@ -166,6 +167,10 @@ public async Task OpenConnectionsAsync( DefaultTrace.TraceWarning("The open connection task was cancelled because the cancellation token was expired. '{0}'", System.Diagnostics.Trace.CorrelationManager.ActivityId); } + else + { + linkedToken.Cancel(); + } } ///