diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
index 7b66310a34..6cc829cd3d 100644
--- a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
+++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
@@ -1 +1,740 @@
-//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
#nullable enable
namespace Microsoft.Azure.Cosmos.Routing
{
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Net;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;
///
/// AddressCache implementation for client SDK. Supports cross region address routing based on
/// availability and preference list.
///
/// Marking it as non-sealed in order to unit test it using Moq framework
internal class GlobalEndpointManager : IGlobalEndpointManager
{
private const int DefaultBackgroundRefreshLocationTimeIntervalInMS = 5 * 60 * 1000;
private const string BackgroundRefreshLocationTimeIntervalInMS = "BackgroundRefreshLocationTimeIntervalInMS";
private const string MinimumIntervalForNonForceRefreshLocationInMS = "MinimumIntervalForNonForceRefreshLocationInMS";
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly LocationCache locationCache;
private readonly Uri defaultEndpoint;
private readonly ConnectionPolicy connectionPolicy;
private readonly IDocumentClientInternal owner;
private readonly AsyncCache databaseAccountCache = new AsyncCache();
private readonly TimeSpan MinTimeBetweenAccountRefresh = TimeSpan.FromSeconds(15);
private readonly int backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS;
private readonly object backgroundAccountRefreshLock = new object();
private readonly object isAccountRefreshInProgressLock = new object();
private readonly ReaderWriterLockSlim locationCacheDatabaseAccountReadWriteLock = new ReaderWriterLockSlim();
private bool isAccountRefreshInProgress = false;
private bool isBackgroundAccountRefreshActive = false;
private DateTime LastBackgroundRefreshUtc = DateTime.MinValue;
public GlobalEndpointManager(IDocumentClientInternal owner, ConnectionPolicy connectionPolicy)
{
this.locationCache = new LocationCache(
new ReadOnlyCollection(connectionPolicy.PreferredLocations),
owner.ServiceEndpoint,
connectionPolicy.EnableEndpointDiscovery,
connectionPolicy.MaxConnectionLimit,
connectionPolicy.UseMultipleWriteLocations);
this.owner = owner;
this.defaultEndpoint = owner.ServiceEndpoint;
this.connectionPolicy = connectionPolicy;
this.connectionPolicy.PreferenceChanged += this.OnPreferenceChanged;
#if !(NETSTANDARD15 || NETSTANDARD16)
#if NETSTANDARD20
// GetEntryAssembly returns null when loaded from native netstandard2.0
if (System.Reflection.Assembly.GetEntryAssembly() != null)
{
#endif
string backgroundRefreshLocationTimeIntervalInMSConfig = System.Configuration.ConfigurationManager.AppSettings[GlobalEndpointManager.BackgroundRefreshLocationTimeIntervalInMS];
if (!string.IsNullOrEmpty(backgroundRefreshLocationTimeIntervalInMSConfig))
{
if (!int.TryParse(backgroundRefreshLocationTimeIntervalInMSConfig, out this.backgroundRefreshLocationTimeIntervalInMS))
{
this.backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS;
}
}
#if NETSTANDARD20
}
#endif
#endif
string minimumIntervalForNonForceRefreshLocationInMSConfig = Environment.GetEnvironmentVariable(GlobalEndpointManager.MinimumIntervalForNonForceRefreshLocationInMS);
if (!string.IsNullOrEmpty(minimumIntervalForNonForceRefreshLocationInMSConfig))
{
if (int.TryParse(minimumIntervalForNonForceRefreshLocationInMSConfig, out int minimumIntervalForNonForceRefreshLocationInMS))
{
this.MinTimeBetweenAccountRefresh = TimeSpan.FromMilliseconds(minimumIntervalForNonForceRefreshLocationInMS);
}
else
{
DefaultTrace.TraceError($"GlobalEndpointManager: Failed to parse {GlobalEndpointManager.MinimumIntervalForNonForceRefreshLocationInMS}; Value:{minimumIntervalForNonForceRefreshLocationInMSConfig}");
}
}
}
public ReadOnlyCollection ReadEndpoints => this.locationCache.ReadEndpoints;
public ReadOnlyCollection AccountReadEndpoints => this.locationCache.AccountReadEndpoints;
public ReadOnlyCollection WriteEndpoints => this.locationCache.WriteEndpoints;
public int PreferredLocationCount
{
get
{
Collection effectivePreferredLocations = this.GetEffectivePreferredLocations();
return effectivePreferredLocations.Count;
}
}
public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request)
{
return this.locationCache.IsMultimasterMetadataWriteRequest(request);
}
public Uri GetHubUri()
{
return this.locationCache.GetHubUri();
}
///
/// This will get the account information.
/// It will try the global endpoint first.
/// If no response in 5 seconds it will create 2 additional tasks
/// The 2 additional tasks will go through all the preferred regions in parallel
/// It will return the first success and stop the parallel tasks.
///
public static async Task GetDatabaseAccountFromAnyLocationsAsync(
Uri defaultEndpoint,
IList? locations,
IList? accountInitializationCustomEndpoints,
Func> getDatabaseAccountFn,
CancellationToken cancellationToken,
ReaderWriterLockSlim accountPropertiesReaderWriterLock)
{
using (GetAccountPropertiesHelper threadSafeGetAccountHelper = new GetAccountPropertiesHelper(
defaultEndpoint,
locations,
accountInitializationCustomEndpoints,
getDatabaseAccountFn,
cancellationToken))
{
return await threadSafeGetAccountHelper.GetAccountPropertiesAsync(accountPropertiesReaderWriterLock);
}
}
///
/// This is a helper class to
///
private class GetAccountPropertiesHelper : IDisposable
{
private readonly CancellationTokenSource CancellationTokenSource;
private readonly Uri DefaultEndpoint;
private readonly bool LimitToGlobalEndpointOnly;
private readonly IEnumerator ServiceEndpointEnumerator;
private readonly Func> GetDatabaseAccountFn;
private readonly List TransientExceptions = new List();
private AccountProperties? AccountProperties = null;
private Exception? NonRetriableException = null;
private int disposeCounter = 0;
public GetAccountPropertiesHelper(
Uri defaultEndpoint,
IList? locations,
IList? accountInitializationCustomEndpoints,
Func> getDatabaseAccountFn,
CancellationToken cancellationToken)
{
this.DefaultEndpoint = defaultEndpoint;
this.LimitToGlobalEndpointOnly = (locations == null || locations.Count == 0) && (accountInitializationCustomEndpoints == null || accountInitializationCustomEndpoints.Count == 0);
this.GetDatabaseAccountFn = getDatabaseAccountFn;
this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
this.ServiceEndpointEnumerator = GetAccountPropertiesHelper
.GetServiceEndpoints(
defaultEndpoint,
locations,
accountInitializationCustomEndpoints)
.GetEnumerator();
}
public async Task GetAccountPropertiesAsync(ReaderWriterLockSlim readerWriterLock)
{
// If there are no preferred regions or private endpoints, then just wait for the global endpoint results
if (this.LimitToGlobalEndpointOnly)
{
return await this.GetOnlyGlobalEndpointAsync(readerWriterLock);
}
Task globalEndpointTask = this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock);
// Start a timer to start secondary requests in parallel.
Task timerTask = Task.Delay(TimeSpan.FromSeconds(5));
await Task.WhenAny(globalEndpointTask, timerTask);
if (this.AccountProperties != null)
{
return this.AccountProperties;
}
if (this.NonRetriableException != null)
{
ExceptionDispatchInfo.Capture(this.NonRetriableException).Throw();
}
// Start 2 additional tasks to try to get the account information
// from the preferred region list since global account has not succeed yet.
HashSet tasksToWaitOn = new HashSet
{
globalEndpointTask,
this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock),
this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock)
};
while (tasksToWaitOn.Any())
{
Task completedTask = await Task.WhenAny(tasksToWaitOn);
if (this.AccountProperties != null)
{
return this.AccountProperties;
}
if (this.NonRetriableException != null)
{
ExceptionDispatchInfo.Capture(this.NonRetriableException).Throw();
}
tasksToWaitOn.Remove(completedTask);
}
if (this.TransientExceptions.Count == 0)
{
throw new ArgumentException("Account properties and NonRetriableException are null and there are no TransientExceptions.");
}
if (this.TransientExceptions.Count == 1)
{
ExceptionDispatchInfo.Capture(this.TransientExceptions[0]).Throw();
}
throw new AggregateException(this.TransientExceptions);
}
private async Task GetOnlyGlobalEndpointAsync(ReaderWriterLockSlim readerWriterLock)
{
if (!this.LimitToGlobalEndpointOnly)
{
throw new ArgumentException("GetOnlyGlobalEndpointAsync should only be called if there are no other private endpoints or regions");
}
await this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock);
if (this.AccountProperties != null)
{
return this.AccountProperties;
}
if (this.NonRetriableException != null)
{
throw this.NonRetriableException;
}
if (this.TransientExceptions.Count == 0)
{
throw new ArgumentException("Account properties and NonRetriableException are null and there are no TransientExceptions.");
}
if (this.TransientExceptions.Count == 1)
{
throw this.TransientExceptions[0];
}
throw new AggregateException(this.TransientExceptions);
}
///
/// This is done in a thread safe way to allow multiple tasks to iterate over the list of service endpoints.
///
private async Task TryGetAccountPropertiesFromAllLocationsAsync(ReaderWriterLockSlim readerWriterLock)
{
while (this.TryMoveNextServiceEndpointhreadSafe(
out Uri? serviceEndpoint))
{
if (serviceEndpoint == null)
{
DefaultTrace.TraceCritical("GlobalEndpointManager: serviceEndpoint is null for TryMoveNextServiceEndpointhreadSafe.");
return;
}
await this.GetAndUpdateAccountPropertiesAsync(
endpoint: serviceEndpoint,
readerWriterLock);
}
}
///
/// We first iterate through all the private endpoints to fetch the account information.
/// If all the attempt fails to fetch the metadata from the private endpoints, we will
/// attempt to retrieve the account information from the regional endpoints constructed
/// using the preferred regions list.
///
/// An instance of that will contain the service endpoint.
/// A boolean flag indicating if the was advanced in a thread safe manner.
private bool TryMoveNextServiceEndpointhreadSafe(
out Uri? serviceEndpoint)
{
if (this.CancellationTokenSource.IsCancellationRequested)
{
serviceEndpoint = null;
return false;
}
lock (this.ServiceEndpointEnumerator)
{
if (!this.ServiceEndpointEnumerator.MoveNext())
{
serviceEndpoint = null;
return false;
}
serviceEndpoint = this.ServiceEndpointEnumerator.Current;
return true;
}
}
private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint, ReaderWriterLockSlim readerWriterLock)
{
try
{
if (this.CancellationTokenSource.IsCancellationRequested)
{
lock (this.TransientExceptions)
{
this.TransientExceptions.Add(new OperationCanceledException($"GlobalEndpointManager: Get account information canceled for URI: {endpoint}"));
}
return;
}
AccountProperties databaseAccount = await this.GetDatabaseAccountFn(endpoint);
if (databaseAccount != null)
{
readerWriterLock.EnterWriteLock();
try
{
this.AccountProperties = databaseAccount;
this.CancellationTokenSource.Cancel();
}
finally
{
readerWriterLock.ExitWriteLock();
}
}
}
catch (Exception e)
{
DefaultTrace.TraceInformation("GlobalEndpointManager: Fail to reach gateway endpoint {0}, {1}", endpoint, e.ToString());
if (GetAccountPropertiesHelper.IsNonRetriableException(e))
{
DefaultTrace.TraceInformation("GlobalEndpointManager: Exception is not retriable");
this.CancellationTokenSource.Cancel();
this.NonRetriableException = e;
}
else
{
lock (this.TransientExceptions)
{
this.TransientExceptions.Add(e);
}
}
}
}
private static bool IsNonRetriableException(Exception exception)
{
if (exception is DocumentClientException dce &&
(dce.StatusCode == HttpStatusCode.Unauthorized || dce.StatusCode == HttpStatusCode.Forbidden))
{
return true;
}
return false;
}
///
/// Returns an instance of containing the private and regional service endpoints to iterate over.
///
/// An instance of containing the default global endpoint.
/// An instance of containing the preferred serviceEndpoint names.
/// An instance of containing the custom private endpoints.
/// An instance of containing the service endpoints.
private static IEnumerable GetServiceEndpoints(
Uri defaultEndpoint,
IList? locations,
IList? accountInitializationCustomEndpoints)
{
// We first iterate over all the private endpoints and yield return them.
if (accountInitializationCustomEndpoints?.Count > 0)
{
foreach (Uri customEndpoint in accountInitializationCustomEndpoints)
{
// Yield return all of the custom private endpoints first.
yield return customEndpoint;
}
}
// The next step is to iterate over the preferred locations, construct and yield return the regional endpoints one by one.
// The regional endpoints will be constructed by appending the preferred region name as a suffix to the default global endpoint.
if (locations?.Count > 0)
{
foreach (string location in locations)
{
// Yield return all of the regional endpoints once the private custom endpoints are visited.
yield return LocationHelper.GetLocationEndpoint(defaultEndpoint, location);
}
}
}
public void Dispose()
{
if (Interlocked.Increment(ref this.disposeCounter) == 1)
{
this.CancellationTokenSource?.Cancel();
this.CancellationTokenSource?.Dispose();
}
}
}
public virtual Uri ResolveServiceEndpoint(DocumentServiceRequest request)
{
return this.locationCache.ResolveServiceEndpoint(request);
}
///
/// Gets the default endpoint of the account
///
/// the default endpoint.
public Uri GetDefaultEndpoint()
{
return this.locationCache.GetDefaultEndpoint();
}
///
/// Gets the mapping of available write region names to the respective endpoints
///
public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation()
{
return this.locationCache.GetAvailableWriteEndpointsByLocation();
}
///
/// Gets the mapping of available read region names to the respective endpoints
///
public ReadOnlyDictionary GetAvailableReadEndpointsByLocation()
{
return this.locationCache.GetAvailableReadEndpointsByLocation();
}
///
/// Returns serviceEndpoint corresponding to the endpoint
///
///
public string GetLocation(Uri endpoint)
{
return this.locationCache.GetLocation(endpoint);
}
public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest)
{
return this.locationCache.GetApplicableEndpoints(request, isReadRequest);
}
public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest)
{
return this.locationCache.GetApplicableRegions(excludeRegions, isReadRequest);
}
public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName)
{
return this.locationCache.TryGetLocationForGatewayDiagnostics(endpoint, out regionName);
}
public virtual void MarkEndpointUnavailableForRead(Uri endpoint)
{
DefaultTrace.TraceInformation("GlobalEndpointManager: Marking endpoint {0} unavailable for read", endpoint);
this.locationCache.MarkEndpointUnavailableForRead(endpoint);
}
public virtual void MarkEndpointUnavailableForWrite(Uri endpoint)
{
DefaultTrace.TraceInformation("GlobalEndpointManager: Marking endpoint {0} unavailable for Write", endpoint);
this.locationCache.MarkEndpointUnavailableForWrite(endpoint);
}
public bool CanUseMultipleWriteLocations(DocumentServiceRequest request)
{
return this.locationCache.CanUseMultipleWriteLocations(request);
}
public void Dispose()
{
this.connectionPolicy.PreferenceChanged -= this.OnPreferenceChanged;
if (!this.cancellationTokenSource.IsCancellationRequested)
{
// This can cause task canceled exceptions if the user disposes of the object while awaiting an async call.
this.cancellationTokenSource.Cancel();
// The background timer task can hit a ObjectDisposedException but it's an async background task
// that is never awaited on so it will not be thrown back to the caller.
this.cancellationTokenSource.Dispose();
}
}
public virtual void InitializeAccountPropertiesAndStartBackgroundRefresh(AccountProperties databaseAccount)
{
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}
this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock();
try
{
this.locationCache.OnDatabaseAccountRead(databaseAccount);
}
finally
{
this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock();
}
if (this.isBackgroundAccountRefreshActive)
{
return;
}
lock (this.backgroundAccountRefreshLock)
{
if (this.isBackgroundAccountRefreshActive)
{
return;
}
this.isBackgroundAccountRefreshActive = true;
}
try
{
this.StartLocationBackgroundRefreshLoop();
}
catch
{
this.isBackgroundAccountRefreshActive = false;
throw;
}
}
public virtual async Task RefreshLocationAsync(bool forceRefresh = false)
{
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}
await this.RefreshDatabaseAccountInternalAsync(forceRefresh: forceRefresh);
}
#pragma warning disable VSTHRD100 // Avoid async void methods
private async void StartLocationBackgroundRefreshLoop()
#pragma warning restore VSTHRD100 // Avoid async void methods
{
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}
DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() refreshing locations");
if (!this.locationCache.ShouldRefreshEndpoints(out bool canRefreshInBackground))
{
if (!canRefreshInBackground)
{
DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() stropped.");
lock (this.backgroundAccountRefreshLock)
{
this.isBackgroundAccountRefreshActive = false;
}
return;
}
}
try
{
await Task.Delay(this.backgroundRefreshLocationTimeIntervalInMS, this.cancellationTokenSource.Token);
DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() - Invoking refresh");
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}
await this.RefreshDatabaseAccountInternalAsync(forceRefresh: false);
}
catch (Exception ex)
{
if (this.cancellationTokenSource.IsCancellationRequested && (ex is OperationCanceledException || ex is ObjectDisposedException))
{
return;
}
DefaultTrace.TraceCritical("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() - Unable to refresh database account from any serviceEndpoint. Exception: {0}", ex.ToString());
}
// Call itself to create a loop to continuously do background refresh every 5 minutes
this.StartLocationBackgroundRefreshLoop();
}
private Task GetDatabaseAccountAsync(Uri serviceEndpoint)
{
return this.owner.GetDatabaseAccountInternalAsync(serviceEndpoint, this.cancellationTokenSource.Token);
}
private void OnPreferenceChanged(object sender, NotifyCollectionChangedEventArgs e)
{
this.locationCache.OnLocationPreferenceChanged(new ReadOnlyCollection(
this.connectionPolicy.PreferredLocations));
}
///
/// Thread safe refresh account and serviceEndpoint info.
///
private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
{
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}
if (this.SkipRefresh(forceRefresh))
{
return;
}
lock (this.isAccountRefreshInProgressLock)
{
// Check again if should refresh after obtaining the lock
if (this.SkipRefresh(forceRefresh))
{
return;
}
// If the refresh is already in progress just return. No reason to do another refresh.
if (this.isAccountRefreshInProgress)
{
return;
}
this.isAccountRefreshInProgress = true;
}
try
{
this.LastBackgroundRefreshUtc = DateTime.UtcNow;
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);
this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock();
try
{
this.locationCache.OnDatabaseAccountRead(accountProperties);
}
finally
{
this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock();
}
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'",
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
finally
{
lock (this.isAccountRefreshInProgressLock)
{
this.isAccountRefreshInProgress = false;
}
}
}
internal async Task GetDatabaseAccountAsync(bool forceRefresh = false)
{
#nullable disable // Needed because AsyncCache does not have nullable enabled
return await this.databaseAccountCache.GetAsync(
key: string.Empty,
obsoleteValue: null,
singleValueInitFunc: () => GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync(
this.defaultEndpoint,
this.GetEffectivePreferredLocations(),
this.connectionPolicy.AccountInitializationCustomEndpoints,
this.GetDatabaseAccountAsync,
this.cancellationTokenSource.Token,
this.locationCacheDatabaseAccountReadWriteLock),
cancellationToken: this.cancellationTokenSource.Token,
forceRefresh: forceRefresh);
#nullable enable
}
///
/// If the account is currently refreshing or the last refresh occurred less than the minimum time
/// just return. This is used to avoid refreshing to often and preventing to much pressure on the gateway.
///
private bool SkipRefresh(bool forceRefresh)
{
TimeSpan timeSinceLastRefresh = DateTime.UtcNow - this.LastBackgroundRefreshUtc;
return (this.isAccountRefreshInProgress || this.MinTimeBetweenAccountRefresh > timeSinceLastRefresh)
&& !forceRefresh;
}
public Collection GetEffectivePreferredLocations()
{
if (this.connectionPolicy.PreferredLocations != null && this.connectionPolicy.PreferredLocations.Count > 0)
{
return this.connectionPolicy.PreferredLocations;
}
this.locationCacheDatabaseAccountReadWriteLock.EnterReadLock();
try
{
return new Collection(this.locationCache.EffectivePreferredLocations);
}
finally
{
this.locationCacheDatabaseAccountReadWriteLock.ExitReadLock();
}
}
}
}
\ No newline at end of file
+//------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+//------------------------------------------------------------
+#nullable enable
+namespace Microsoft.Azure.Cosmos.Routing
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Collections.ObjectModel;
+ using System.Collections.Specialized;
+ using System.Linq;
+ using System.Net;
+ using System.Runtime.ExceptionServices;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Microsoft.Azure.Cosmos.Common;
+ using Microsoft.Azure.Cosmos.Core.Trace;
+ using Microsoft.Azure.Documents;
+
+ ///
+ /// AddressCache implementation for client SDK. Supports cross region address routing based on
+ /// availability and preference list.
+ ///
+ /// Marking it as non-sealed in order to unit test it using Moq framework
+ internal class GlobalEndpointManager : IGlobalEndpointManager
+ {
+ private const int DefaultBackgroundRefreshLocationTimeIntervalInMS = 5 * 60 * 1000;
+
+ private const string BackgroundRefreshLocationTimeIntervalInMS = "BackgroundRefreshLocationTimeIntervalInMS";
+ private const string MinimumIntervalForNonForceRefreshLocationInMS = "MinimumIntervalForNonForceRefreshLocationInMS";
+ private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
+ private readonly LocationCache locationCache;
+ private readonly Uri defaultEndpoint;
+ private readonly ConnectionPolicy connectionPolicy;
+ private readonly IDocumentClientInternal owner;
+ private readonly AsyncCache databaseAccountCache = new AsyncCache();
+ private readonly TimeSpan MinTimeBetweenAccountRefresh = TimeSpan.FromSeconds(15);
+ private readonly int backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS;
+ private readonly object backgroundAccountRefreshLock = new object();
+ private readonly object isAccountRefreshInProgressLock = new object();
+ private readonly ReaderWriterLockSlim locationCacheDatabaseAccountReadWriteLock = new ReaderWriterLockSlim();
+ private bool isAccountRefreshInProgress = false;
+ private bool isBackgroundAccountRefreshActive = false;
+ private DateTime LastBackgroundRefreshUtc = DateTime.MinValue;
+
+ public GlobalEndpointManager(IDocumentClientInternal owner, ConnectionPolicy connectionPolicy)
+ {
+ this.locationCache = new LocationCache(
+ new ReadOnlyCollection(connectionPolicy.PreferredLocations),
+ owner.ServiceEndpoint,
+ connectionPolicy.EnableEndpointDiscovery,
+ connectionPolicy.MaxConnectionLimit,
+ connectionPolicy.UseMultipleWriteLocations);
+
+ this.owner = owner;
+ this.defaultEndpoint = owner.ServiceEndpoint;
+ this.connectionPolicy = connectionPolicy;
+
+ this.connectionPolicy.PreferenceChanged += this.OnPreferenceChanged;
+
+#if !(NETSTANDARD15 || NETSTANDARD16)
+#if NETSTANDARD20
+ // GetEntryAssembly returns null when loaded from native netstandard2.0
+ if (System.Reflection.Assembly.GetEntryAssembly() != null)
+ {
+#endif
+ string backgroundRefreshLocationTimeIntervalInMSConfig = System.Configuration.ConfigurationManager.AppSettings[GlobalEndpointManager.BackgroundRefreshLocationTimeIntervalInMS];
+ if (!string.IsNullOrEmpty(backgroundRefreshLocationTimeIntervalInMSConfig))
+ {
+ if (!int.TryParse(backgroundRefreshLocationTimeIntervalInMSConfig, out this.backgroundRefreshLocationTimeIntervalInMS))
+ {
+ this.backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS;
+ }
+ }
+#if NETSTANDARD20
+ }
+#endif
+#endif
+ string minimumIntervalForNonForceRefreshLocationInMSConfig = Environment.GetEnvironmentVariable(GlobalEndpointManager.MinimumIntervalForNonForceRefreshLocationInMS);
+ if (!string.IsNullOrEmpty(minimumIntervalForNonForceRefreshLocationInMSConfig))
+ {
+ if (int.TryParse(minimumIntervalForNonForceRefreshLocationInMSConfig, out int minimumIntervalForNonForceRefreshLocationInMS))
+ {
+ this.MinTimeBetweenAccountRefresh = TimeSpan.FromMilliseconds(minimumIntervalForNonForceRefreshLocationInMS);
+ }
+ else
+ {
+ DefaultTrace.TraceError($"GlobalEndpointManager: Failed to parse {GlobalEndpointManager.MinimumIntervalForNonForceRefreshLocationInMS}; Value:{minimumIntervalForNonForceRefreshLocationInMSConfig}");
+ }
+ }
+ }
+
+ public ReadOnlyCollection ReadEndpoints => this.locationCache.ReadEndpoints;
+
+ public ReadOnlyCollection AccountReadEndpoints => this.locationCache.AccountReadEndpoints;
+
+ public ReadOnlyCollection WriteEndpoints => this.locationCache.WriteEndpoints;
+
+ public int PreferredLocationCount
+ {
+ get
+ {
+ Collection effectivePreferredLocations = this.GetEffectivePreferredLocations();
+
+ return effectivePreferredLocations.Count;
+ }
+ }
+
+ public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request)
+ {
+ return this.locationCache.IsMultimasterMetadataWriteRequest(request);
+ }
+
+ public Uri GetHubUri()
+ {
+ return this.locationCache.GetHubUri();
+ }
+
+ ///
+ /// This will get the account information.
+ /// It will try the global endpoint first.
+ /// If no response in 5 seconds it will create 2 additional tasks
+ /// The 2 additional tasks will go through all the preferred regions in parallel
+ /// It will return the first success and stop the parallel tasks.
+ ///
+ public static async Task GetDatabaseAccountFromAnyLocationsAsync(
+ Uri defaultEndpoint,
+ IList? locations,
+ IList? accountInitializationCustomEndpoints,
+ Func> getDatabaseAccountFn,
+ CancellationToken cancellationToken,
+ ReaderWriterLockSlim accountPropertiesReaderWriterLock)
+ {
+ using (GetAccountPropertiesHelper threadSafeGetAccountHelper = new GetAccountPropertiesHelper(
+ defaultEndpoint,
+ locations,
+ accountInitializationCustomEndpoints,
+ getDatabaseAccountFn,
+ cancellationToken))
+ {
+ return await threadSafeGetAccountHelper.GetAccountPropertiesAsync(accountPropertiesReaderWriterLock);
+ }
+ }
+
+ ///
+ /// This is a helper class to
+ ///
+ private class GetAccountPropertiesHelper : IDisposable
+ {
+ private readonly CancellationTokenSource CancellationTokenSource;
+ private readonly Uri DefaultEndpoint;
+ private readonly bool LimitToGlobalEndpointOnly;
+ private readonly IEnumerator ServiceEndpointEnumerator;
+ private readonly Func> GetDatabaseAccountFn;
+ private readonly List TransientExceptions = new List();
+ private AccountProperties? AccountProperties = null;
+ private Exception? NonRetriableException = null;
+ private int disposeCounter = 0;
+
+ public GetAccountPropertiesHelper(
+ Uri defaultEndpoint,
+ IList? locations,
+ IList? accountInitializationCustomEndpoints,
+ Func> getDatabaseAccountFn,
+ CancellationToken cancellationToken)
+ {
+ this.DefaultEndpoint = defaultEndpoint;
+ this.LimitToGlobalEndpointOnly = (locations == null || locations.Count == 0) && (accountInitializationCustomEndpoints == null || accountInitializationCustomEndpoints.Count == 0);
+ this.GetDatabaseAccountFn = getDatabaseAccountFn;
+ this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ this.ServiceEndpointEnumerator = GetAccountPropertiesHelper
+ .GetServiceEndpoints(
+ defaultEndpoint,
+ locations,
+ accountInitializationCustomEndpoints)
+ .GetEnumerator();
+ }
+
+ public async Task GetAccountPropertiesAsync(ReaderWriterLockSlim readerWriterLock)
+ {
+ // If there are no preferred regions or private endpoints, then just wait for the global endpoint results
+ if (this.LimitToGlobalEndpointOnly)
+ {
+ return await this.GetOnlyGlobalEndpointAsync(readerWriterLock);
+ }
+
+ Task globalEndpointTask = this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock);
+
+ // Start a timer to start secondary requests in parallel.
+ Task timerTask = Task.Delay(TimeSpan.FromSeconds(5));
+ await Task.WhenAny(globalEndpointTask, timerTask);
+ if (this.AccountProperties != null)
+ {
+ return this.AccountProperties;
+ }
+
+ if (this.NonRetriableException != null)
+ {
+ ExceptionDispatchInfo.Capture(this.NonRetriableException).Throw();
+ }
+
+ // Start 2 additional tasks to try to get the account information
+ // from the preferred region list since global account has not succeed yet.
+ HashSet tasksToWaitOn = new HashSet
+ {
+ globalEndpointTask,
+ this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock),
+ this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock)
+ };
+
+ while (tasksToWaitOn.Any())
+ {
+ Task completedTask = await Task.WhenAny(tasksToWaitOn);
+ if (this.AccountProperties != null)
+ {
+ return this.AccountProperties;
+ }
+
+ if (this.NonRetriableException != null)
+ {
+ ExceptionDispatchInfo.Capture(this.NonRetriableException).Throw();
+ }
+
+ tasksToWaitOn.Remove(completedTask);
+ }
+
+ if (this.TransientExceptions.Count == 0)
+ {
+ throw new ArgumentException("Account properties and NonRetriableException are null and there are no TransientExceptions.");
+ }
+
+ if (this.TransientExceptions.Count == 1)
+ {
+ ExceptionDispatchInfo.Capture(this.TransientExceptions[0]).Throw();
+ }
+
+ throw new AggregateException(this.TransientExceptions);
+ }
+
+ private async Task GetOnlyGlobalEndpointAsync(ReaderWriterLockSlim readerWriterLock)
+ {
+ if (!this.LimitToGlobalEndpointOnly)
+ {
+ throw new ArgumentException("GetOnlyGlobalEndpointAsync should only be called if there are no other private endpoints or regions");
+ }
+
+ await this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock);
+
+ if (this.AccountProperties != null)
+ {
+ return this.AccountProperties;
+ }
+
+ if (this.NonRetriableException != null)
+ {
+ throw this.NonRetriableException;
+ }
+
+ if (this.TransientExceptions.Count == 0)
+ {
+ throw new ArgumentException("Account properties and NonRetriableException are null and there are no TransientExceptions.");
+ }
+
+ if (this.TransientExceptions.Count == 1)
+ {
+ throw this.TransientExceptions[0];
+ }
+
+ throw new AggregateException(this.TransientExceptions);
+ }
+
+ ///
+ /// This is done in a thread safe way to allow multiple tasks to iterate over the list of service endpoints.
+ ///
+ private async Task TryGetAccountPropertiesFromAllLocationsAsync(ReaderWriterLockSlim readerWriterLock)
+ {
+ while (this.TryMoveNextServiceEndpointhreadSafe(
+ out Uri? serviceEndpoint))
+ {
+ if (serviceEndpoint == null)
+ {
+ DefaultTrace.TraceCritical("GlobalEndpointManager: serviceEndpoint is null for TryMoveNextServiceEndpointhreadSafe.");
+ return;
+ }
+
+ await this.GetAndUpdateAccountPropertiesAsync(
+ endpoint: serviceEndpoint,
+ readerWriterLock);
+ }
+ }
+
+ ///
+ /// We first iterate through all the private endpoints to fetch the account information.
+ /// If all the attempt fails to fetch the metadata from the private endpoints, we will
+ /// attempt to retrieve the account information from the regional endpoints constructed
+ /// using the preferred regions list.
+ ///
+ /// An instance of that will contain the service endpoint.
+ /// A boolean flag indicating if the was advanced in a thread safe manner.
+ private bool TryMoveNextServiceEndpointhreadSafe(
+ out Uri? serviceEndpoint)
+ {
+ if (this.CancellationTokenSource.IsCancellationRequested)
+ {
+ serviceEndpoint = null;
+ return false;
+ }
+
+ lock (this.ServiceEndpointEnumerator)
+ {
+ if (!this.ServiceEndpointEnumerator.MoveNext())
+ {
+ serviceEndpoint = null;
+ return false;
+ }
+
+ serviceEndpoint = this.ServiceEndpointEnumerator.Current;
+ return true;
+ }
+ }
+
+ private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint, ReaderWriterLockSlim readerWriterLock)
+ {
+ try
+ {
+ if (this.CancellationTokenSource.IsCancellationRequested)
+ {
+ lock (this.TransientExceptions)
+ {
+ this.TransientExceptions.Add(new OperationCanceledException($"GlobalEndpointManager: Get account information canceled for URI: {endpoint}"));
+ }
+
+ return;
+ }
+
+ AccountProperties databaseAccount = await this.GetDatabaseAccountFn(endpoint);
+
+ if (databaseAccount != null)
+ {
+ readerWriterLock.EnterWriteLock();
+
+ try
+ {
+ this.AccountProperties = databaseAccount;
+ this.CancellationTokenSource.Cancel();
+ }
+ finally
+ {
+ readerWriterLock.ExitWriteLock();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ DefaultTrace.TraceInformation("GlobalEndpointManager: Fail to reach gateway endpoint {0}, {1}", endpoint, e.ToString());
+ if (GetAccountPropertiesHelper.IsNonRetriableException(e))
+ {
+ DefaultTrace.TraceInformation("GlobalEndpointManager: Exception is not retriable");
+ this.CancellationTokenSource.Cancel();
+ this.NonRetriableException = e;
+ }
+ else
+ {
+ lock (this.TransientExceptions)
+ {
+ this.TransientExceptions.Add(e);
+ }
+ }
+ }
+ }
+
+ private static bool IsNonRetriableException(Exception exception)
+ {
+ if (exception is DocumentClientException dce &&
+ (dce.StatusCode == HttpStatusCode.Unauthorized || dce.StatusCode == HttpStatusCode.Forbidden))
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ ///
+ /// Returns an instance of containing the private and regional service endpoints to iterate over.
+ ///
+ /// An instance of containing the default global endpoint.
+ /// An instance of containing the preferred serviceEndpoint names.
+ /// An instance of containing the custom private endpoints.
+ /// An instance of containing the service endpoints.
+ private static IEnumerable GetServiceEndpoints(
+ Uri defaultEndpoint,
+ IList? locations,
+ IList? accountInitializationCustomEndpoints)
+ {
+ // We first iterate over all the private endpoints and yield return them.
+ if (accountInitializationCustomEndpoints?.Count > 0)
+ {
+ foreach (Uri customEndpoint in accountInitializationCustomEndpoints)
+ {
+ // Yield return all of the custom private endpoints first.
+ yield return customEndpoint;
+ }
+ }
+
+ // The next step is to iterate over the preferred locations, construct and yield return the regional endpoints one by one.
+ // The regional endpoints will be constructed by appending the preferred region name as a suffix to the default global endpoint.
+ if (locations?.Count > 0)
+ {
+ foreach (string location in locations)
+ {
+ // Yield return all of the regional endpoints once the private custom endpoints are visited.
+ yield return LocationHelper.GetLocationEndpoint(defaultEndpoint, location);
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ if (Interlocked.Increment(ref this.disposeCounter) == 1)
+ {
+ this.CancellationTokenSource?.Cancel();
+ this.CancellationTokenSource?.Dispose();
+ }
+ }
+ }
+
+ public virtual Uri ResolveServiceEndpoint(DocumentServiceRequest request)
+ {
+ return this.locationCache.ResolveServiceEndpoint(request);
+ }
+
+ ///
+ /// Gets the default endpoint of the account
+ ///
+ /// the default endpoint.
+ public Uri GetDefaultEndpoint()
+ {
+ return this.locationCache.GetDefaultEndpoint();
+ }
+
+ ///
+ /// Gets the mapping of available write region names to the respective endpoints
+ ///
+ public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation()
+ {
+ return this.locationCache.GetAvailableWriteEndpointsByLocation();
+ }
+
+ ///
+ /// Gets the mapping of available read region names to the respective endpoints
+ ///
+ public ReadOnlyDictionary GetAvailableReadEndpointsByLocation()
+ {
+ return this.locationCache.GetAvailableReadEndpointsByLocation();
+ }
+
+ ///
+ /// Returns serviceEndpoint corresponding to the endpoint
+ ///
+ ///
+ public string GetLocation(Uri endpoint)
+ {
+ return this.locationCache.GetLocation(endpoint);
+ }
+
+ public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest)
+ {
+ return this.locationCache.GetApplicableEndpoints(request, isReadRequest);
+ }
+
+ public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest)
+ {
+ return this.locationCache.GetApplicableRegions(excludeRegions, isReadRequest);
+ }
+
+ public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName)
+ {
+ return this.locationCache.TryGetLocationForGatewayDiagnostics(endpoint, out regionName);
+ }
+
+ public virtual void MarkEndpointUnavailableForRead(Uri endpoint)
+ {
+ DefaultTrace.TraceInformation("GlobalEndpointManager: Marking endpoint {0} unavailable for read", endpoint);
+
+ this.locationCache.MarkEndpointUnavailableForRead(endpoint);
+ }
+
+ public virtual void MarkEndpointUnavailableForWrite(Uri endpoint)
+ {
+ DefaultTrace.TraceInformation("GlobalEndpointManager: Marking endpoint {0} unavailable for Write", endpoint);
+
+ this.locationCache.MarkEndpointUnavailableForWrite(endpoint);
+ }
+
+ public bool CanUseMultipleWriteLocations(DocumentServiceRequest request)
+ {
+ return this.locationCache.CanUseMultipleWriteLocations(request);
+ }
+
+ public void Dispose()
+ {
+ this.connectionPolicy.PreferenceChanged -= this.OnPreferenceChanged;
+ if (!this.cancellationTokenSource.IsCancellationRequested)
+ {
+ // This can cause task canceled exceptions if the user disposes of the object while awaiting an async call.
+ this.cancellationTokenSource.Cancel();
+ // The background timer task can hit a ObjectDisposedException but it's an async background task
+ // that is never awaited on so it will not be thrown back to the caller.
+ this.cancellationTokenSource.Dispose();
+ }
+ }
+
+ public virtual void InitializeAccountPropertiesAndStartBackgroundRefresh(AccountProperties databaseAccount)
+ {
+ if (this.cancellationTokenSource.IsCancellationRequested)
+ {
+ return;
+ }
+
+ this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock();
+ try
+ {
+ this.locationCache.OnDatabaseAccountRead(databaseAccount);
+ }
+ finally
+ {
+ this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock();
+ }
+
+ if (this.isBackgroundAccountRefreshActive)
+ {
+ return;
+ }
+
+ lock (this.backgroundAccountRefreshLock)
+ {
+ if (this.isBackgroundAccountRefreshActive)
+ {
+ return;
+ }
+
+ this.isBackgroundAccountRefreshActive = true;
+ }
+
+ try
+ {
+ this.StartLocationBackgroundRefreshLoop();
+ }
+ catch
+ {
+ this.isBackgroundAccountRefreshActive = false;
+ throw;
+ }
+ }
+
+ public virtual async Task RefreshLocationAsync(bool forceRefresh = false)
+ {
+ if (this.cancellationTokenSource.IsCancellationRequested)
+ {
+ return;
+ }
+
+ await this.RefreshDatabaseAccountInternalAsync(forceRefresh: forceRefresh);
+ }
+
+#pragma warning disable VSTHRD100 // Avoid async void methods
+ private async void StartLocationBackgroundRefreshLoop()
+#pragma warning restore VSTHRD100 // Avoid async void methods
+ {
+ if (this.cancellationTokenSource.IsCancellationRequested)
+ {
+ return;
+ }
+
+ DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() refreshing locations");
+
+ if (!this.locationCache.ShouldRefreshEndpoints(out bool canRefreshInBackground))
+ {
+ if (!canRefreshInBackground)
+ {
+ DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() stropped.");
+ lock (this.backgroundAccountRefreshLock)
+ {
+ this.isBackgroundAccountRefreshActive = false;
+ }
+
+ return;
+ }
+ }
+
+ try
+ {
+ await Task.Delay(this.backgroundRefreshLocationTimeIntervalInMS, this.cancellationTokenSource.Token);
+
+ DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() - Invoking refresh");
+
+ if (this.cancellationTokenSource.IsCancellationRequested)
+ {
+ return;
+ }
+
+ await this.RefreshDatabaseAccountInternalAsync(forceRefresh: false);
+ }
+ catch (Exception ex)
+ {
+ if (this.cancellationTokenSource.IsCancellationRequested && (ex is OperationCanceledException || ex is ObjectDisposedException))
+ {
+ return;
+ }
+
+ DefaultTrace.TraceCritical("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() - Unable to refresh database account from any serviceEndpoint. Exception: {0}", ex.ToString());
+ }
+
+ // Call itself to create a loop to continuously do background refresh every 5 minutes
+ this.StartLocationBackgroundRefreshLoop();
+ }
+
+ private Task GetDatabaseAccountAsync(Uri serviceEndpoint)
+ {
+ return this.owner.GetDatabaseAccountInternalAsync(serviceEndpoint, this.cancellationTokenSource.Token);
+ }
+
+ private void OnPreferenceChanged(object sender, NotifyCollectionChangedEventArgs e)
+ {
+ this.locationCache.OnLocationPreferenceChanged(new ReadOnlyCollection(
+ this.connectionPolicy.PreferredLocations));
+ }
+
+ ///
+ /// Thread safe refresh account and serviceEndpoint info.
+ ///
+ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
+ {
+ if (this.cancellationTokenSource.IsCancellationRequested)
+ {
+ return;
+ }
+
+ if (this.SkipRefresh(forceRefresh))
+ {
+ return;
+ }
+
+ lock (this.isAccountRefreshInProgressLock)
+ {
+ // Check again if should refresh after obtaining the lock
+ if (this.SkipRefresh(forceRefresh))
+ {
+ return;
+ }
+
+ // If the refresh is already in progress just return. No reason to do another refresh.
+ if (this.isAccountRefreshInProgress)
+ {
+ return;
+ }
+
+ this.isAccountRefreshInProgress = true;
+ }
+
+ try
+ {
+ this.LastBackgroundRefreshUtc = DateTime.UtcNow;
+
+ AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);
+
+ this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock();
+
+ try
+ {
+ this.locationCache.OnDatabaseAccountRead(accountProperties);
+ }
+ finally
+ {
+ this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock();
+ }
+ }
+ catch (Exception ex)
+ {
+ DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'",
+ ex,
+ System.Diagnostics.Trace.CorrelationManager.ActivityId);
+ }
+ finally
+ {
+ lock (this.isAccountRefreshInProgressLock)
+ {
+ this.isAccountRefreshInProgress = false;
+ }
+ }
+ }
+ internal async Task GetDatabaseAccountAsync(bool forceRefresh = false)
+ {
+#nullable disable // Needed because AsyncCache does not have nullable enabled
+ return await this.databaseAccountCache.GetAsync(
+ key: string.Empty,
+ obsoleteValue: null,
+ singleValueInitFunc: () => GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync(
+ this.defaultEndpoint,
+ this.GetEffectivePreferredLocations(),
+ this.connectionPolicy.AccountInitializationCustomEndpoints,
+ this.GetDatabaseAccountAsync,
+ this.cancellationTokenSource.Token,
+ this.locationCacheDatabaseAccountReadWriteLock),
+ cancellationToken: this.cancellationTokenSource.Token,
+ forceRefresh: forceRefresh);
+#nullable enable
+ }
+
+ ///
+ /// If the account is currently refreshing or the last refresh occurred less than the minimum time
+ /// just return. This is used to avoid refreshing to often and preventing to much pressure on the gateway.
+ ///
+ private bool SkipRefresh(bool forceRefresh)
+ {
+ TimeSpan timeSinceLastRefresh = DateTime.UtcNow - this.LastBackgroundRefreshUtc;
+ return (this.isAccountRefreshInProgress || this.MinTimeBetweenAccountRefresh > timeSinceLastRefresh)
+ && !forceRefresh;
+ }
+
+ public Collection GetEffectivePreferredLocations()
+ {
+ if (this.connectionPolicy.PreferredLocations != null && this.connectionPolicy.PreferredLocations.Count > 0)
+ {
+ return this.connectionPolicy.PreferredLocations;
+ }
+
+ this.locationCacheDatabaseAccountReadWriteLock.EnterReadLock();
+
+ try
+ {
+ return new Collection(this.locationCache.EffectivePreferredLocations);
+ }
+ finally
+ {
+ this.locationCacheDatabaseAccountReadWriteLock.ExitReadLock();
+ }
+ }
+ }
+}
diff --git a/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs b/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs
index 12d8d29288..c5875e9179 100644
--- a/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs
+++ b/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs
@@ -1 +1,954 @@
-//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Routing
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;
///
/// Implements the abstraction to resolve target location for geo-replicated DatabaseAccount
/// with multiple writable and readable locations.
///
internal sealed class LocationCache
{
private const string UnavailableLocationsExpirationTimeInSeconds = "UnavailableLocationsExpirationTimeInSeconds";
private static int DefaultUnavailableLocationsExpirationTimeInSeconds = 5 * 60;
private readonly bool enableEndpointDiscovery;
private readonly Uri defaultEndpoint;
private readonly bool useMultipleWriteLocations;
private readonly object lockObject;
private readonly TimeSpan unavailableLocationsExpirationTime;
private readonly int connectionLimit;
private readonly ConcurrentDictionary locationUnavailablityInfoByEndpoint;
private readonly RegionNameMapper regionNameMapper;
private DatabaseAccountLocationsInfo locationInfo;
private DateTime lastCacheUpdateTimestamp;
private bool enableMultipleWriteLocations;
public LocationCache(
ReadOnlyCollection preferredLocations,
Uri defaultEndpoint,
bool enableEndpointDiscovery,
int connectionLimit,
bool useMultipleWriteLocations)
{
this.locationInfo = new DatabaseAccountLocationsInfo(preferredLocations, defaultEndpoint);
this.defaultEndpoint = defaultEndpoint;
this.enableEndpointDiscovery = enableEndpointDiscovery;
this.useMultipleWriteLocations = useMultipleWriteLocations;
this.connectionLimit = connectionLimit;
this.lockObject = new object();
this.locationUnavailablityInfoByEndpoint = new ConcurrentDictionary();
this.lastCacheUpdateTimestamp = DateTime.MinValue;
this.enableMultipleWriteLocations = false;
this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(LocationCache.DefaultUnavailableLocationsExpirationTimeInSeconds);
this.regionNameMapper = new RegionNameMapper();
#if !(NETSTANDARD15 || NETSTANDARD16)
#if NETSTANDARD20
// GetEntryAssembly returns null when loaded from native netstandard2.0
if (System.Reflection.Assembly.GetEntryAssembly() != null)
{
#endif
string unavailableLocationsExpirationTimeInSecondsConfig = System.Configuration.ConfigurationManager.AppSettings[LocationCache.UnavailableLocationsExpirationTimeInSeconds];
if (!string.IsNullOrEmpty(unavailableLocationsExpirationTimeInSecondsConfig))
{
int unavailableLocationsExpirationTimeinSecondsConfigValue;
if (!int.TryParse(unavailableLocationsExpirationTimeInSecondsConfig, out unavailableLocationsExpirationTimeinSecondsConfigValue))
{
this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(LocationCache.DefaultUnavailableLocationsExpirationTimeInSeconds);
}
else
{
this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(unavailableLocationsExpirationTimeinSecondsConfigValue);
}
}
#if NETSTANDARD20
}
#endif
#endif
}
///
/// Gets list of read endpoints ordered by
/// 1. Preferred location
/// 2. Endpoint availablity
///
public ReadOnlyCollection ReadEndpoints
{
get
{
// Hot-path: avoid ConcurrentDictionary methods which acquire locks
if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime
&& this.locationUnavailablityInfoByEndpoint.Any())
{
this.UpdateLocationCache();
}
return this.locationInfo.ReadEndpoints;
}
}
///
/// Gets list of account level read endpoints.
///
public ReadOnlyCollection AccountReadEndpoints => this.locationInfo.AccountReadEndpoints;
///
/// Gets list of write endpoints ordered by
/// 1. Preferred location
/// 2. Endpoint availablity
///
public ReadOnlyCollection WriteEndpoints
{
get
{
// Hot-path: avoid ConcurrentDictionary methods which acquire locks
if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime
&& this.locationUnavailablityInfoByEndpoint.Any())
{
this.UpdateLocationCache();
}
return this.locationInfo.WriteEndpoints;
}
}
public ReadOnlyCollection EffectivePreferredLocations => this.locationInfo.EffectivePreferredLocations;
///
/// Returns the location corresponding to the endpoint if location specific endpoint is provided.
/// For the defaultEndPoint, we will return the first available write location.
/// Returns null, in other cases.
///
///
/// Today we return null for defaultEndPoint if multiple write locations can be used.
/// This needs to be modifed to figure out proper location in such case.
///
public string GetLocation(Uri endpoint)
{
string location = this.locationInfo.AvailableWriteEndpointByLocation.FirstOrDefault(uri => uri.Value == endpoint).Key ?? this.locationInfo.AvailableReadEndpointByLocation.FirstOrDefault(uri => uri.Value == endpoint).Key;
if (location == null && endpoint == this.defaultEndpoint && !this.CanUseMultipleWriteLocations())
{
if (this.locationInfo.AvailableWriteEndpointByLocation.Any())
{
return this.locationInfo.AvailableWriteEndpointByLocation.First().Key;
}
}
return location;
}
///
/// Set region name for a location if present in the locationcache otherwise set region name as null.
/// If endpoint's hostname is same as default endpoint hostname, set regionName as null.
///
///
///
/// true if region found else false
public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName)
{
if (Uri.Compare(
endpoint,
this.defaultEndpoint,
UriComponents.Host,
UriFormat.SafeUnescaped,
StringComparison.OrdinalIgnoreCase) == 0)
{
regionName = null;
return false;
}
regionName = this.GetLocation(endpoint);
return true;
}
///
/// Marks the current location unavailable for read
///
public void MarkEndpointUnavailableForRead(Uri endpoint)
{
this.MarkEndpointUnavailable(endpoint, OperationType.Read);
}
///
/// Marks the current location unavailable for write
///
public void MarkEndpointUnavailableForWrite(Uri endpoint)
{
this.MarkEndpointUnavailable(endpoint, OperationType.Write);
}
///
/// Invoked when is read
///
/// Read DatabaseAccoaunt
public void OnDatabaseAccountRead(AccountProperties databaseAccount)
{
this.UpdateLocationCache(
databaseAccount.WritableRegions,
databaseAccount.ReadableRegions,
preferenceList: null,
enableMultipleWriteLocations: databaseAccount.EnableMultipleWriteLocations);
}
///
/// Invoked when changes
///
///
public void OnLocationPreferenceChanged(ReadOnlyCollection preferredLocations)
{
this.UpdateLocationCache(
preferenceList: preferredLocations);
}
public bool IsMetaData(DocumentServiceRequest request)
{
return (request.OperationType != Documents.OperationType.ExecuteJavaScript && request.ResourceType == ResourceType.StoredProcedure) ||
request.ResourceType != ResourceType.Document;
}
public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request)
{
return !request.IsReadOnlyRequest && this.locationInfo.AvailableWriteLocations.Count > 1
&& this.IsMetaData(request)
&& this.CanUseMultipleWriteLocations();
}
///
/// Gets the default endpoint of the account
///
/// the default endpoint.
public Uri GetDefaultEndpoint()
{
return this.defaultEndpoint;
}
///
/// Gets the mapping of available write region names to the respective endpoints
///
public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation()
{
return this.locationInfo.AvailableWriteEndpointByLocation;
}
///
/// Gets the mapping of available read region names to the respective endpoints
///
public ReadOnlyDictionary GetAvailableReadEndpointsByLocation()
{
return this.locationInfo.AvailableReadEndpointByLocation;
}
public Uri GetHubUri()
{
DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo;
string writeLocation = currentLocationInfo.AvailableWriteLocations[0];
Uri locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation];
return locationEndpointToRoute;
}
///
/// Gets account-level read locations.
///
public ReadOnlyCollection GetAvailableReadLocations()
{
return this.locationInfo.AvailableReadLocations;
}
///
/// Gets account-level write locations.
///
public ReadOnlyCollection GetAvailableWriteLocations()
{
return this.locationInfo.AvailableWriteLocations;
}
///
/// Resolves request to service endpoint.
/// 1. If this is a write request
/// (a) If UseMultipleWriteLocations = true
/// (i) For document writes, resolve to most preferred and available write endpoint.
/// Once the endpoint is marked unavailable, it is moved to the end of available write endpoint. Current request will
/// be retried on next preferred available write endpoint.
/// (ii) For all other resources, always resolve to first/second (regardless of preferred locations)
/// write endpoint in .
/// Endpoint of first write location in is the only endpoint that supports
/// write operation on all resource types (except during that region's failover).
/// Only during manual failover, client would retry write on second write location in .
/// (b) Else resolve the request to first write endpoint in OR
/// second write endpoint in in case of manual failover of that location.
/// 2. Else resolve the request to most preferred available read endpoint (automatic failover for read requests)
///
/// Request for which endpoint is to be resolved
/// Resolved endpoint
public Uri ResolveServiceEndpoint(DocumentServiceRequest request)
{
if (request.RequestContext != null && request.RequestContext.LocationEndpointToRoute != null)
{
return request.RequestContext.LocationEndpointToRoute;
}
int locationIndex = request.RequestContext.LocationIndexToRoute.GetValueOrDefault(0);
Uri locationEndpointToRoute = this.defaultEndpoint;
if (!request.RequestContext.UsePreferredLocations.GetValueOrDefault(true) // Should not use preferred location ?
|| (request.OperationType.IsWriteOperation() && !this.CanUseMultipleWriteLocations(request)))
{
// For non-document resource types in case of client can use multiple write locations
// or when client cannot use multiple write locations, flip-flop between the
// first and the second writable region in DatabaseAccount (for manual failover)
DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo;
if (this.enableEndpointDiscovery && currentLocationInfo.AvailableWriteLocations.Count > 0)
{
locationIndex = Math.Min(locationIndex % 2, currentLocationInfo.AvailableWriteLocations.Count - 1);
string writeLocation = currentLocationInfo.AvailableWriteLocations[locationIndex];
locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation];
}
}
else
{
ReadOnlyCollection endpoints = this.GetApplicableEndpoints(request, !request.OperationType.IsWriteOperation());
locationEndpointToRoute = endpoints[locationIndex % endpoints.Count];
}
request.RequestContext.RouteToLocation(locationEndpointToRoute);
return locationEndpointToRoute;
}
public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest)
{
if (request.RequestContext.ExcludeRegions == null || request.RequestContext.ExcludeRegions.Count == 0)
{
return isReadRequest ? this.ReadEndpoints : this.WriteEndpoints;
}
DatabaseAccountLocationsInfo databaseAccountLocationsInfoSnapshot = this.locationInfo;
ReadOnlyCollection effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.PreferredLocations;
if (effectivePreferredLocations == null || effectivePreferredLocations.Count == 0)
{
effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.EffectivePreferredLocations;
}
return this.GetApplicableEndpoints(
isReadRequest ? this.locationInfo.AvailableReadEndpointByLocation : this.locationInfo.AvailableWriteEndpointByLocation,
effectivePreferredLocations,
this.defaultEndpoint,
request.RequestContext.ExcludeRegions);
}
public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest)
{
bool isPreferredLocationsEmpty = this.locationInfo.PreferredLocations == null || this.locationInfo.PreferredLocations.Count == 0;
DatabaseAccountLocationsInfo databaseAccountLocationsInfoSnapshot = this.locationInfo;
ReadOnlyCollection effectivePreferredLocations = this.locationInfo.PreferredLocations;
if (effectivePreferredLocations == null || effectivePreferredLocations.Count == 0)
{
effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.EffectivePreferredLocations;
}
Uri firstEffectivePreferredEndpoint = isReadRequest ? databaseAccountLocationsInfoSnapshot.ReadEndpoints[0] : databaseAccountLocationsInfoSnapshot.WriteEndpoints[0];
if (isReadRequest)
{
databaseAccountLocationsInfoSnapshot.AvailableReadLocationByEndpoint.TryGetValue(firstEffectivePreferredEndpoint, out string firstEffectivePreferredLocation);
return this.GetApplicableRegions(
databaseAccountLocationsInfoSnapshot.AvailableReadLocations,
effectivePreferredLocations,
isPreferredLocationsEmpty ? firstEffectivePreferredLocation : databaseAccountLocationsInfoSnapshot.PreferredLocations[0],
excludeRegions);
}
else
{
databaseAccountLocationsInfoSnapshot.AvailableWriteLocationByEndpoint.TryGetValue(firstEffectivePreferredEndpoint, out string firstEffectivePreferredLocation);
return this.GetApplicableRegions(
databaseAccountLocationsInfoSnapshot.AvailableWriteLocations,
effectivePreferredLocations,
isPreferredLocationsEmpty ? firstEffectivePreferredLocation : databaseAccountLocationsInfoSnapshot.PreferredLocations[0],
excludeRegions);
}
}
///
/// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint
///
///
///
///
///
/// a list of applicable endpoints for a request
private ReadOnlyCollection GetApplicableEndpoints(
ReadOnlyDictionary regionNameByEndpoint,
ReadOnlyCollection effectivePreferredLocations,
Uri fallbackEndpoint,
IEnumerable excludeRegions)
{
List applicableEndpoints = new List(regionNameByEndpoint.Count);
HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions);
if (excludeRegions != null)
{
foreach (string region in effectivePreferredLocations)
{
if (!excludeRegionsHash.Contains(region)
&& regionNameByEndpoint.TryGetValue(region, out Uri endpoint))
{
applicableEndpoints.Add(endpoint);
}
}
}
else
{
foreach (string region in effectivePreferredLocations)
{
if (regionNameByEndpoint.TryGetValue(region, out Uri endpoint))
{
applicableEndpoints.Add(endpoint);
}
}
}
if (applicableEndpoints.Count == 0)
{
applicableEndpoints.Add(fallbackEndpoint);
}
return new ReadOnlyCollection(applicableEndpoints);
}
///
/// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint
///
///
///
///
///
/// a list of applicable endpoints for a request
private ReadOnlyCollection GetApplicableRegions(
ReadOnlyCollection availableLocations,
ReadOnlyCollection effectivePreferredLocations,
string fallbackRegion,
IEnumerable excludeRegions)
{
List applicableRegions = new List(availableLocations.Count);
HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions);
if (excludeRegions != null)
{
foreach (string region in effectivePreferredLocations)
{
if (availableLocations.Contains(region)
&& !excludeRegionsHash.Contains(region))
{
applicableRegions.Add(region);
}
}
}
else
{
foreach (string region in effectivePreferredLocations)
{
if (availableLocations.Contains(region))
{
applicableRegions.Add(region);
}
}
}
if (applicableRegions.Count == 0)
{
applicableRegions.Add(fallbackRegion);
}
return new ReadOnlyCollection(applicableRegions);
}
public bool ShouldRefreshEndpoints(out bool canRefreshInBackground)
{
canRefreshInBackground = true;
DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo;
string mostPreferredLocation = currentLocationInfo.PreferredLocations.FirstOrDefault();
if (currentLocationInfo.PreferredLocations == null || currentLocationInfo.PreferredLocations.Count == 0)
{
mostPreferredLocation = currentLocationInfo.EffectivePreferredLocations.FirstOrDefault();
}
// we should schedule refresh in background if we are unable to target the user's most preferredLocation.
if (this.enableEndpointDiscovery)
{
// Refresh if client opts-in to useMultipleWriteLocations but server-side setting is disabled
bool shouldRefresh = this.useMultipleWriteLocations && !this.enableMultipleWriteLocations;
ReadOnlyCollection readLocationEndpoints = currentLocationInfo.ReadEndpoints;
if (this.IsEndpointUnavailable(readLocationEndpoints[0], OperationType.Read))
{
canRefreshInBackground = readLocationEndpoints.Count > 1;
DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since the first read endpoint {0} is not available for read. canRefreshInBackground = {1}",
readLocationEndpoints[0],
canRefreshInBackground);
return true;
}
if (!string.IsNullOrEmpty(mostPreferredLocation))
{
Uri mostPreferredReadEndpoint;
if (currentLocationInfo.AvailableReadEndpointByLocation.TryGetValue(mostPreferredLocation, out mostPreferredReadEndpoint))
{
if (mostPreferredReadEndpoint != readLocationEndpoints[0])
{
// For reads, we can always refresh in background as we can alternate to
// other available read endpoints
DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not available for read.", mostPreferredLocation);
return true;
}
}
else
{
DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not in available read locations.", mostPreferredLocation);
return true;
}
}
Uri mostPreferredWriteEndpoint;
ReadOnlyCollection writeLocationEndpoints = currentLocationInfo.WriteEndpoints;
if (!this.CanUseMultipleWriteLocations())
{
if (this.IsEndpointUnavailable(writeLocationEndpoints[0], OperationType.Write))
{
// Since most preferred write endpoint is unavailable, we can only refresh in background if
// we have an alternate write endpoint
canRefreshInBackground = writeLocationEndpoints.Count > 1;
DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} endpoint {1} is not available for write. canRefreshInBackground = {2}",
mostPreferredLocation,
writeLocationEndpoints[0],
canRefreshInBackground);
return true;
}
else
{
return shouldRefresh;
}
}
else if (!string.IsNullOrEmpty(mostPreferredLocation))
{
if (currentLocationInfo.AvailableWriteEndpointByLocation.TryGetValue(mostPreferredLocation, out mostPreferredWriteEndpoint))
{
shouldRefresh |= mostPreferredWriteEndpoint != writeLocationEndpoints[0];
DefaultTrace.TraceInformation("ShouldRefreshEndpoints = {0} since most preferred location {1} is not available for write.", shouldRefresh, mostPreferredLocation);
return shouldRefresh;
}
else
{
DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not in available write locations", mostPreferredLocation);
return true;
}
}
else
{
return shouldRefresh;
}
}
else
{
return false;
}
}
public bool CanUseMultipleWriteLocations(DocumentServiceRequest request)
{
return this.CanUseMultipleWriteLocations() &&
(request.ResourceType == ResourceType.Document ||
(request.ResourceType == ResourceType.StoredProcedure && request.OperationType == Documents.OperationType.ExecuteJavaScript));
}
private void ClearStaleEndpointUnavailabilityInfo()
{
if (this.locationUnavailablityInfoByEndpoint.Any())
{
List unavailableEndpoints = this.locationUnavailablityInfoByEndpoint.Keys.ToList();
foreach (Uri unavailableEndpoint in unavailableEndpoints)
{
LocationUnavailabilityInfo unavailabilityInfo;
LocationUnavailabilityInfo removed;
if (this.locationUnavailablityInfoByEndpoint.TryGetValue(unavailableEndpoint, out unavailabilityInfo)
&& DateTime.UtcNow - unavailabilityInfo.LastUnavailabilityCheckTimeStamp > this.unavailableLocationsExpirationTime
&& this.locationUnavailablityInfoByEndpoint.TryRemove(unavailableEndpoint, out removed))
{
DefaultTrace.TraceInformation(
"Removed endpoint {0} unavailable for operations {1} from unavailableEndpoints",
unavailableEndpoint,
unavailabilityInfo.UnavailableOperations);
}
}
}
}
private bool IsEndpointUnavailable(Uri endpoint, OperationType expectedAvailableOperations)
{
LocationUnavailabilityInfo unavailabilityInfo;
if (expectedAvailableOperations == OperationType.None
|| !this.locationUnavailablityInfoByEndpoint.TryGetValue(endpoint, out unavailabilityInfo)
|| !unavailabilityInfo.UnavailableOperations.HasFlag(expectedAvailableOperations))
{
return false;
}
else
{
if (DateTime.UtcNow - unavailabilityInfo.LastUnavailabilityCheckTimeStamp > this.unavailableLocationsExpirationTime)
{
return false;
}
else
{
DefaultTrace.TraceInformation(
"Endpoint {0} unavailable for operations {1} present in unavailableEndpoints",
endpoint,
unavailabilityInfo.UnavailableOperations);
// Unexpired entry present. Endpoint is unavailable
return true;
}
}
}
private void MarkEndpointUnavailable(
Uri unavailableEndpoint,
OperationType unavailableOperationType)
{
DateTime currentTime = DateTime.UtcNow;
LocationUnavailabilityInfo updatedInfo = this.locationUnavailablityInfoByEndpoint.AddOrUpdate(
unavailableEndpoint,
(Uri endpoint) =>
{
return new LocationUnavailabilityInfo()
{
LastUnavailabilityCheckTimeStamp = currentTime,
UnavailableOperations = unavailableOperationType,
};
},
(Uri endpoint, LocationUnavailabilityInfo info) =>
{
info.LastUnavailabilityCheckTimeStamp = currentTime;
info.UnavailableOperations |= unavailableOperationType;
return info;
});
this.UpdateLocationCache();
DefaultTrace.TraceInformation(
"Endpoint {0} unavailable for {1} added/updated to unavailableEndpoints with timestamp {2}",
unavailableEndpoint,
unavailableOperationType,
updatedInfo.LastUnavailabilityCheckTimeStamp);
}
private void UpdateLocationCache(
IEnumerable writeLocations = null,
IEnumerable readLocations = null,
ReadOnlyCollection preferenceList = null,
bool? enableMultipleWriteLocations = null)
{
lock (this.lockObject)
{
DatabaseAccountLocationsInfo nextLocationInfo = new DatabaseAccountLocationsInfo(this.locationInfo);
if (preferenceList != null)
{
nextLocationInfo.PreferredLocations = preferenceList;
}
if (enableMultipleWriteLocations.HasValue)
{
this.enableMultipleWriteLocations = enableMultipleWriteLocations.Value;
}
this.ClearStaleEndpointUnavailabilityInfo();
if (readLocations != null)
{
nextLocationInfo.AvailableReadEndpointByLocation = this.GetEndpointByLocation(
readLocations,
out ReadOnlyCollection availableReadLocations,
out ReadOnlyDictionary availableReadLocationsByEndpoint);
nextLocationInfo.AvailableReadLocations = availableReadLocations;
nextLocationInfo.AccountReadEndpoints = nextLocationInfo.AvailableReadEndpointByLocation.Select(x => x.Value).ToList().AsReadOnly();
nextLocationInfo.AvailableReadLocationByEndpoint = availableReadLocationsByEndpoint;
}
if (writeLocations != null)
{
nextLocationInfo.AvailableWriteEndpointByLocation = this.GetEndpointByLocation(
writeLocations,
out ReadOnlyCollection availableWriteLocations,
out ReadOnlyDictionary availableWriteLocationsByEndpoint);
nextLocationInfo.AvailableWriteLocations = availableWriteLocations;
nextLocationInfo.AvailableWriteLocationByEndpoint = availableWriteLocationsByEndpoint;
}
nextLocationInfo.WriteEndpoints = this.GetPreferredAvailableEndpoints(
endpointsByLocation: nextLocationInfo.AvailableWriteEndpointByLocation,
orderedLocations: nextLocationInfo.AvailableWriteLocations,
expectedAvailableOperation: OperationType.Write,
fallbackEndpoint: this.defaultEndpoint);
nextLocationInfo.ReadEndpoints = this.GetPreferredAvailableEndpoints(
endpointsByLocation: nextLocationInfo.AvailableReadEndpointByLocation,
orderedLocations: nextLocationInfo.AvailableReadLocations,
expectedAvailableOperation: OperationType.Read,
fallbackEndpoint: nextLocationInfo.WriteEndpoints[0]);
if (nextLocationInfo.PreferredLocations == null || nextLocationInfo.PreferredLocations.Count == 0)
{
if (!nextLocationInfo.AvailableReadLocationByEndpoint.TryGetValue(this.defaultEndpoint, out string regionForDefaultEndpoint))
{
nextLocationInfo.EffectivePreferredLocations = nextLocationInfo.AvailableReadLocations;
}
else
{
List locations = new ()
{
regionForDefaultEndpoint
};
nextLocationInfo.EffectivePreferredLocations = new ReadOnlyCollection(locations);
}
}
this.lastCacheUpdateTimestamp = DateTime.UtcNow;
DefaultTrace.TraceInformation("Current WriteEndpoints = ({0}) ReadEndpoints = ({1})",
string.Join(", ", nextLocationInfo.WriteEndpoints.Select(endpoint => endpoint.ToString())),
string.Join(", ", nextLocationInfo.ReadEndpoints.Select(endpoint => endpoint.ToString())));
this.locationInfo = nextLocationInfo;
}
}
private ReadOnlyCollection GetPreferredAvailableEndpoints(ReadOnlyDictionary endpointsByLocation, ReadOnlyCollection orderedLocations, OperationType expectedAvailableOperation, Uri fallbackEndpoint)
{
List endpoints = new List();
DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo;
// if enableEndpointDiscovery is false, we always use the defaultEndpoint that user passed in during documentClient init
if (this.enableEndpointDiscovery)
{
if (this.CanUseMultipleWriteLocations() || expectedAvailableOperation.HasFlag(OperationType.Read))
{
List unavailableEndpoints = new List();
// When client can not use multiple write locations, preferred locations list should only be used
// determining read endpoints order.
// If client can use multiple write locations, preferred locations list should be used for determining
// both read and write endpoints order.
if (currentLocationInfo.PreferredLocations != null && currentLocationInfo.PreferredLocations.Count >= 1)
{
foreach (string location in currentLocationInfo.PreferredLocations)
{
if (endpointsByLocation.TryGetValue(location, out Uri endpoint))
{
if (this.IsEndpointUnavailable(endpoint, expectedAvailableOperation))
{
unavailableEndpoints.Add(endpoint);
}
else
{
endpoints.Add(endpoint);
}
}
}
}
else
{
foreach (string location in orderedLocations)
{
if (endpointsByLocation.TryGetValue(location, out Uri endpoint))
{
if (this.defaultEndpoint.Equals(endpoint))
{
endpoints = new List();
break;
}
if (this.IsEndpointUnavailable(endpoint, expectedAvailableOperation))
{
unavailableEndpoints.Add(endpoint);
}
else
{
endpoints.Add(endpoint);
}
}
}
}
if (endpoints.Count == 0)
{
endpoints.Add(fallbackEndpoint);
unavailableEndpoints.Remove(fallbackEndpoint);
}
endpoints.AddRange(unavailableEndpoints);
}
else
{
foreach (string location in orderedLocations)
{
if (!string.IsNullOrEmpty(location) && // location is empty during manual failover
endpointsByLocation.TryGetValue(location, out Uri endpoint))
{
endpoints.Add(endpoint);
}
}
}
}
if (endpoints.Count == 0)
{
endpoints.Add(fallbackEndpoint);
}
return endpoints.AsReadOnly();
}
private ReadOnlyDictionary GetEndpointByLocation(IEnumerable locations, out ReadOnlyCollection orderedLocations, out ReadOnlyDictionary availableLocationsByEndpoint)
{
Dictionary endpointsByLocation = new Dictionary(StringComparer.OrdinalIgnoreCase);
Dictionary mutableAvailableLocationsByEndpoint = new Dictionary();
List parsedLocations = new List();
foreach (AccountRegion location in locations)
{
Uri endpoint;
if (!string.IsNullOrEmpty(location.Name)
&& Uri.TryCreate(location.Endpoint, UriKind.Absolute, out endpoint))
{
endpointsByLocation[location.Name] = endpoint;
parsedLocations.Add(location.Name);
mutableAvailableLocationsByEndpoint[endpoint] = location.Name;
this.SetServicePointConnectionLimit(endpoint);
}
else
{
DefaultTrace.TraceInformation("GetAvailableEndpointsByLocation() - skipping add for location = {0} as it is location name is either empty or endpoint is malformed {1}",
location.Name,
location.Endpoint);
}
}
orderedLocations = parsedLocations.AsReadOnly();
availableLocationsByEndpoint = new ReadOnlyDictionary(mutableAvailableLocationsByEndpoint);
return new ReadOnlyDictionary(endpointsByLocation);
}
private bool CanUseMultipleWriteLocations()
{
return this.useMultipleWriteLocations && this.enableMultipleWriteLocations;
}
private void SetServicePointConnectionLimit(Uri endpoint)
{
#if !NETSTANDARD16
ServicePointAccessor servicePoint = ServicePointAccessor.FindServicePoint(endpoint);
servicePoint.ConnectionLimit = this.connectionLimit;
#endif
}
private sealed class LocationUnavailabilityInfo
{
public DateTime LastUnavailabilityCheckTimeStamp { get; set; }
public OperationType UnavailableOperations { get; set; }
}
private sealed class DatabaseAccountLocationsInfo
{
public DatabaseAccountLocationsInfo(ReadOnlyCollection preferredLocations, Uri defaultEndpoint)
{
this.PreferredLocations = preferredLocations;
this.AvailableWriteLocations = new List().AsReadOnly();
this.AvailableReadLocations = new List().AsReadOnly();
this.AvailableWriteEndpointByLocation = new ReadOnlyDictionary(new Dictionary(StringComparer.OrdinalIgnoreCase));
this.AvailableReadEndpointByLocation = new ReadOnlyDictionary(new Dictionary(StringComparer.OrdinalIgnoreCase));
this.AvailableWriteLocationByEndpoint = new ReadOnlyDictionary(new Dictionary());
this.AvailableReadLocationByEndpoint = new ReadOnlyDictionary(new Dictionary());
this.WriteEndpoints = new List() { defaultEndpoint }.AsReadOnly();
this.AccountReadEndpoints = new List() { defaultEndpoint }.AsReadOnly();
this.ReadEndpoints = new List() { defaultEndpoint }.AsReadOnly();
this.EffectivePreferredLocations = new List().AsReadOnly();
}
public DatabaseAccountLocationsInfo(DatabaseAccountLocationsInfo other)
{
this.PreferredLocations = other.PreferredLocations;
this.AvailableWriteLocations = other.AvailableWriteLocations;
this.AvailableReadLocations = other.AvailableReadLocations;
this.AvailableWriteEndpointByLocation = other.AvailableWriteEndpointByLocation;
this.AvailableReadEndpointByLocation = other.AvailableReadEndpointByLocation;
this.AvailableReadLocationByEndpoint = other.AvailableReadLocationByEndpoint;
this.AvailableWriteLocationByEndpoint = other.AvailableWriteLocationByEndpoint;
this.WriteEndpoints = other.WriteEndpoints;
this.AccountReadEndpoints = other.AccountReadEndpoints;
this.ReadEndpoints = other.ReadEndpoints;
this.EffectivePreferredLocations = other.EffectivePreferredLocations;
}
public ReadOnlyCollection PreferredLocations { get; set; }
public ReadOnlyCollection AvailableWriteLocations { get; set; }
public ReadOnlyCollection AvailableReadLocations { get; set; }
public ReadOnlyDictionary AvailableWriteEndpointByLocation { get; set; }
public ReadOnlyDictionary AvailableReadEndpointByLocation { get; set; }
public ReadOnlyDictionary AvailableWriteLocationByEndpoint { get; set; }
public ReadOnlyDictionary AvailableReadLocationByEndpoint { get; set; }
public ReadOnlyCollection WriteEndpoints { get; set; }
public ReadOnlyCollection ReadEndpoints { get; set; }
public ReadOnlyCollection AccountReadEndpoints { get; set; }
public ReadOnlyCollection EffectivePreferredLocations { get; set; }
}
[Flags]
private enum OperationType
{
None = 0x0,
Read = 0x1,
Write = 0x2
}
}
}
\ No newline at end of file
+//------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+//------------------------------------------------------------
+
+namespace Microsoft.Azure.Cosmos.Routing
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Collections.ObjectModel;
+ using System.Linq;
+ using Microsoft.Azure.Cosmos.Core.Trace;
+ using Microsoft.Azure.Documents;
+
+ ///
+ /// Implements the abstraction to resolve target location for geo-replicated DatabaseAccount
+ /// with multiple writable and readable locations.
+ ///
+ internal sealed class LocationCache
+ {
+ private const string UnavailableLocationsExpirationTimeInSeconds = "UnavailableLocationsExpirationTimeInSeconds";
+ private static int DefaultUnavailableLocationsExpirationTimeInSeconds = 5 * 60;
+
+ private readonly bool enableEndpointDiscovery;
+ private readonly Uri defaultEndpoint;
+ private readonly bool useMultipleWriteLocations;
+ private readonly object lockObject;
+ private readonly TimeSpan unavailableLocationsExpirationTime;
+ private readonly int connectionLimit;
+ private readonly ConcurrentDictionary locationUnavailablityInfoByEndpoint;
+ private readonly RegionNameMapper regionNameMapper;
+
+ private DatabaseAccountLocationsInfo locationInfo;
+ private DateTime lastCacheUpdateTimestamp;
+ private bool enableMultipleWriteLocations;
+
+ public LocationCache(
+ ReadOnlyCollection preferredLocations,
+ Uri defaultEndpoint,
+ bool enableEndpointDiscovery,
+ int connectionLimit,
+ bool useMultipleWriteLocations)
+ {
+ this.locationInfo = new DatabaseAccountLocationsInfo(preferredLocations, defaultEndpoint);
+ this.defaultEndpoint = defaultEndpoint;
+ this.enableEndpointDiscovery = enableEndpointDiscovery;
+ this.useMultipleWriteLocations = useMultipleWriteLocations;
+ this.connectionLimit = connectionLimit;
+
+ this.lockObject = new object();
+ this.locationUnavailablityInfoByEndpoint = new ConcurrentDictionary();
+ this.lastCacheUpdateTimestamp = DateTime.MinValue;
+ this.enableMultipleWriteLocations = false;
+ this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(LocationCache.DefaultUnavailableLocationsExpirationTimeInSeconds);
+ this.regionNameMapper = new RegionNameMapper();
+
+#if !(NETSTANDARD15 || NETSTANDARD16)
+#if NETSTANDARD20
+ // GetEntryAssembly returns null when loaded from native netstandard2.0
+ if (System.Reflection.Assembly.GetEntryAssembly() != null)
+ {
+#endif
+ string unavailableLocationsExpirationTimeInSecondsConfig = System.Configuration.ConfigurationManager.AppSettings[LocationCache.UnavailableLocationsExpirationTimeInSeconds];
+ if (!string.IsNullOrEmpty(unavailableLocationsExpirationTimeInSecondsConfig))
+ {
+ int unavailableLocationsExpirationTimeinSecondsConfigValue;
+
+ if (!int.TryParse(unavailableLocationsExpirationTimeInSecondsConfig, out unavailableLocationsExpirationTimeinSecondsConfigValue))
+ {
+ this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(LocationCache.DefaultUnavailableLocationsExpirationTimeInSeconds);
+ }
+ else
+ {
+ this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(unavailableLocationsExpirationTimeinSecondsConfigValue);
+ }
+ }
+#if NETSTANDARD20
+ }
+#endif
+#endif
+ }
+
+ ///
+ /// Gets list of read endpoints ordered by
+ /// 1. Preferred location
+ /// 2. Endpoint availablity
+ ///
+ public ReadOnlyCollection ReadEndpoints
+ {
+ get
+ {
+ // Hot-path: avoid ConcurrentDictionary methods which acquire locks
+ if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime
+ && this.locationUnavailablityInfoByEndpoint.Any())
+ {
+ this.UpdateLocationCache();
+ }
+
+ return this.locationInfo.ReadEndpoints;
+ }
+ }
+
+ ///
+ /// Gets list of account level read endpoints.
+ ///
+ public ReadOnlyCollection AccountReadEndpoints => this.locationInfo.AccountReadEndpoints;
+
+ ///
+ /// Gets list of write endpoints ordered by
+ /// 1. Preferred location
+ /// 2. Endpoint availablity
+ ///
+ public ReadOnlyCollection WriteEndpoints
+ {
+ get
+ {
+ // Hot-path: avoid ConcurrentDictionary methods which acquire locks
+ if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime
+ && this.locationUnavailablityInfoByEndpoint.Any())
+ {
+ this.UpdateLocationCache();
+ }
+
+ return this.locationInfo.WriteEndpoints;
+ }
+ }
+
+ public ReadOnlyCollection EffectivePreferredLocations => this.locationInfo.EffectivePreferredLocations;
+
+ ///
+ /// Returns the location corresponding to the endpoint if location specific endpoint is provided.
+ /// For the defaultEndPoint, we will return the first available write location.
+ /// Returns null, in other cases.
+ ///
+ ///
+ /// Today we return null for defaultEndPoint if multiple write locations can be used.
+ /// This needs to be modifed to figure out proper location in such case.
+ ///
+ public string GetLocation(Uri endpoint)
+ {
+ string location = this.locationInfo.AvailableWriteEndpointByLocation.FirstOrDefault(uri => uri.Value == endpoint).Key ?? this.locationInfo.AvailableReadEndpointByLocation.FirstOrDefault(uri => uri.Value == endpoint).Key;
+
+ if (location == null && endpoint == this.defaultEndpoint && !this.CanUseMultipleWriteLocations())
+ {
+ if (this.locationInfo.AvailableWriteEndpointByLocation.Any())
+ {
+ return this.locationInfo.AvailableWriteEndpointByLocation.First().Key;
+ }
+ }
+
+ return location;
+ }
+
+ ///
+ /// Set region name for a location if present in the locationcache otherwise set region name as null.
+ /// If endpoint's hostname is same as default endpoint hostname, set regionName as null.
+ ///
+ ///
+ ///
+ /// true if region found else false
+ public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName)
+ {
+ if (Uri.Compare(
+ endpoint,
+ this.defaultEndpoint,
+ UriComponents.Host,
+ UriFormat.SafeUnescaped,
+ StringComparison.OrdinalIgnoreCase) == 0)
+ {
+ regionName = null;
+ return false;
+ }
+
+ regionName = this.GetLocation(endpoint);
+ return true;
+ }
+
+ ///
+ /// Marks the current location unavailable for read
+ ///
+ public void MarkEndpointUnavailableForRead(Uri endpoint)
+ {
+ this.MarkEndpointUnavailable(endpoint, OperationType.Read);
+ }
+
+ ///
+ /// Marks the current location unavailable for write
+ ///
+ public void MarkEndpointUnavailableForWrite(Uri endpoint)
+ {
+ this.MarkEndpointUnavailable(endpoint, OperationType.Write);
+ }
+
+ ///
+ /// Invoked when is read
+ ///
+ /// Read DatabaseAccoaunt
+ public void OnDatabaseAccountRead(AccountProperties databaseAccount)
+ {
+ this.UpdateLocationCache(
+ databaseAccount.WritableRegions,
+ databaseAccount.ReadableRegions,
+ preferenceList: null,
+ enableMultipleWriteLocations: databaseAccount.EnableMultipleWriteLocations);
+ }
+
+ ///
+ /// Invoked when changes
+ ///
+ ///
+ public void OnLocationPreferenceChanged(ReadOnlyCollection preferredLocations)
+ {
+ this.UpdateLocationCache(
+ preferenceList: preferredLocations);
+ }
+
+ public bool IsMetaData(DocumentServiceRequest request)
+ {
+ return (request.OperationType != Documents.OperationType.ExecuteJavaScript && request.ResourceType == ResourceType.StoredProcedure) ||
+ request.ResourceType != ResourceType.Document;
+
+ }
+ public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request)
+ {
+ return !request.IsReadOnlyRequest && this.locationInfo.AvailableWriteLocations.Count > 1
+ && this.IsMetaData(request)
+ && this.CanUseMultipleWriteLocations();
+
+ }
+
+ ///
+ /// Gets the default endpoint of the account
+ ///
+ /// the default endpoint.
+ public Uri GetDefaultEndpoint()
+ {
+ return this.defaultEndpoint;
+ }
+
+ ///
+ /// Gets the mapping of available write region names to the respective endpoints
+ ///
+ public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation()
+ {
+ return this.locationInfo.AvailableWriteEndpointByLocation;
+ }
+
+ ///
+ /// Gets the mapping of available read region names to the respective endpoints
+ ///
+ public ReadOnlyDictionary GetAvailableReadEndpointsByLocation()
+ {
+ return this.locationInfo.AvailableReadEndpointByLocation;
+ }
+
+ public Uri GetHubUri()
+ {
+ DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo;
+ string writeLocation = currentLocationInfo.AvailableWriteLocations[0];
+ Uri locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation];
+ return locationEndpointToRoute;
+ }
+
+ ///
+ /// Gets account-level read locations.
+ ///
+ public ReadOnlyCollection GetAvailableReadLocations()
+ {
+ return this.locationInfo.AvailableReadLocations;
+ }
+
+ ///
+ /// Gets account-level write locations.
+ ///
+ public ReadOnlyCollection GetAvailableWriteLocations()
+ {
+ return this.locationInfo.AvailableWriteLocations;
+ }
+
+ ///
+ /// Resolves request to service endpoint.
+ /// 1. If this is a write request
+ /// (a) If UseMultipleWriteLocations = true
+ /// (i) For document writes, resolve to most preferred and available write endpoint.
+ /// Once the endpoint is marked unavailable, it is moved to the end of available write endpoint. Current request will
+ /// be retried on next preferred available write endpoint.
+ /// (ii) For all other resources, always resolve to first/second (regardless of preferred locations)
+ /// write endpoint in .
+ /// Endpoint of first write location in is the only endpoint that supports
+ /// write operation on all resource types (except during that region's failover).
+ /// Only during manual failover, client would retry write on second write location in .
+ /// (b) Else resolve the request to first write endpoint in OR
+ /// second write endpoint in in case of manual failover of that location.
+ /// 2. Else resolve the request to most preferred available read endpoint (automatic failover for read requests)
+ ///
+ /// Request for which endpoint is to be resolved
+ /// Resolved endpoint
+ public Uri ResolveServiceEndpoint(DocumentServiceRequest request)
+ {
+ if (request.RequestContext != null && request.RequestContext.LocationEndpointToRoute != null)
+ {
+ return request.RequestContext.LocationEndpointToRoute;
+ }
+
+ int locationIndex = request.RequestContext.LocationIndexToRoute.GetValueOrDefault(0);
+
+ Uri locationEndpointToRoute = this.defaultEndpoint;
+
+ if (!request.RequestContext.UsePreferredLocations.GetValueOrDefault(true) // Should not use preferred location ?
+ || (request.OperationType.IsWriteOperation() && !this.CanUseMultipleWriteLocations(request)))
+ {
+ // For non-document resource types in case of client can use multiple write locations
+ // or when client cannot use multiple write locations, flip-flop between the
+ // first and the second writable region in DatabaseAccount (for manual failover)
+ DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo;
+
+ if (this.enableEndpointDiscovery && currentLocationInfo.AvailableWriteLocations.Count > 0)
+ {
+ locationIndex = Math.Min(locationIndex % 2, currentLocationInfo.AvailableWriteLocations.Count - 1);
+ string writeLocation = currentLocationInfo.AvailableWriteLocations[locationIndex];
+ locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation];
+ }
+ }
+ else
+ {
+ ReadOnlyCollection endpoints = this.GetApplicableEndpoints(request, !request.OperationType.IsWriteOperation());
+ locationEndpointToRoute = endpoints[locationIndex % endpoints.Count];
+ }
+
+ request.RequestContext.RouteToLocation(locationEndpointToRoute);
+ return locationEndpointToRoute;
+ }
+
+ public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest)
+ {
+ if (request.RequestContext.ExcludeRegions == null || request.RequestContext.ExcludeRegions.Count == 0)
+ {
+ return isReadRequest ? this.ReadEndpoints : this.WriteEndpoints;
+ }
+
+ DatabaseAccountLocationsInfo databaseAccountLocationsInfoSnapshot = this.locationInfo;
+
+ ReadOnlyCollection effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.PreferredLocations;
+
+ if (effectivePreferredLocations == null || effectivePreferredLocations.Count == 0)
+ {
+ effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.EffectivePreferredLocations;
+ }
+
+ return this.GetApplicableEndpoints(
+ isReadRequest ? this.locationInfo.AvailableReadEndpointByLocation : this.locationInfo.AvailableWriteEndpointByLocation,
+ effectivePreferredLocations,
+ this.defaultEndpoint,
+ request.RequestContext.ExcludeRegions);
+ }
+
+ public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest)
+ {
+ bool isPreferredLocationsEmpty = this.locationInfo.PreferredLocations == null || this.locationInfo.PreferredLocations.Count == 0;
+
+ DatabaseAccountLocationsInfo databaseAccountLocationsInfoSnapshot = this.locationInfo;
+
+ ReadOnlyCollection effectivePreferredLocations = this.locationInfo.PreferredLocations;
+
+ if (effectivePreferredLocations == null || effectivePreferredLocations.Count == 0)
+ {
+ effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.EffectivePreferredLocations;
+ }
+
+ Uri firstEffectivePreferredEndpoint = isReadRequest ? databaseAccountLocationsInfoSnapshot.ReadEndpoints[0] : databaseAccountLocationsInfoSnapshot.WriteEndpoints[0];
+
+ if (isReadRequest)
+ {
+ databaseAccountLocationsInfoSnapshot.AvailableReadLocationByEndpoint.TryGetValue(firstEffectivePreferredEndpoint, out string firstEffectivePreferredLocation);
+
+ return this.GetApplicableRegions(
+ databaseAccountLocationsInfoSnapshot.AvailableReadLocations,
+ effectivePreferredLocations,
+ isPreferredLocationsEmpty ? firstEffectivePreferredLocation : databaseAccountLocationsInfoSnapshot.PreferredLocations[0],
+ excludeRegions);
+ }
+ else
+ {
+ databaseAccountLocationsInfoSnapshot.AvailableWriteLocationByEndpoint.TryGetValue(firstEffectivePreferredEndpoint, out string firstEffectivePreferredLocation);
+
+ return this.GetApplicableRegions(
+ databaseAccountLocationsInfoSnapshot.AvailableWriteLocations,
+ effectivePreferredLocations,
+ isPreferredLocationsEmpty ? firstEffectivePreferredLocation : databaseAccountLocationsInfoSnapshot.PreferredLocations[0],
+ excludeRegions);
+ }
+ }
+
+ ///
+ /// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// a list of applicable endpoints for a request
+ private ReadOnlyCollection GetApplicableEndpoints(
+ ReadOnlyDictionary regionNameByEndpoint,
+ ReadOnlyCollection effectivePreferredLocations,
+ Uri fallbackEndpoint,
+ IEnumerable excludeRegions)
+ {
+ List applicableEndpoints = new List(regionNameByEndpoint.Count);
+ HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions);
+
+ if (excludeRegions != null)
+ {
+ foreach (string region in effectivePreferredLocations)
+ {
+ if (!excludeRegionsHash.Contains(region)
+ && regionNameByEndpoint.TryGetValue(region, out Uri endpoint))
+ {
+ applicableEndpoints.Add(endpoint);
+ }
+ }
+ }
+ else
+ {
+ foreach (string region in effectivePreferredLocations)
+ {
+ if (regionNameByEndpoint.TryGetValue(region, out Uri endpoint))
+ {
+ applicableEndpoints.Add(endpoint);
+ }
+ }
+ }
+
+ if (applicableEndpoints.Count == 0)
+ {
+ applicableEndpoints.Add(fallbackEndpoint);
+ }
+
+ return new ReadOnlyCollection(applicableEndpoints);
+ }
+
+ ///
+ /// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// a list of applicable endpoints for a request
+ private ReadOnlyCollection GetApplicableRegions(
+ ReadOnlyCollection availableLocations,
+ ReadOnlyCollection effectivePreferredLocations,
+ string fallbackRegion,
+ IEnumerable excludeRegions)
+ {
+ List applicableRegions = new List(availableLocations.Count);
+ HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions);
+
+ if (excludeRegions != null)
+ {
+ foreach (string region in effectivePreferredLocations)
+ {
+ if (availableLocations.Contains(region)
+ && !excludeRegionsHash.Contains(region))
+ {
+ applicableRegions.Add(region);
+ }
+ }
+ }
+ else
+ {
+ foreach (string region in effectivePreferredLocations)
+ {
+ if (availableLocations.Contains(region))
+ {
+ applicableRegions.Add(region);
+ }
+ }
+ }
+
+ if (applicableRegions.Count == 0)
+ {
+ applicableRegions.Add(fallbackRegion);
+ }
+
+ return new ReadOnlyCollection(applicableRegions);
+ }
+
+ public bool ShouldRefreshEndpoints(out bool canRefreshInBackground)
+ {
+ canRefreshInBackground = true;
+ DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo;
+
+ string mostPreferredLocation = currentLocationInfo.PreferredLocations.FirstOrDefault();
+
+ if (currentLocationInfo.PreferredLocations == null || currentLocationInfo.PreferredLocations.Count == 0)
+ {
+ mostPreferredLocation = currentLocationInfo.EffectivePreferredLocations.FirstOrDefault();
+ }
+
+ // we should schedule refresh in background if we are unable to target the user's most preferredLocation.
+ if (this.enableEndpointDiscovery)
+ {
+ // Refresh if client opts-in to useMultipleWriteLocations but server-side setting is disabled
+ bool shouldRefresh = this.useMultipleWriteLocations && !this.enableMultipleWriteLocations;
+
+ ReadOnlyCollection readLocationEndpoints = currentLocationInfo.ReadEndpoints;
+
+ if (this.IsEndpointUnavailable(readLocationEndpoints[0], OperationType.Read))
+ {
+ canRefreshInBackground = readLocationEndpoints.Count > 1;
+ DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since the first read endpoint {0} is not available for read. canRefreshInBackground = {1}",
+ readLocationEndpoints[0],
+ canRefreshInBackground);
+
+ return true;
+ }
+
+ if (!string.IsNullOrEmpty(mostPreferredLocation))
+ {
+ Uri mostPreferredReadEndpoint;
+
+ if (currentLocationInfo.AvailableReadEndpointByLocation.TryGetValue(mostPreferredLocation, out mostPreferredReadEndpoint))
+ {
+ if (mostPreferredReadEndpoint != readLocationEndpoints[0])
+ {
+ // For reads, we can always refresh in background as we can alternate to
+ // other available read endpoints
+ DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not available for read.", mostPreferredLocation);
+ return true;
+ }
+ }
+ else
+ {
+ DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not in available read locations.", mostPreferredLocation);
+ return true;
+ }
+ }
+
+ Uri mostPreferredWriteEndpoint;
+ ReadOnlyCollection writeLocationEndpoints = currentLocationInfo.WriteEndpoints;
+
+ if (!this.CanUseMultipleWriteLocations())
+ {
+ if (this.IsEndpointUnavailable(writeLocationEndpoints[0], OperationType.Write))
+ {
+ // Since most preferred write endpoint is unavailable, we can only refresh in background if
+ // we have an alternate write endpoint
+ canRefreshInBackground = writeLocationEndpoints.Count > 1;
+ DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} endpoint {1} is not available for write. canRefreshInBackground = {2}",
+ mostPreferredLocation,
+ writeLocationEndpoints[0],
+ canRefreshInBackground);
+
+ return true;
+ }
+ else
+ {
+ return shouldRefresh;
+ }
+ }
+ else if (!string.IsNullOrEmpty(mostPreferredLocation))
+ {
+ if (currentLocationInfo.AvailableWriteEndpointByLocation.TryGetValue(mostPreferredLocation, out mostPreferredWriteEndpoint))
+ {
+ shouldRefresh |= mostPreferredWriteEndpoint != writeLocationEndpoints[0];
+ DefaultTrace.TraceInformation("ShouldRefreshEndpoints = {0} since most preferred location {1} is not available for write.", shouldRefresh, mostPreferredLocation);
+ return shouldRefresh;
+ }
+ else
+ {
+ DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not in available write locations", mostPreferredLocation);
+ return true;
+ }
+ }
+ else
+ {
+ return shouldRefresh;
+ }
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public bool CanUseMultipleWriteLocations(DocumentServiceRequest request)
+ {
+ return this.CanUseMultipleWriteLocations() &&
+ (request.ResourceType == ResourceType.Document ||
+ (request.ResourceType == ResourceType.StoredProcedure && request.OperationType == Documents.OperationType.ExecuteJavaScript));
+ }
+
+ private void ClearStaleEndpointUnavailabilityInfo()
+ {
+ if (this.locationUnavailablityInfoByEndpoint.Any())
+ {
+ List unavailableEndpoints = this.locationUnavailablityInfoByEndpoint.Keys.ToList();
+
+ foreach (Uri unavailableEndpoint in unavailableEndpoints)
+ {
+ LocationUnavailabilityInfo unavailabilityInfo;
+ LocationUnavailabilityInfo removed;
+
+ if (this.locationUnavailablityInfoByEndpoint.TryGetValue(unavailableEndpoint, out unavailabilityInfo)
+ && DateTime.UtcNow - unavailabilityInfo.LastUnavailabilityCheckTimeStamp > this.unavailableLocationsExpirationTime
+ && this.locationUnavailablityInfoByEndpoint.TryRemove(unavailableEndpoint, out removed))
+ {
+ DefaultTrace.TraceInformation(
+ "Removed endpoint {0} unavailable for operations {1} from unavailableEndpoints",
+ unavailableEndpoint,
+ unavailabilityInfo.UnavailableOperations);
+ }
+ }
+ }
+ }
+
+ private bool IsEndpointUnavailable(Uri endpoint, OperationType expectedAvailableOperations)
+ {
+ LocationUnavailabilityInfo unavailabilityInfo;
+
+ if (expectedAvailableOperations == OperationType.None
+ || !this.locationUnavailablityInfoByEndpoint.TryGetValue(endpoint, out unavailabilityInfo)
+ || !unavailabilityInfo.UnavailableOperations.HasFlag(expectedAvailableOperations))
+ {
+ return false;
+ }
+ else
+ {
+ if (DateTime.UtcNow - unavailabilityInfo.LastUnavailabilityCheckTimeStamp > this.unavailableLocationsExpirationTime)
+ {
+ return false;
+ }
+ else
+ {
+ DefaultTrace.TraceInformation(
+ "Endpoint {0} unavailable for operations {1} present in unavailableEndpoints",
+ endpoint,
+ unavailabilityInfo.UnavailableOperations);
+ // Unexpired entry present. Endpoint is unavailable
+ return true;
+ }
+ }
+ }
+
+ private void MarkEndpointUnavailable(
+ Uri unavailableEndpoint,
+ OperationType unavailableOperationType)
+ {
+ DateTime currentTime = DateTime.UtcNow;
+ LocationUnavailabilityInfo updatedInfo = this.locationUnavailablityInfoByEndpoint.AddOrUpdate(
+ unavailableEndpoint,
+ (Uri endpoint) =>
+ {
+ return new LocationUnavailabilityInfo()
+ {
+ LastUnavailabilityCheckTimeStamp = currentTime,
+ UnavailableOperations = unavailableOperationType,
+ };
+ },
+ (Uri endpoint, LocationUnavailabilityInfo info) =>
+ {
+ info.LastUnavailabilityCheckTimeStamp = currentTime;
+ info.UnavailableOperations |= unavailableOperationType;
+ return info;
+ });
+
+ this.UpdateLocationCache();
+
+ DefaultTrace.TraceInformation(
+ "Endpoint {0} unavailable for {1} added/updated to unavailableEndpoints with timestamp {2}",
+ unavailableEndpoint,
+ unavailableOperationType,
+ updatedInfo.LastUnavailabilityCheckTimeStamp);
+ }
+
+ private void UpdateLocationCache(
+ IEnumerable writeLocations = null,
+ IEnumerable readLocations = null,
+ ReadOnlyCollection preferenceList = null,
+ bool? enableMultipleWriteLocations = null)
+ {
+ lock (this.lockObject)
+ {
+ DatabaseAccountLocationsInfo nextLocationInfo = new DatabaseAccountLocationsInfo(this.locationInfo);
+
+ if (preferenceList != null)
+ {
+ nextLocationInfo.PreferredLocations = preferenceList;
+ }
+
+ if (enableMultipleWriteLocations.HasValue)
+ {
+ this.enableMultipleWriteLocations = enableMultipleWriteLocations.Value;
+ }
+
+ this.ClearStaleEndpointUnavailabilityInfo();
+
+ if (readLocations != null)
+ {
+ nextLocationInfo.AvailableReadEndpointByLocation = this.GetEndpointByLocation(
+ readLocations,
+ out ReadOnlyCollection availableReadLocations,
+ out ReadOnlyDictionary availableReadLocationsByEndpoint);
+
+ nextLocationInfo.AvailableReadLocations = availableReadLocations;
+ nextLocationInfo.AccountReadEndpoints = nextLocationInfo.AvailableReadEndpointByLocation.Select(x => x.Value).ToList().AsReadOnly();
+ nextLocationInfo.AvailableReadLocationByEndpoint = availableReadLocationsByEndpoint;
+ }
+
+ if (writeLocations != null)
+ {
+ nextLocationInfo.AvailableWriteEndpointByLocation = this.GetEndpointByLocation(
+ writeLocations,
+ out ReadOnlyCollection availableWriteLocations,
+ out ReadOnlyDictionary availableWriteLocationsByEndpoint);
+
+ nextLocationInfo.AvailableWriteLocations = availableWriteLocations;
+ nextLocationInfo.AvailableWriteLocationByEndpoint = availableWriteLocationsByEndpoint;
+ }
+
+ nextLocationInfo.WriteEndpoints = this.GetPreferredAvailableEndpoints(
+ endpointsByLocation: nextLocationInfo.AvailableWriteEndpointByLocation,
+ orderedLocations: nextLocationInfo.AvailableWriteLocations,
+ expectedAvailableOperation: OperationType.Write,
+ fallbackEndpoint: this.defaultEndpoint);
+
+ nextLocationInfo.ReadEndpoints = this.GetPreferredAvailableEndpoints(
+ endpointsByLocation: nextLocationInfo.AvailableReadEndpointByLocation,
+ orderedLocations: nextLocationInfo.AvailableReadLocations,
+ expectedAvailableOperation: OperationType.Read,
+ fallbackEndpoint: nextLocationInfo.WriteEndpoints[0]);
+
+ if (nextLocationInfo.PreferredLocations == null || nextLocationInfo.PreferredLocations.Count == 0)
+ {
+ if (!nextLocationInfo.AvailableReadLocationByEndpoint.TryGetValue(this.defaultEndpoint, out string regionForDefaultEndpoint))
+ {
+ nextLocationInfo.EffectivePreferredLocations = nextLocationInfo.AvailableReadLocations;
+ }
+ else
+ {
+ List locations = new ()
+ {
+ regionForDefaultEndpoint
+ };
+
+ nextLocationInfo.EffectivePreferredLocations = new ReadOnlyCollection(locations);
+ }
+ }
+
+ this.lastCacheUpdateTimestamp = DateTime.UtcNow;
+
+ DefaultTrace.TraceInformation("Current WriteEndpoints = ({0}) ReadEndpoints = ({1})",
+ string.Join(", ", nextLocationInfo.WriteEndpoints.Select(endpoint => endpoint.ToString())),
+ string.Join(", ", nextLocationInfo.ReadEndpoints.Select(endpoint => endpoint.ToString())));
+
+ this.locationInfo = nextLocationInfo;
+ }
+ }
+
+ private ReadOnlyCollection GetPreferredAvailableEndpoints(ReadOnlyDictionary endpointsByLocation, ReadOnlyCollection orderedLocations, OperationType expectedAvailableOperation, Uri fallbackEndpoint)
+ {
+ List endpoints = new List();
+ DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo;
+
+ // if enableEndpointDiscovery is false, we always use the defaultEndpoint that user passed in during documentClient init
+ if (this.enableEndpointDiscovery)
+ {
+ if (this.CanUseMultipleWriteLocations() || expectedAvailableOperation.HasFlag(OperationType.Read))
+ {
+ List unavailableEndpoints = new List();
+
+ // When client can not use multiple write locations, preferred locations list should only be used
+ // determining read endpoints order.
+ // If client can use multiple write locations, preferred locations list should be used for determining
+ // both read and write endpoints order.
+
+ if (currentLocationInfo.PreferredLocations != null && currentLocationInfo.PreferredLocations.Count >= 1)
+ {
+ foreach (string location in currentLocationInfo.PreferredLocations)
+ {
+ if (endpointsByLocation.TryGetValue(location, out Uri endpoint))
+ {
+ if (this.IsEndpointUnavailable(endpoint, expectedAvailableOperation))
+ {
+ unavailableEndpoints.Add(endpoint);
+ }
+ else
+ {
+ endpoints.Add(endpoint);
+ }
+ }
+ }
+ }
+ else
+ {
+ foreach (string location in orderedLocations)
+ {
+ if (endpointsByLocation.TryGetValue(location, out Uri endpoint))
+ {
+ if (this.defaultEndpoint.Equals(endpoint))
+ {
+ endpoints = new List();
+ break;
+ }
+
+ if (this.IsEndpointUnavailable(endpoint, expectedAvailableOperation))
+ {
+ unavailableEndpoints.Add(endpoint);
+ }
+ else
+ {
+ endpoints.Add(endpoint);
+ }
+ }
+ }
+ }
+
+ if (endpoints.Count == 0)
+ {
+ endpoints.Add(fallbackEndpoint);
+ unavailableEndpoints.Remove(fallbackEndpoint);
+ }
+
+ endpoints.AddRange(unavailableEndpoints);
+ }
+ else
+ {
+ foreach (string location in orderedLocations)
+ {
+ if (!string.IsNullOrEmpty(location) && // location is empty during manual failover
+ endpointsByLocation.TryGetValue(location, out Uri endpoint))
+ {
+ endpoints.Add(endpoint);
+ }
+ }
+ }
+ }
+
+ if (endpoints.Count == 0)
+ {
+ endpoints.Add(fallbackEndpoint);
+ }
+
+ return endpoints.AsReadOnly();
+ }
+
+ private ReadOnlyDictionary GetEndpointByLocation(IEnumerable locations, out ReadOnlyCollection orderedLocations, out ReadOnlyDictionary availableLocationsByEndpoint)
+ {
+ Dictionary endpointsByLocation = new Dictionary(StringComparer.OrdinalIgnoreCase);
+ Dictionary mutableAvailableLocationsByEndpoint = new Dictionary();
+
+ List parsedLocations = new List();
+
+ foreach (AccountRegion location in locations)
+ {
+ Uri endpoint;
+ if (!string.IsNullOrEmpty(location.Name)
+ && Uri.TryCreate(location.Endpoint, UriKind.Absolute, out endpoint))
+ {
+ endpointsByLocation[location.Name] = endpoint;
+ parsedLocations.Add(location.Name);
+
+ mutableAvailableLocationsByEndpoint[endpoint] = location.Name;
+
+ this.SetServicePointConnectionLimit(endpoint);
+ }
+ else
+ {
+ DefaultTrace.TraceInformation("GetAvailableEndpointsByLocation() - skipping add for location = {0} as it is location name is either empty or endpoint is malformed {1}",
+ location.Name,
+ location.Endpoint);
+ }
+ }
+
+ orderedLocations = parsedLocations.AsReadOnly();
+ availableLocationsByEndpoint = new ReadOnlyDictionary(mutableAvailableLocationsByEndpoint);
+
+ return new ReadOnlyDictionary(endpointsByLocation);
+ }
+
+ private bool CanUseMultipleWriteLocations()
+ {
+ return this.useMultipleWriteLocations && this.enableMultipleWriteLocations;
+ }
+
+ private void SetServicePointConnectionLimit(Uri endpoint)
+ {
+#if !NETSTANDARD16
+ ServicePointAccessor servicePoint = ServicePointAccessor.FindServicePoint(endpoint);
+ servicePoint.ConnectionLimit = this.connectionLimit;
+#endif
+ }
+
+ private sealed class LocationUnavailabilityInfo
+ {
+ public DateTime LastUnavailabilityCheckTimeStamp { get; set; }
+ public OperationType UnavailableOperations { get; set; }
+ }
+
+ private sealed class DatabaseAccountLocationsInfo
+ {
+ public DatabaseAccountLocationsInfo(ReadOnlyCollection preferredLocations, Uri defaultEndpoint)
+ {
+ this.PreferredLocations = preferredLocations;
+ this.AvailableWriteLocations = new List().AsReadOnly();
+ this.AvailableReadLocations = new List().AsReadOnly();
+ this.AvailableWriteEndpointByLocation = new ReadOnlyDictionary(new Dictionary(StringComparer.OrdinalIgnoreCase));
+ this.AvailableReadEndpointByLocation = new ReadOnlyDictionary(new Dictionary(StringComparer.OrdinalIgnoreCase));
+ this.AvailableWriteLocationByEndpoint = new ReadOnlyDictionary(new Dictionary());
+ this.AvailableReadLocationByEndpoint = new ReadOnlyDictionary(new Dictionary());
+ this.WriteEndpoints = new List() { defaultEndpoint }.AsReadOnly();
+ this.AccountReadEndpoints = new List() { defaultEndpoint }.AsReadOnly();
+ this.ReadEndpoints = new List() { defaultEndpoint }.AsReadOnly();
+ this.EffectivePreferredLocations = new List().AsReadOnly();
+ }
+
+ public DatabaseAccountLocationsInfo(DatabaseAccountLocationsInfo other)
+ {
+ this.PreferredLocations = other.PreferredLocations;
+ this.AvailableWriteLocations = other.AvailableWriteLocations;
+ this.AvailableReadLocations = other.AvailableReadLocations;
+ this.AvailableWriteEndpointByLocation = other.AvailableWriteEndpointByLocation;
+ this.AvailableReadEndpointByLocation = other.AvailableReadEndpointByLocation;
+ this.AvailableReadLocationByEndpoint = other.AvailableReadLocationByEndpoint;
+ this.AvailableWriteLocationByEndpoint = other.AvailableWriteLocationByEndpoint;
+ this.WriteEndpoints = other.WriteEndpoints;
+ this.AccountReadEndpoints = other.AccountReadEndpoints;
+ this.ReadEndpoints = other.ReadEndpoints;
+ this.EffectivePreferredLocations = other.EffectivePreferredLocations;
+ }
+
+ public ReadOnlyCollection PreferredLocations { get; set; }
+ public ReadOnlyCollection AvailableWriteLocations { get; set; }
+ public ReadOnlyCollection AvailableReadLocations { get; set; }
+ public ReadOnlyDictionary AvailableWriteEndpointByLocation { get; set; }
+ public ReadOnlyDictionary AvailableReadEndpointByLocation { get; set; }
+ public ReadOnlyDictionary AvailableWriteLocationByEndpoint { get; set; }
+ public ReadOnlyDictionary AvailableReadLocationByEndpoint { get; set; }
+
+ public ReadOnlyCollection