Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Replica Validation: Refactors Code to Address Follow Ups #3929

13 changes: 13 additions & 0 deletions Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public ConnectionPolicy()
this.EnableReadRequestsFallback = null;
this.EnableClientTelemetry = ClientTelemetryOptions.IsClientTelemetryEnabled();
this.ServerCertificateCustomValidationCallback = null;
this.EnableReplicaValidation = false;
}

/// <summary>
Expand Down Expand Up @@ -459,6 +460,18 @@ public Func<HttpClient> HttpClientFactory
set;
}

/// <summary>
/// Gets or sets the flag to enable replica validation.
/// </summary>
/// <value>
/// The default value is false
/// </value>
public bool EnableReplicaValidation
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
{
get;
set;
}

/// <summary>
/// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end.
/// </summary>
Expand Down
18 changes: 18 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,23 @@ public ConnectionMode ConnectionMode
/// <seealso cref="TransactionalBatchItemRequestOptions.EnableContentResponseOnWrite"/>
public bool? EnableContentResponseOnWrite { get; set; }

/// <summary>
/// Gets or sets the prioritize healthy replicas flag.
/// Prioritizing healthy replicas helps the cosmos client to become more
ealsur marked this conversation as resolved.
Show resolved Hide resolved
/// resilient to connection timeouts, by choosing a healthy replica over an
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Healthy or unhealthy is not defined and might be interpreted many ways.

/// unhealthy one. The default value for this parameter is false.
/// </summary>
/// <remarks>
/// <para>This is optimal for workloads where latency spikes are critical during upgrades.</para>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just mentioning upgrades in not right?
General verbatim would be ideal or for now exclude it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Taken a note on this. I will remove this from the original PR.

/// </remarks>
/// <seealso cref="CosmosClientBuilder.WithPrioritizeHealthyReplicas(bool)"/>
#if PREVIEW
public
#else
internal
#endif
bool PrioritizeHealthyReplicas { get; set; }

/// <summary>
/// (Direct/TCP) Controls the amount of idle time after which unused connections are closed.
/// </summary>
Expand Down Expand Up @@ -758,6 +775,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
EnablePartitionLevelFailover = this.EnablePartitionLevelFailover,
PortReuseMode = this.portReuseMode,
EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery,
EnableReplicaValidation = this.PrioritizeHealthyReplicas,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback
};
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6700,7 +6700,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus)
this.ConnectionPolicy.EnableReadRequestsFallback ?? (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.BoundedStaleness),
!this.enableRntbdChannel,
this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong),
true);
true,
this.ConnectionPolicy.EnableReplicaValidation);

if (subscribeRntbdStatus)
{
Expand Down
20 changes: 20 additions & 0 deletions Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,26 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit
return this;
}

/// <summary>
/// Gets or sets the prioritize healthy replicas flag.
/// Prioritizing healthy replicas helps the cosmos client to become more
ealsur marked this conversation as resolved.
Show resolved Hide resolved
/// resilient to connection timeouts, by choosing a healthy replica over an
/// unhealthy one. The default value for this parameter is false.
/// </summary>
/// <param name="replicaValidationEnabled">a boolean flag indicating if the feature will be enabled.</param>
/// <returns>The <see cref="CosmosClientBuilder"/> object</returns>
#if PREVIEW
public
#else
internal
#endif
CosmosClientBuilder WithPrioritizeHealthyReplicas(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did JAVA have it by-default? or public API?

Copy link
Member Author

@kundadebdatta kundadebdatta Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JAVA has it by the environment variables. I think it's enabled by default. Cc: @xinlian12

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it is controlled by environment variable, and it is enabled by default.

bool replicaValidationEnabled)
{
this.clientOptions.PrioritizeHealthyReplicas = replicaValidationEnabled;
return this;
}

/// <summary>
/// The event handler to be invoked before the request is sent.
/// </summary>
Expand Down
129 changes: 84 additions & 45 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ internal class GatewayAddressCache : IAddressCache, IDisposable

private readonly CosmosHttpClient httpClient;
private readonly bool isReplicaAddressValidationEnabled;
private static readonly TimeSpan WarmupCacheAndOpenConnectionDelayInMinutes = TimeSpan.FromMinutes(40);
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved

private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
private DateTime suboptimalMasterPartitionTimestamp;
Expand All @@ -61,7 +62,8 @@ public GatewayAddressCache(
CosmosHttpClient httpClient,
IOpenConnectionsHandler openConnectionsHandler,
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
bool enableTcpConnectionEndpointRediscovery = false)
bool enableTcpConnectionEndpointRediscovery = false,
bool replicaAddressValidationEnabled = false)
{
this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment);
this.protocol = protocol;
Expand All @@ -85,9 +87,7 @@ public GatewayAddressCache(
GatewayAddressCache.ProtocolString(this.protocol));

this.openConnectionsHandler = openConnectionsHandler;
this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable<bool>(
name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled,
defaultValue: false);
this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled;
}

public Uri ServiceEndpoint => this.serviceEndpoint;
Expand All @@ -113,7 +113,7 @@ public async Task OpenConnectionsAsync(
bool shouldOpenRntbdChannels,
CancellationToken cancellationToken)
{
List<Task<TryCatch<DocumentServiceResponse>>> tasks = new ();
List<Task> tasks = new ();
int batchSize = GatewayAddressCache.DefaultBatchSize;

#if !(NETSTANDARD15 || NETSTANDARD16)
Expand Down Expand Up @@ -147,50 +147,24 @@ public async Task OpenConnectionsAsync(
{
for (int i = 0; i < partitionKeyRangeIdentities.Count; i += batchSize)
{
tasks
.Add(this.GetAddressesAsync(
request: request,
collectionRid: collection.ResourceId,
partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId)));
tasks.Add(
this.WarmupCachesAndOpenConnectionsAsync(
request: request,
collectionRid: collection.ResourceId,
partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId),
containerProperties: collection,
shouldOpenRntbdChannels: shouldOpenRntbdChannels));
}
}

foreach (TryCatch<DocumentServiceResponse> task in await Task.WhenAll(tasks))
{
if (task.Failed)
{
continue;
}

using (DocumentServiceResponse response = task.Result)
{
FeedResource<Address> addressFeed = response.GetResource<FeedResource<Address>>();
Task timeoutTask = Task.Delay(GatewayAddressCache.WarmupCacheAndOpenConnectionDelayInMinutes, cancellationToken);
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
Task resultTask = await Task.WhenAny(Task.WhenAll(tasks), timeoutTask);

bool inNetworkRequest = this.IsInNetworkRequest(response);

IEnumerable<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>> addressInfos =
addressFeed.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol)
.GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal)
.Select(group => this.ToPartitionAddressAndRange(collection.ResourceId, @group.ToList(), inNetworkRequest));

foreach (Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo in addressInfos)
{
this.serverPartitionAddressCache.Set(
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);

// The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the
// backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as
// `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any
// other flow, the flag should be passed as `false`.
if (this.openConnectionsHandler != null && shouldOpenRntbdChannels)
{
await this.openConnectionsHandler
.TryOpenRntbdChannelsAsync(
addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris);
}
}
}
if (resultTask == timeoutTask)
{
// Operation cancelled.
DefaultTrace.TraceWarning("The open connection task was cancelled because the cancellation token was expired. '{0}'",
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
}

Expand Down Expand Up @@ -350,6 +324,71 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
}
}

private async Task WarmupCachesAndOpenConnectionsAsync(
ealsur marked this conversation as resolved.
Show resolved Hide resolved
DocumentServiceRequest request,
string collectionRid,
IEnumerable<string> partitionKeyRangeIds,
ContainerProperties containerProperties,
bool shouldOpenRntbdChannels)
{
TryCatch<DocumentServiceResponse> documentServiceResponseWrapper = await this.GetAddressesAsync(
request: request,
collectionRid: collectionRid,
partitionKeyRangeIds: partitionKeyRangeIds);

if (documentServiceResponseWrapper.Failed)
{
return;
}

try
{
using (DocumentServiceResponse response = documentServiceResponseWrapper.Result)
{
FeedResource<Address> addressFeed = response.GetResource<FeedResource<Address>>();

bool inNetworkRequest = this.IsInNetworkRequest(response);

IEnumerable<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>> addressInfos =
addressFeed.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol)
.GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal)
.Select(group => this.ToPartitionAddressAndRange(containerProperties.ResourceId, @group.ToList(), inNetworkRequest));

List<Task> openConnectionTasks = new ();
foreach (Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo in addressInfos)
{
this.serverPartitionAddressCache.Set(
new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);

// The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the
// backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as
// `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any
// other flow, the flag should be passed as `false`.
if (this.openConnectionsHandler != null && shouldOpenRntbdChannels)
{
openConnectionTasks
.Add(this.openConnectionsHandler
.TryOpenRntbdChannelsAsync(
addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris));
}
}

if (openConnectionTasks.Any())
{
await Task.WhenAll(openConnectionTasks);
}
}
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to open connections warm-up cache for the server addresses: {0} with exception: {1}. '{2}'",
collectionRid,
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
}

private static void SetTransportAddressUrisToUnhealthy(
PartitionAddressInformation stalePartitionAddressInformation,
Lazy<HashSet<TransportAddressUri>> failedEndpoints)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly CosmosHttpClient httpClient;
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
private readonly bool replicaAddressValidationEnabled;
private IOpenConnectionsHandler openConnectionsHandler;

public GlobalAddressResolver(
Expand Down Expand Up @@ -66,6 +67,7 @@ public GlobalAddressResolver(
? GlobalAddressResolver.MaxBackupReadRegions : 0;

this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery;
this.replicaAddressValidationEnabled = connectionPolicy.EnableReplicaValidation;

this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)

Expand Down Expand Up @@ -281,7 +283,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.serviceConfigReader,
this.httpClient,
this.openConnectionsHandler,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery);
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
replicaAddressValidationEnabled: this.replicaAddressValidationEnabled);

string location = this.endpointManager.GetLocation(endpoint);
AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location);
Expand Down
5 changes: 3 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,13 @@ public ConsistencyReader(
ISessionContainer sessionContainer,
TransportClient transportClient,
IServiceConfigurationReader serviceConfigReader,
IAuthorizationTokenProvider authorizationTokenProvider)
IAuthorizationTokenProvider authorizationTokenProvider,
bool enableReplicaValidation)
{
this.addressSelector = addressSelector;
this.serviceConfigReader = serviceConfigReader;
this.authorizationTokenProvider = authorizationTokenProvider;
this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer);
this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer, enableReplicaValidation);
this.quorumReader = new QuorumReader(transportClient, addressSelector, this.storeReader, serviceConfigReader, authorizationTokenProvider);
}

Expand Down
6 changes: 4 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public ConsistencyWriter(
TransportClient transportClient,
IServiceConfigurationReader serviceConfigReader,
IAuthorizationTokenProvider authorizationTokenProvider,
bool useMultipleWriteLocations)
bool useMultipleWriteLocations,
bool enableReplicaValidation)
{
this.transportClient = transportClient;
this.addressSelector = addressSelector;
Expand All @@ -71,7 +72,8 @@ public ConsistencyWriter(
transportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer: null); //we need store reader only for global strong, no session is needed*/
sessionContainer: null,
enableReplicaValidation); //we need store reader only for global strong, no session is needed*/
}

// Test hook
Expand Down
1 change: 0 additions & 1 deletion Microsoft.Azure.Cosmos/src/direct/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2134,7 +2134,6 @@ public static class EnvironmentVariables
{
public const string SocketOptionTcpKeepAliveIntervalName = "AZURE_COSMOS_TCP_KEEPALIVE_INTERVAL_SECONDS";
public const string SocketOptionTcpKeepAliveTimeName = "AZURE_COSMOS_TCP_KEEPALIVE_TIME_SECONDS";
public const string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED";
public const string AggressiveTimeoutDetectionEnabled = "AZURE_COSMOS_AGGRESSIVE_TIMEOUT_DETECTION_ENABLED";
public const string TimeoutDetectionTimeLimit = "AZURE_COSMOS_TIMEOUT_DETECTION_TIME_LIMIT_IN_SECONDS";
public const string TimeoutDetectionOnWriteThreshold = "AZURE_COSMOS_TIMEOUT_DETECTION_ON_WRITE_THRESHOLD";
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ StoreClient CreateStoreClient(
bool enableReadRequestsFallback = false,
bool useFallbackClient = true,
bool useMultipleWriteLocations = false,
bool detectClientConnectivityIssues = false);
bool detectClientConnectivityIssues = false,
bool enableReplicaValidation = false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public ReplicatedResourceClient(
bool useMultipleWriteLocations,
bool detectClientConnectivityIssues,
bool disableRetryWithRetryPolicy,
bool enableReplicaValidation,
RetryWithConfiguration retryWithConfiguration = null)
{
this.addressResolver = addressResolver;
Expand All @@ -86,14 +87,16 @@ public ReplicatedResourceClient(
sessionContainer,
transportClient,
serviceConfigReader,
authorizationTokenProvider);
authorizationTokenProvider,
enableReplicaValidation);
this.consistencyWriter = new ConsistencyWriter(
this.addressSelector,
sessionContainer,
transportClient,
serviceConfigReader,
authorizationTokenProvider,
useMultipleWriteLocations);
useMultipleWriteLocations,
enableReplicaValidation);
this.enableReadRequestsFallback = enableReadRequestsFallback;
this.useMultipleWriteLocations = useMultipleWriteLocations;
this.detectClientConnectivityIssues = detectClientConnectivityIssues;
Expand Down
Loading