Skip to content

Commit

Permalink
Code changes to upgrade the cosmos direct version to 3.31.3.
Browse files Browse the repository at this point in the history
  • Loading branch information
kundadebdatta committed Jun 28, 2023
1 parent 2d076e0 commit 05817df
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<ClientOfficialVersion>3.35.1</ClientOfficialVersion>
<ClientPreviewVersion>3.35.1</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
<DirectVersion>3.31.2</DirectVersion>
<DirectVersion>3.31.3</DirectVersion>
<EncryptionOfficialVersion>2.0.2</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.0.2</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview</EncryptionPreviewSuffixVersion>
Expand Down
15 changes: 8 additions & 7 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -347,21 +347,21 @@ public ConnectionMode ConnectionMode
public bool? EnableContentResponseOnWrite { get; set; }

/// <summary>
/// Gets or sets the prioritize healthy replicas flag.
/// Prioritizing healthy replicas helps the cosmos client to become more
/// resilient to connection timeouts, by choosing a healthy replica over an
/// unhealthy one. The default value for this parameter is false.
/// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection
/// status, and based on status, it prioritizes the replicas which are connected to the backend, so that the requests can be sent
/// confidently to the particular replica. This helps the cosmos client to become more resilient and effictive to any connection
/// timeouts. The default value for this parameter is false.
/// </summary>
/// <remarks>
/// <para>This is optimal for workloads where latency spikes are critical during upgrades.</para>
/// <para>This is optimal for workloads where latency spikes are critical due to connection timeouts. Does not apply if <see cref="ConnectionMode.Gateway"/> is used.</para>
/// </remarks>
/// <seealso cref="CosmosClientBuilder.WithPrioritizeHealthyReplicas(bool)"/>
/// <seealso cref="CosmosClientBuilder.WithAdvancedReplicaSelectionEnabledForTcp()"/>
#if PREVIEW
public
#else
internal
#endif
bool PrioritizeHealthyReplicas { get; set; }
bool EnableAdvancedReplicaSelectionForTcp { get; set; }

/// <summary>
/// (Direct/TCP) Controls the amount of idle time after which unused connections are closed.
Expand Down Expand Up @@ -775,6 +775,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
EnablePartitionLevelFailover = this.EnablePartitionLevelFailover,
PortReuseMode = this.portReuseMode,
EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery,
EnableReplicaValidation = this.EnableAdvancedReplicaSelectionForTcp,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback
};
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6701,7 +6701,7 @@ private void CreateStoreModel(bool subscribeRntbdStatus)
!this.enableRntbdChannel,
this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong),
true,
this.ConnectionPolicy.EnableReplicaValidation);
enableReplicaValidation: this.ConnectionPolicy.EnableReplicaValidation);

if (subscribeRntbdStatus)
{
Expand Down
14 changes: 6 additions & 8 deletions Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -622,22 +622,20 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit
}

/// <summary>
/// Gets or sets the prioritize healthy replicas flag.
/// Prioritizing healthy replicas helps the cosmos client to become more
/// resilient to connection timeouts, by choosing a healthy replica over an
/// unhealthy one. The default value for this parameter is false.
/// Enables the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection status,
/// and based on status, it prioritizes the replicas which are connected to the backend, so that the requests can be sent
/// confidently to the particular replica. This helps the cosmos client to become more resilient and effictive to any connection
/// timeouts. The default value for this parameter is false.
/// </summary>
/// <param name="replicaValidationEnabled">a boolean flag indicating if the feature will be enabled.</param>
/// <returns>The <see cref="CosmosClientBuilder"/> object</returns>
#if PREVIEW
public
#else
internal
#endif
CosmosClientBuilder WithPrioritizeHealthyReplicas(
bool replicaValidationEnabled)
CosmosClientBuilder WithAdvancedReplicaSelectionEnabledForTcp()
{
this.clientOptions.PrioritizeHealthyReplicas = replicaValidationEnabled;
this.clientOptions.EnableAdvancedReplicaSelectionForTcp = true;
return this;
}

Expand Down
20 changes: 12 additions & 8 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public async Task OpenConnectionsAsync(
Paths.CollectionsPathSegment,
Uri.EscapeUriString(collection.Id));

using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
using (DocumentServiceRequest request = DocumentServiceRequest.CreateFromName(
OperationType.Read,
collectionAltLink,
Expand All @@ -157,12 +158,11 @@ public async Task OpenConnectionsAsync(
collectionRid: collection.ResourceId,
partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId),
containerProperties: collection,
shouldOpenRntbdChannels: shouldOpenRntbdChannels));
shouldOpenRntbdChannels: shouldOpenRntbdChannels,
cancellationToken: linkedTokenSource.Token));
}
}

using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

// The `timeoutTask` is a background task which adds a delay for a period of WarmupCacheAndOpenConnectionTimeout. The task will
// be cancelled either by - a) when `linkedTokenSource` expires, which means the original `cancellationToken` expires or
// b) the the `linkedTokenSource.Cancel()` is called.
Expand Down Expand Up @@ -347,12 +347,14 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
/// <param name="partitionKeyRangeIds">An instance of <see cref="IEnumerable{T}"/> containing the list of partition key range ids.</param>
/// <param name="containerProperties">An instance of <see cref="ContainerProperties"/> containing the collection properties.</param>
/// <param name="shouldOpenRntbdChannels">A boolean flag indicating whether Rntbd connections are required to be established to the backend replica nodes.</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
private async Task WarmupCachesAndOpenConnectionsAsync(
DocumentServiceRequest request,
string collectionRid,
IEnumerable<string> partitionKeyRangeIds,
ContainerProperties containerProperties,
bool shouldOpenRntbdChannels)
bool shouldOpenRntbdChannels,
CancellationToken cancellationToken)
{
TryCatch<DocumentServiceResponse> documentServiceResponseWrapper = await this.GetAddressesAsync(
request: request,
Expand Down Expand Up @@ -380,6 +382,11 @@ private async Task WarmupCachesAndOpenConnectionsAsync(
List<Task> openConnectionTasks = new ();
foreach (Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo in addressInfos)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}

this.serverPartitionAddressCache.Set(
new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);
Expand All @@ -397,10 +404,7 @@ private async Task WarmupCachesAndOpenConnectionsAsync(
}
}

if (openConnectionTasks.Any())
{
await Task.WhenAll(openConnectionTasks);
}
await Task.WhenAll(openConnectionTasks);
}
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ private async Task ValidateConnectTimeoutTriggersClientRetryPolicy(
enableReadRequestsFallback: false,
useMultipleWriteLocations: useMultipleWriteLocations,
detectClientConnectivityIssues: true,
disableRetryWithRetryPolicy: false);
disableRetryWithRetryPolicy: false,
enableReplicaValidation: false);

// Reducing retry timeout to avoid long-running tests
replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsNull(clientOptions.HttpClientFactory);
Assert.AreNotEqual(consistencyLevel, clientOptions.ConsistencyLevel);
Assert.IsFalse(clientOptions.EnablePartitionLevelFailover);
Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp);

//Verify GetConnectionPolicy returns the correct values for default
ConnectionPolicy policy = clientOptions.GetConnectionPolicy(clientId: 0);
Expand All @@ -97,6 +98,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsNull(policy.HttpClientFactory);
Assert.AreNotEqual(Cosmos.ConsistencyLevel.Session, clientOptions.ConsistencyLevel);
Assert.IsFalse(policy.EnablePartitionLevelFailover);
Assert.IsFalse(policy.EnableReplicaValidation);

cosmosClientBuilder.WithApplicationRegion(region)
.WithConnectionModeGateway(maxConnections, webProxy)
Expand All @@ -108,7 +110,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
.WithBulkExecution(true)
.WithSerializerOptions(cosmosSerializerOptions)
.WithConsistencyLevel(consistencyLevel)
.WithPartitionLevelFailoverEnabled();
.WithPartitionLevelFailoverEnabled()
.WithAdvancedReplicaSelectionEnabledForTcp();

cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient());
clientOptions = cosmosClient.ClientOptions;
Expand All @@ -131,6 +134,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsTrue(clientOptions.AllowBulkExecution);
Assert.AreEqual(consistencyLevel, clientOptions.ConsistencyLevel);
Assert.IsTrue(clientOptions.EnablePartitionLevelFailover);
Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp);

//Verify GetConnectionPolicy returns the correct values
policy = clientOptions.GetConnectionPolicy(clientId: 0);
Expand All @@ -145,7 +149,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.AreEqual((int)maxRetryWaitTime.TotalSeconds, policy.RetryOptions.MaxRetryWaitTimeInSeconds);
Assert.AreEqual((Documents.ConsistencyLevel)consistencyLevel, clientOptions.GetDocumentsConsistencyLevel());
Assert.IsTrue(policy.EnablePartitionLevelFailover);

Assert.IsTrue(policy.EnableReplicaValidation);

IReadOnlyList<string> preferredLocations = new List<string>() { Regions.AustraliaCentral, Regions.AustraliaCentral2 };
//Verify Direct Mode settings
cosmosClientBuilder = new CosmosClientBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private StoreClient GetMockStoreClient()
TransportClient mockTransportClient = this.GetMockTransportClient();
ISessionContainer sessionContainer = new SessionContainer(string.Empty);

StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer);
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,8 @@ public void StoreReaderBarrierTest()
new StoreReader(mockTransportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer);
sessionContainer,
enableReplicaValidation: false);

// reads always go to read quorum (2) replicas
int replicaCountToRead = 2;
Expand Down Expand Up @@ -611,14 +612,14 @@ public void GlobalStrongConsistentWriteMockTest()
for (int i = 0; i < addressInformation.Length; i++)
{
TransportClient mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, false);
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer);
ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false);
ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Assert.AreEqual(100, response.LSN);

//globalCommittedLsn never catches up in this case
mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, true, false, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
try
{
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Expand All @@ -629,17 +630,17 @@ public void GlobalStrongConsistentWriteMockTest()
}

mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Assert.AreEqual(100, response.LSN);

mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, true);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Assert.AreEqual(100, response.LSN);

mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, true);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Assert.AreEqual(100, response.LSN);
}
Expand Down Expand Up @@ -703,7 +704,8 @@ public void GlobalStrongConsistencyMockTest()
new StoreReader(mockTransportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer);
sessionContainer,
false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down Expand Up @@ -746,7 +748,8 @@ public void GlobalStrongConsistencyMockTest()
new StoreReader(mockTransportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer);
sessionContainer,
false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down Expand Up @@ -798,7 +801,8 @@ public void GlobalStrongConsistencyMockTest()
new StoreReader(mockTransportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer);
sessionContainer,
false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down

0 comments on commit 05817df

Please sign in to comment.