From 7fee7f05ba20110c0b836b53f5f1cfb95b76fdf1 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Wed, 18 Sep 2024 21:28:33 -0400 Subject: [PATCH] Fix. --- .../src/Routing/GlobalEndpointManager.cs | 741 +++++++++++++- .../src/Routing/LocationCache.cs | 955 +++++++++++++++++- 2 files changed, 1694 insertions(+), 2 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs index 7b66310a34..6cc829cd3d 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs @@ -1 +1,740 @@ -//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ #nullable enable namespace Microsoft.Azure.Cosmos.Routing { using System; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Collections.Specialized; using System.Linq; using System.Net; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Documents; /// /// AddressCache implementation for client SDK. Supports cross region address routing based on /// availability and preference list. /// /// Marking it as non-sealed in order to unit test it using Moq framework internal class GlobalEndpointManager : IGlobalEndpointManager { private const int DefaultBackgroundRefreshLocationTimeIntervalInMS = 5 * 60 * 1000; private const string BackgroundRefreshLocationTimeIntervalInMS = "BackgroundRefreshLocationTimeIntervalInMS"; private const string MinimumIntervalForNonForceRefreshLocationInMS = "MinimumIntervalForNonForceRefreshLocationInMS"; private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); private readonly LocationCache locationCache; private readonly Uri defaultEndpoint; private readonly ConnectionPolicy connectionPolicy; private readonly IDocumentClientInternal owner; private readonly AsyncCache databaseAccountCache = new AsyncCache(); private readonly TimeSpan MinTimeBetweenAccountRefresh = TimeSpan.FromSeconds(15); private readonly int backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS; private readonly object backgroundAccountRefreshLock = new object(); private readonly object isAccountRefreshInProgressLock = new object(); private readonly ReaderWriterLockSlim locationCacheDatabaseAccountReadWriteLock = new ReaderWriterLockSlim(); private bool isAccountRefreshInProgress = false; private bool isBackgroundAccountRefreshActive = false; private DateTime LastBackgroundRefreshUtc = DateTime.MinValue; public GlobalEndpointManager(IDocumentClientInternal owner, ConnectionPolicy connectionPolicy) { this.locationCache = new LocationCache( new ReadOnlyCollection(connectionPolicy.PreferredLocations), owner.ServiceEndpoint, connectionPolicy.EnableEndpointDiscovery, connectionPolicy.MaxConnectionLimit, connectionPolicy.UseMultipleWriteLocations); this.owner = owner; this.defaultEndpoint = owner.ServiceEndpoint; this.connectionPolicy = connectionPolicy; this.connectionPolicy.PreferenceChanged += this.OnPreferenceChanged; #if !(NETSTANDARD15 || NETSTANDARD16) #if NETSTANDARD20 // GetEntryAssembly returns null when loaded from native netstandard2.0 if (System.Reflection.Assembly.GetEntryAssembly() != null) { #endif string backgroundRefreshLocationTimeIntervalInMSConfig = System.Configuration.ConfigurationManager.AppSettings[GlobalEndpointManager.BackgroundRefreshLocationTimeIntervalInMS]; if (!string.IsNullOrEmpty(backgroundRefreshLocationTimeIntervalInMSConfig)) { if (!int.TryParse(backgroundRefreshLocationTimeIntervalInMSConfig, out this.backgroundRefreshLocationTimeIntervalInMS)) { this.backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS; } } #if NETSTANDARD20 } #endif #endif string minimumIntervalForNonForceRefreshLocationInMSConfig = Environment.GetEnvironmentVariable(GlobalEndpointManager.MinimumIntervalForNonForceRefreshLocationInMS); if (!string.IsNullOrEmpty(minimumIntervalForNonForceRefreshLocationInMSConfig)) { if (int.TryParse(minimumIntervalForNonForceRefreshLocationInMSConfig, out int minimumIntervalForNonForceRefreshLocationInMS)) { this.MinTimeBetweenAccountRefresh = TimeSpan.FromMilliseconds(minimumIntervalForNonForceRefreshLocationInMS); } else { DefaultTrace.TraceError($"GlobalEndpointManager: Failed to parse {GlobalEndpointManager.MinimumIntervalForNonForceRefreshLocationInMS}; Value:{minimumIntervalForNonForceRefreshLocationInMSConfig}"); } } } public ReadOnlyCollection ReadEndpoints => this.locationCache.ReadEndpoints; public ReadOnlyCollection AccountReadEndpoints => this.locationCache.AccountReadEndpoints; public ReadOnlyCollection WriteEndpoints => this.locationCache.WriteEndpoints; public int PreferredLocationCount { get { Collection effectivePreferredLocations = this.GetEffectivePreferredLocations(); return effectivePreferredLocations.Count; } } public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request) { return this.locationCache.IsMultimasterMetadataWriteRequest(request); } public Uri GetHubUri() { return this.locationCache.GetHubUri(); } /// /// This will get the account information. /// It will try the global endpoint first. /// If no response in 5 seconds it will create 2 additional tasks /// The 2 additional tasks will go through all the preferred regions in parallel /// It will return the first success and stop the parallel tasks. /// public static async Task GetDatabaseAccountFromAnyLocationsAsync( Uri defaultEndpoint, IList? locations, IList? accountInitializationCustomEndpoints, Func> getDatabaseAccountFn, CancellationToken cancellationToken, ReaderWriterLockSlim accountPropertiesReaderWriterLock) { using (GetAccountPropertiesHelper threadSafeGetAccountHelper = new GetAccountPropertiesHelper( defaultEndpoint, locations, accountInitializationCustomEndpoints, getDatabaseAccountFn, cancellationToken)) { return await threadSafeGetAccountHelper.GetAccountPropertiesAsync(accountPropertiesReaderWriterLock); } } /// /// This is a helper class to /// private class GetAccountPropertiesHelper : IDisposable { private readonly CancellationTokenSource CancellationTokenSource; private readonly Uri DefaultEndpoint; private readonly bool LimitToGlobalEndpointOnly; private readonly IEnumerator ServiceEndpointEnumerator; private readonly Func> GetDatabaseAccountFn; private readonly List TransientExceptions = new List(); private AccountProperties? AccountProperties = null; private Exception? NonRetriableException = null; private int disposeCounter = 0; public GetAccountPropertiesHelper( Uri defaultEndpoint, IList? locations, IList? accountInitializationCustomEndpoints, Func> getDatabaseAccountFn, CancellationToken cancellationToken) { this.DefaultEndpoint = defaultEndpoint; this.LimitToGlobalEndpointOnly = (locations == null || locations.Count == 0) && (accountInitializationCustomEndpoints == null || accountInitializationCustomEndpoints.Count == 0); this.GetDatabaseAccountFn = getDatabaseAccountFn; this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); this.ServiceEndpointEnumerator = GetAccountPropertiesHelper .GetServiceEndpoints( defaultEndpoint, locations, accountInitializationCustomEndpoints) .GetEnumerator(); } public async Task GetAccountPropertiesAsync(ReaderWriterLockSlim readerWriterLock) { // If there are no preferred regions or private endpoints, then just wait for the global endpoint results if (this.LimitToGlobalEndpointOnly) { return await this.GetOnlyGlobalEndpointAsync(readerWriterLock); } Task globalEndpointTask = this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock); // Start a timer to start secondary requests in parallel. Task timerTask = Task.Delay(TimeSpan.FromSeconds(5)); await Task.WhenAny(globalEndpointTask, timerTask); if (this.AccountProperties != null) { return this.AccountProperties; } if (this.NonRetriableException != null) { ExceptionDispatchInfo.Capture(this.NonRetriableException).Throw(); } // Start 2 additional tasks to try to get the account information // from the preferred region list since global account has not succeed yet. HashSet tasksToWaitOn = new HashSet { globalEndpointTask, this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock), this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock) }; while (tasksToWaitOn.Any()) { Task completedTask = await Task.WhenAny(tasksToWaitOn); if (this.AccountProperties != null) { return this.AccountProperties; } if (this.NonRetriableException != null) { ExceptionDispatchInfo.Capture(this.NonRetriableException).Throw(); } tasksToWaitOn.Remove(completedTask); } if (this.TransientExceptions.Count == 0) { throw new ArgumentException("Account properties and NonRetriableException are null and there are no TransientExceptions."); } if (this.TransientExceptions.Count == 1) { ExceptionDispatchInfo.Capture(this.TransientExceptions[0]).Throw(); } throw new AggregateException(this.TransientExceptions); } private async Task GetOnlyGlobalEndpointAsync(ReaderWriterLockSlim readerWriterLock) { if (!this.LimitToGlobalEndpointOnly) { throw new ArgumentException("GetOnlyGlobalEndpointAsync should only be called if there are no other private endpoints or regions"); } await this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock); if (this.AccountProperties != null) { return this.AccountProperties; } if (this.NonRetriableException != null) { throw this.NonRetriableException; } if (this.TransientExceptions.Count == 0) { throw new ArgumentException("Account properties and NonRetriableException are null and there are no TransientExceptions."); } if (this.TransientExceptions.Count == 1) { throw this.TransientExceptions[0]; } throw new AggregateException(this.TransientExceptions); } /// /// This is done in a thread safe way to allow multiple tasks to iterate over the list of service endpoints. /// private async Task TryGetAccountPropertiesFromAllLocationsAsync(ReaderWriterLockSlim readerWriterLock) { while (this.TryMoveNextServiceEndpointhreadSafe( out Uri? serviceEndpoint)) { if (serviceEndpoint == null) { DefaultTrace.TraceCritical("GlobalEndpointManager: serviceEndpoint is null for TryMoveNextServiceEndpointhreadSafe."); return; } await this.GetAndUpdateAccountPropertiesAsync( endpoint: serviceEndpoint, readerWriterLock); } } /// /// We first iterate through all the private endpoints to fetch the account information. /// If all the attempt fails to fetch the metadata from the private endpoints, we will /// attempt to retrieve the account information from the regional endpoints constructed /// using the preferred regions list. /// /// An instance of that will contain the service endpoint. /// A boolean flag indicating if the was advanced in a thread safe manner. private bool TryMoveNextServiceEndpointhreadSafe( out Uri? serviceEndpoint) { if (this.CancellationTokenSource.IsCancellationRequested) { serviceEndpoint = null; return false; } lock (this.ServiceEndpointEnumerator) { if (!this.ServiceEndpointEnumerator.MoveNext()) { serviceEndpoint = null; return false; } serviceEndpoint = this.ServiceEndpointEnumerator.Current; return true; } } private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint, ReaderWriterLockSlim readerWriterLock) { try { if (this.CancellationTokenSource.IsCancellationRequested) { lock (this.TransientExceptions) { this.TransientExceptions.Add(new OperationCanceledException($"GlobalEndpointManager: Get account information canceled for URI: {endpoint}")); } return; } AccountProperties databaseAccount = await this.GetDatabaseAccountFn(endpoint); if (databaseAccount != null) { readerWriterLock.EnterWriteLock(); try { this.AccountProperties = databaseAccount; this.CancellationTokenSource.Cancel(); } finally { readerWriterLock.ExitWriteLock(); } } } catch (Exception e) { DefaultTrace.TraceInformation("GlobalEndpointManager: Fail to reach gateway endpoint {0}, {1}", endpoint, e.ToString()); if (GetAccountPropertiesHelper.IsNonRetriableException(e)) { DefaultTrace.TraceInformation("GlobalEndpointManager: Exception is not retriable"); this.CancellationTokenSource.Cancel(); this.NonRetriableException = e; } else { lock (this.TransientExceptions) { this.TransientExceptions.Add(e); } } } } private static bool IsNonRetriableException(Exception exception) { if (exception is DocumentClientException dce && (dce.StatusCode == HttpStatusCode.Unauthorized || dce.StatusCode == HttpStatusCode.Forbidden)) { return true; } return false; } /// /// Returns an instance of containing the private and regional service endpoints to iterate over. /// /// An instance of containing the default global endpoint. /// An instance of containing the preferred serviceEndpoint names. /// An instance of containing the custom private endpoints. /// An instance of containing the service endpoints. private static IEnumerable GetServiceEndpoints( Uri defaultEndpoint, IList? locations, IList? accountInitializationCustomEndpoints) { // We first iterate over all the private endpoints and yield return them. if (accountInitializationCustomEndpoints?.Count > 0) { foreach (Uri customEndpoint in accountInitializationCustomEndpoints) { // Yield return all of the custom private endpoints first. yield return customEndpoint; } } // The next step is to iterate over the preferred locations, construct and yield return the regional endpoints one by one. // The regional endpoints will be constructed by appending the preferred region name as a suffix to the default global endpoint. if (locations?.Count > 0) { foreach (string location in locations) { // Yield return all of the regional endpoints once the private custom endpoints are visited. yield return LocationHelper.GetLocationEndpoint(defaultEndpoint, location); } } } public void Dispose() { if (Interlocked.Increment(ref this.disposeCounter) == 1) { this.CancellationTokenSource?.Cancel(); this.CancellationTokenSource?.Dispose(); } } } public virtual Uri ResolveServiceEndpoint(DocumentServiceRequest request) { return this.locationCache.ResolveServiceEndpoint(request); } /// /// Gets the default endpoint of the account /// /// the default endpoint. public Uri GetDefaultEndpoint() { return this.locationCache.GetDefaultEndpoint(); } /// /// Gets the mapping of available write region names to the respective endpoints /// public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation() { return this.locationCache.GetAvailableWriteEndpointsByLocation(); } /// /// Gets the mapping of available read region names to the respective endpoints /// public ReadOnlyDictionary GetAvailableReadEndpointsByLocation() { return this.locationCache.GetAvailableReadEndpointsByLocation(); } /// /// Returns serviceEndpoint corresponding to the endpoint /// /// public string GetLocation(Uri endpoint) { return this.locationCache.GetLocation(endpoint); } public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest) { return this.locationCache.GetApplicableEndpoints(request, isReadRequest); } public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest) { return this.locationCache.GetApplicableRegions(excludeRegions, isReadRequest); } public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName) { return this.locationCache.TryGetLocationForGatewayDiagnostics(endpoint, out regionName); } public virtual void MarkEndpointUnavailableForRead(Uri endpoint) { DefaultTrace.TraceInformation("GlobalEndpointManager: Marking endpoint {0} unavailable for read", endpoint); this.locationCache.MarkEndpointUnavailableForRead(endpoint); } public virtual void MarkEndpointUnavailableForWrite(Uri endpoint) { DefaultTrace.TraceInformation("GlobalEndpointManager: Marking endpoint {0} unavailable for Write", endpoint); this.locationCache.MarkEndpointUnavailableForWrite(endpoint); } public bool CanUseMultipleWriteLocations(DocumentServiceRequest request) { return this.locationCache.CanUseMultipleWriteLocations(request); } public void Dispose() { this.connectionPolicy.PreferenceChanged -= this.OnPreferenceChanged; if (!this.cancellationTokenSource.IsCancellationRequested) { // This can cause task canceled exceptions if the user disposes of the object while awaiting an async call. this.cancellationTokenSource.Cancel(); // The background timer task can hit a ObjectDisposedException but it's an async background task // that is never awaited on so it will not be thrown back to the caller. this.cancellationTokenSource.Dispose(); } } public virtual void InitializeAccountPropertiesAndStartBackgroundRefresh(AccountProperties databaseAccount) { if (this.cancellationTokenSource.IsCancellationRequested) { return; } this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock(); try { this.locationCache.OnDatabaseAccountRead(databaseAccount); } finally { this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock(); } if (this.isBackgroundAccountRefreshActive) { return; } lock (this.backgroundAccountRefreshLock) { if (this.isBackgroundAccountRefreshActive) { return; } this.isBackgroundAccountRefreshActive = true; } try { this.StartLocationBackgroundRefreshLoop(); } catch { this.isBackgroundAccountRefreshActive = false; throw; } } public virtual async Task RefreshLocationAsync(bool forceRefresh = false) { if (this.cancellationTokenSource.IsCancellationRequested) { return; } await this.RefreshDatabaseAccountInternalAsync(forceRefresh: forceRefresh); } #pragma warning disable VSTHRD100 // Avoid async void methods private async void StartLocationBackgroundRefreshLoop() #pragma warning restore VSTHRD100 // Avoid async void methods { if (this.cancellationTokenSource.IsCancellationRequested) { return; } DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() refreshing locations"); if (!this.locationCache.ShouldRefreshEndpoints(out bool canRefreshInBackground)) { if (!canRefreshInBackground) { DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() stropped."); lock (this.backgroundAccountRefreshLock) { this.isBackgroundAccountRefreshActive = false; } return; } } try { await Task.Delay(this.backgroundRefreshLocationTimeIntervalInMS, this.cancellationTokenSource.Token); DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() - Invoking refresh"); if (this.cancellationTokenSource.IsCancellationRequested) { return; } await this.RefreshDatabaseAccountInternalAsync(forceRefresh: false); } catch (Exception ex) { if (this.cancellationTokenSource.IsCancellationRequested && (ex is OperationCanceledException || ex is ObjectDisposedException)) { return; } DefaultTrace.TraceCritical("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() - Unable to refresh database account from any serviceEndpoint. Exception: {0}", ex.ToString()); } // Call itself to create a loop to continuously do background refresh every 5 minutes this.StartLocationBackgroundRefreshLoop(); } private Task GetDatabaseAccountAsync(Uri serviceEndpoint) { return this.owner.GetDatabaseAccountInternalAsync(serviceEndpoint, this.cancellationTokenSource.Token); } private void OnPreferenceChanged(object sender, NotifyCollectionChangedEventArgs e) { this.locationCache.OnLocationPreferenceChanged(new ReadOnlyCollection( this.connectionPolicy.PreferredLocations)); } /// /// Thread safe refresh account and serviceEndpoint info. /// private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh) { if (this.cancellationTokenSource.IsCancellationRequested) { return; } if (this.SkipRefresh(forceRefresh)) { return; } lock (this.isAccountRefreshInProgressLock) { // Check again if should refresh after obtaining the lock if (this.SkipRefresh(forceRefresh)) { return; } // If the refresh is already in progress just return. No reason to do another refresh. if (this.isAccountRefreshInProgress) { return; } this.isAccountRefreshInProgress = true; } try { this.LastBackgroundRefreshUtc = DateTime.UtcNow; AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true); this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock(); try { this.locationCache.OnDatabaseAccountRead(accountProperties); } finally { this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock(); } } catch (Exception ex) { DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'", ex, System.Diagnostics.Trace.CorrelationManager.ActivityId); } finally { lock (this.isAccountRefreshInProgressLock) { this.isAccountRefreshInProgress = false; } } } internal async Task GetDatabaseAccountAsync(bool forceRefresh = false) { #nullable disable // Needed because AsyncCache does not have nullable enabled return await this.databaseAccountCache.GetAsync( key: string.Empty, obsoleteValue: null, singleValueInitFunc: () => GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync( this.defaultEndpoint, this.GetEffectivePreferredLocations(), this.connectionPolicy.AccountInitializationCustomEndpoints, this.GetDatabaseAccountAsync, this.cancellationTokenSource.Token, this.locationCacheDatabaseAccountReadWriteLock), cancellationToken: this.cancellationTokenSource.Token, forceRefresh: forceRefresh); #nullable enable } /// /// If the account is currently refreshing or the last refresh occurred less than the minimum time /// just return. This is used to avoid refreshing to often and preventing to much pressure on the gateway. /// private bool SkipRefresh(bool forceRefresh) { TimeSpan timeSinceLastRefresh = DateTime.UtcNow - this.LastBackgroundRefreshUtc; return (this.isAccountRefreshInProgress || this.MinTimeBetweenAccountRefresh > timeSinceLastRefresh) && !forceRefresh; } public Collection GetEffectivePreferredLocations() { if (this.connectionPolicy.PreferredLocations != null && this.connectionPolicy.PreferredLocations.Count > 0) { return this.connectionPolicy.PreferredLocations; } this.locationCacheDatabaseAccountReadWriteLock.EnterReadLock(); try { return new Collection(this.locationCache.EffectivePreferredLocations); } finally { this.locationCacheDatabaseAccountReadWriteLock.ExitReadLock(); } } } } \ No newline at end of file +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ +#nullable enable +namespace Microsoft.Azure.Cosmos.Routing +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Collections.Specialized; + using System.Linq; + using System.Net; + using System.Runtime.ExceptionServices; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Common; + using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Documents; + + /// + /// AddressCache implementation for client SDK. Supports cross region address routing based on + /// availability and preference list. + /// + /// Marking it as non-sealed in order to unit test it using Moq framework + internal class GlobalEndpointManager : IGlobalEndpointManager + { + private const int DefaultBackgroundRefreshLocationTimeIntervalInMS = 5 * 60 * 1000; + + private const string BackgroundRefreshLocationTimeIntervalInMS = "BackgroundRefreshLocationTimeIntervalInMS"; + private const string MinimumIntervalForNonForceRefreshLocationInMS = "MinimumIntervalForNonForceRefreshLocationInMS"; + private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + private readonly LocationCache locationCache; + private readonly Uri defaultEndpoint; + private readonly ConnectionPolicy connectionPolicy; + private readonly IDocumentClientInternal owner; + private readonly AsyncCache databaseAccountCache = new AsyncCache(); + private readonly TimeSpan MinTimeBetweenAccountRefresh = TimeSpan.FromSeconds(15); + private readonly int backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS; + private readonly object backgroundAccountRefreshLock = new object(); + private readonly object isAccountRefreshInProgressLock = new object(); + private readonly ReaderWriterLockSlim locationCacheDatabaseAccountReadWriteLock = new ReaderWriterLockSlim(); + private bool isAccountRefreshInProgress = false; + private bool isBackgroundAccountRefreshActive = false; + private DateTime LastBackgroundRefreshUtc = DateTime.MinValue; + + public GlobalEndpointManager(IDocumentClientInternal owner, ConnectionPolicy connectionPolicy) + { + this.locationCache = new LocationCache( + new ReadOnlyCollection(connectionPolicy.PreferredLocations), + owner.ServiceEndpoint, + connectionPolicy.EnableEndpointDiscovery, + connectionPolicy.MaxConnectionLimit, + connectionPolicy.UseMultipleWriteLocations); + + this.owner = owner; + this.defaultEndpoint = owner.ServiceEndpoint; + this.connectionPolicy = connectionPolicy; + + this.connectionPolicy.PreferenceChanged += this.OnPreferenceChanged; + +#if !(NETSTANDARD15 || NETSTANDARD16) +#if NETSTANDARD20 + // GetEntryAssembly returns null when loaded from native netstandard2.0 + if (System.Reflection.Assembly.GetEntryAssembly() != null) + { +#endif + string backgroundRefreshLocationTimeIntervalInMSConfig = System.Configuration.ConfigurationManager.AppSettings[GlobalEndpointManager.BackgroundRefreshLocationTimeIntervalInMS]; + if (!string.IsNullOrEmpty(backgroundRefreshLocationTimeIntervalInMSConfig)) + { + if (!int.TryParse(backgroundRefreshLocationTimeIntervalInMSConfig, out this.backgroundRefreshLocationTimeIntervalInMS)) + { + this.backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS; + } + } +#if NETSTANDARD20 + } +#endif +#endif + string minimumIntervalForNonForceRefreshLocationInMSConfig = Environment.GetEnvironmentVariable(GlobalEndpointManager.MinimumIntervalForNonForceRefreshLocationInMS); + if (!string.IsNullOrEmpty(minimumIntervalForNonForceRefreshLocationInMSConfig)) + { + if (int.TryParse(minimumIntervalForNonForceRefreshLocationInMSConfig, out int minimumIntervalForNonForceRefreshLocationInMS)) + { + this.MinTimeBetweenAccountRefresh = TimeSpan.FromMilliseconds(minimumIntervalForNonForceRefreshLocationInMS); + } + else + { + DefaultTrace.TraceError($"GlobalEndpointManager: Failed to parse {GlobalEndpointManager.MinimumIntervalForNonForceRefreshLocationInMS}; Value:{minimumIntervalForNonForceRefreshLocationInMSConfig}"); + } + } + } + + public ReadOnlyCollection ReadEndpoints => this.locationCache.ReadEndpoints; + + public ReadOnlyCollection AccountReadEndpoints => this.locationCache.AccountReadEndpoints; + + public ReadOnlyCollection WriteEndpoints => this.locationCache.WriteEndpoints; + + public int PreferredLocationCount + { + get + { + Collection effectivePreferredLocations = this.GetEffectivePreferredLocations(); + + return effectivePreferredLocations.Count; + } + } + + public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request) + { + return this.locationCache.IsMultimasterMetadataWriteRequest(request); + } + + public Uri GetHubUri() + { + return this.locationCache.GetHubUri(); + } + + /// + /// This will get the account information. + /// It will try the global endpoint first. + /// If no response in 5 seconds it will create 2 additional tasks + /// The 2 additional tasks will go through all the preferred regions in parallel + /// It will return the first success and stop the parallel tasks. + /// + public static async Task GetDatabaseAccountFromAnyLocationsAsync( + Uri defaultEndpoint, + IList? locations, + IList? accountInitializationCustomEndpoints, + Func> getDatabaseAccountFn, + CancellationToken cancellationToken, + ReaderWriterLockSlim accountPropertiesReaderWriterLock) + { + using (GetAccountPropertiesHelper threadSafeGetAccountHelper = new GetAccountPropertiesHelper( + defaultEndpoint, + locations, + accountInitializationCustomEndpoints, + getDatabaseAccountFn, + cancellationToken)) + { + return await threadSafeGetAccountHelper.GetAccountPropertiesAsync(accountPropertiesReaderWriterLock); + } + } + + /// + /// This is a helper class to + /// + private class GetAccountPropertiesHelper : IDisposable + { + private readonly CancellationTokenSource CancellationTokenSource; + private readonly Uri DefaultEndpoint; + private readonly bool LimitToGlobalEndpointOnly; + private readonly IEnumerator ServiceEndpointEnumerator; + private readonly Func> GetDatabaseAccountFn; + private readonly List TransientExceptions = new List(); + private AccountProperties? AccountProperties = null; + private Exception? NonRetriableException = null; + private int disposeCounter = 0; + + public GetAccountPropertiesHelper( + Uri defaultEndpoint, + IList? locations, + IList? accountInitializationCustomEndpoints, + Func> getDatabaseAccountFn, + CancellationToken cancellationToken) + { + this.DefaultEndpoint = defaultEndpoint; + this.LimitToGlobalEndpointOnly = (locations == null || locations.Count == 0) && (accountInitializationCustomEndpoints == null || accountInitializationCustomEndpoints.Count == 0); + this.GetDatabaseAccountFn = getDatabaseAccountFn; + this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + this.ServiceEndpointEnumerator = GetAccountPropertiesHelper + .GetServiceEndpoints( + defaultEndpoint, + locations, + accountInitializationCustomEndpoints) + .GetEnumerator(); + } + + public async Task GetAccountPropertiesAsync(ReaderWriterLockSlim readerWriterLock) + { + // If there are no preferred regions or private endpoints, then just wait for the global endpoint results + if (this.LimitToGlobalEndpointOnly) + { + return await this.GetOnlyGlobalEndpointAsync(readerWriterLock); + } + + Task globalEndpointTask = this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock); + + // Start a timer to start secondary requests in parallel. + Task timerTask = Task.Delay(TimeSpan.FromSeconds(5)); + await Task.WhenAny(globalEndpointTask, timerTask); + if (this.AccountProperties != null) + { + return this.AccountProperties; + } + + if (this.NonRetriableException != null) + { + ExceptionDispatchInfo.Capture(this.NonRetriableException).Throw(); + } + + // Start 2 additional tasks to try to get the account information + // from the preferred region list since global account has not succeed yet. + HashSet tasksToWaitOn = new HashSet + { + globalEndpointTask, + this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock), + this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock) + }; + + while (tasksToWaitOn.Any()) + { + Task completedTask = await Task.WhenAny(tasksToWaitOn); + if (this.AccountProperties != null) + { + return this.AccountProperties; + } + + if (this.NonRetriableException != null) + { + ExceptionDispatchInfo.Capture(this.NonRetriableException).Throw(); + } + + tasksToWaitOn.Remove(completedTask); + } + + if (this.TransientExceptions.Count == 0) + { + throw new ArgumentException("Account properties and NonRetriableException are null and there are no TransientExceptions."); + } + + if (this.TransientExceptions.Count == 1) + { + ExceptionDispatchInfo.Capture(this.TransientExceptions[0]).Throw(); + } + + throw new AggregateException(this.TransientExceptions); + } + + private async Task GetOnlyGlobalEndpointAsync(ReaderWriterLockSlim readerWriterLock) + { + if (!this.LimitToGlobalEndpointOnly) + { + throw new ArgumentException("GetOnlyGlobalEndpointAsync should only be called if there are no other private endpoints or regions"); + } + + await this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock); + + if (this.AccountProperties != null) + { + return this.AccountProperties; + } + + if (this.NonRetriableException != null) + { + throw this.NonRetriableException; + } + + if (this.TransientExceptions.Count == 0) + { + throw new ArgumentException("Account properties and NonRetriableException are null and there are no TransientExceptions."); + } + + if (this.TransientExceptions.Count == 1) + { + throw this.TransientExceptions[0]; + } + + throw new AggregateException(this.TransientExceptions); + } + + /// + /// This is done in a thread safe way to allow multiple tasks to iterate over the list of service endpoints. + /// + private async Task TryGetAccountPropertiesFromAllLocationsAsync(ReaderWriterLockSlim readerWriterLock) + { + while (this.TryMoveNextServiceEndpointhreadSafe( + out Uri? serviceEndpoint)) + { + if (serviceEndpoint == null) + { + DefaultTrace.TraceCritical("GlobalEndpointManager: serviceEndpoint is null for TryMoveNextServiceEndpointhreadSafe."); + return; + } + + await this.GetAndUpdateAccountPropertiesAsync( + endpoint: serviceEndpoint, + readerWriterLock); + } + } + + /// + /// We first iterate through all the private endpoints to fetch the account information. + /// If all the attempt fails to fetch the metadata from the private endpoints, we will + /// attempt to retrieve the account information from the regional endpoints constructed + /// using the preferred regions list. + /// + /// An instance of that will contain the service endpoint. + /// A boolean flag indicating if the was advanced in a thread safe manner. + private bool TryMoveNextServiceEndpointhreadSafe( + out Uri? serviceEndpoint) + { + if (this.CancellationTokenSource.IsCancellationRequested) + { + serviceEndpoint = null; + return false; + } + + lock (this.ServiceEndpointEnumerator) + { + if (!this.ServiceEndpointEnumerator.MoveNext()) + { + serviceEndpoint = null; + return false; + } + + serviceEndpoint = this.ServiceEndpointEnumerator.Current; + return true; + } + } + + private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint, ReaderWriterLockSlim readerWriterLock) + { + try + { + if (this.CancellationTokenSource.IsCancellationRequested) + { + lock (this.TransientExceptions) + { + this.TransientExceptions.Add(new OperationCanceledException($"GlobalEndpointManager: Get account information canceled for URI: {endpoint}")); + } + + return; + } + + AccountProperties databaseAccount = await this.GetDatabaseAccountFn(endpoint); + + if (databaseAccount != null) + { + readerWriterLock.EnterWriteLock(); + + try + { + this.AccountProperties = databaseAccount; + this.CancellationTokenSource.Cancel(); + } + finally + { + readerWriterLock.ExitWriteLock(); + } + } + } + catch (Exception e) + { + DefaultTrace.TraceInformation("GlobalEndpointManager: Fail to reach gateway endpoint {0}, {1}", endpoint, e.ToString()); + if (GetAccountPropertiesHelper.IsNonRetriableException(e)) + { + DefaultTrace.TraceInformation("GlobalEndpointManager: Exception is not retriable"); + this.CancellationTokenSource.Cancel(); + this.NonRetriableException = e; + } + else + { + lock (this.TransientExceptions) + { + this.TransientExceptions.Add(e); + } + } + } + } + + private static bool IsNonRetriableException(Exception exception) + { + if (exception is DocumentClientException dce && + (dce.StatusCode == HttpStatusCode.Unauthorized || dce.StatusCode == HttpStatusCode.Forbidden)) + { + return true; + } + + return false; + } + + /// + /// Returns an instance of containing the private and regional service endpoints to iterate over. + /// + /// An instance of containing the default global endpoint. + /// An instance of containing the preferred serviceEndpoint names. + /// An instance of containing the custom private endpoints. + /// An instance of containing the service endpoints. + private static IEnumerable GetServiceEndpoints( + Uri defaultEndpoint, + IList? locations, + IList? accountInitializationCustomEndpoints) + { + // We first iterate over all the private endpoints and yield return them. + if (accountInitializationCustomEndpoints?.Count > 0) + { + foreach (Uri customEndpoint in accountInitializationCustomEndpoints) + { + // Yield return all of the custom private endpoints first. + yield return customEndpoint; + } + } + + // The next step is to iterate over the preferred locations, construct and yield return the regional endpoints one by one. + // The regional endpoints will be constructed by appending the preferred region name as a suffix to the default global endpoint. + if (locations?.Count > 0) + { + foreach (string location in locations) + { + // Yield return all of the regional endpoints once the private custom endpoints are visited. + yield return LocationHelper.GetLocationEndpoint(defaultEndpoint, location); + } + } + } + + public void Dispose() + { + if (Interlocked.Increment(ref this.disposeCounter) == 1) + { + this.CancellationTokenSource?.Cancel(); + this.CancellationTokenSource?.Dispose(); + } + } + } + + public virtual Uri ResolveServiceEndpoint(DocumentServiceRequest request) + { + return this.locationCache.ResolveServiceEndpoint(request); + } + + /// + /// Gets the default endpoint of the account + /// + /// the default endpoint. + public Uri GetDefaultEndpoint() + { + return this.locationCache.GetDefaultEndpoint(); + } + + /// + /// Gets the mapping of available write region names to the respective endpoints + /// + public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation() + { + return this.locationCache.GetAvailableWriteEndpointsByLocation(); + } + + /// + /// Gets the mapping of available read region names to the respective endpoints + /// + public ReadOnlyDictionary GetAvailableReadEndpointsByLocation() + { + return this.locationCache.GetAvailableReadEndpointsByLocation(); + } + + /// + /// Returns serviceEndpoint corresponding to the endpoint + /// + /// + public string GetLocation(Uri endpoint) + { + return this.locationCache.GetLocation(endpoint); + } + + public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest) + { + return this.locationCache.GetApplicableEndpoints(request, isReadRequest); + } + + public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest) + { + return this.locationCache.GetApplicableRegions(excludeRegions, isReadRequest); + } + + public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName) + { + return this.locationCache.TryGetLocationForGatewayDiagnostics(endpoint, out regionName); + } + + public virtual void MarkEndpointUnavailableForRead(Uri endpoint) + { + DefaultTrace.TraceInformation("GlobalEndpointManager: Marking endpoint {0} unavailable for read", endpoint); + + this.locationCache.MarkEndpointUnavailableForRead(endpoint); + } + + public virtual void MarkEndpointUnavailableForWrite(Uri endpoint) + { + DefaultTrace.TraceInformation("GlobalEndpointManager: Marking endpoint {0} unavailable for Write", endpoint); + + this.locationCache.MarkEndpointUnavailableForWrite(endpoint); + } + + public bool CanUseMultipleWriteLocations(DocumentServiceRequest request) + { + return this.locationCache.CanUseMultipleWriteLocations(request); + } + + public void Dispose() + { + this.connectionPolicy.PreferenceChanged -= this.OnPreferenceChanged; + if (!this.cancellationTokenSource.IsCancellationRequested) + { + // This can cause task canceled exceptions if the user disposes of the object while awaiting an async call. + this.cancellationTokenSource.Cancel(); + // The background timer task can hit a ObjectDisposedException but it's an async background task + // that is never awaited on so it will not be thrown back to the caller. + this.cancellationTokenSource.Dispose(); + } + } + + public virtual void InitializeAccountPropertiesAndStartBackgroundRefresh(AccountProperties databaseAccount) + { + if (this.cancellationTokenSource.IsCancellationRequested) + { + return; + } + + this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock(); + try + { + this.locationCache.OnDatabaseAccountRead(databaseAccount); + } + finally + { + this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock(); + } + + if (this.isBackgroundAccountRefreshActive) + { + return; + } + + lock (this.backgroundAccountRefreshLock) + { + if (this.isBackgroundAccountRefreshActive) + { + return; + } + + this.isBackgroundAccountRefreshActive = true; + } + + try + { + this.StartLocationBackgroundRefreshLoop(); + } + catch + { + this.isBackgroundAccountRefreshActive = false; + throw; + } + } + + public virtual async Task RefreshLocationAsync(bool forceRefresh = false) + { + if (this.cancellationTokenSource.IsCancellationRequested) + { + return; + } + + await this.RefreshDatabaseAccountInternalAsync(forceRefresh: forceRefresh); + } + +#pragma warning disable VSTHRD100 // Avoid async void methods + private async void StartLocationBackgroundRefreshLoop() +#pragma warning restore VSTHRD100 // Avoid async void methods + { + if (this.cancellationTokenSource.IsCancellationRequested) + { + return; + } + + DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() refreshing locations"); + + if (!this.locationCache.ShouldRefreshEndpoints(out bool canRefreshInBackground)) + { + if (!canRefreshInBackground) + { + DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() stropped."); + lock (this.backgroundAccountRefreshLock) + { + this.isBackgroundAccountRefreshActive = false; + } + + return; + } + } + + try + { + await Task.Delay(this.backgroundRefreshLocationTimeIntervalInMS, this.cancellationTokenSource.Token); + + DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() - Invoking refresh"); + + if (this.cancellationTokenSource.IsCancellationRequested) + { + return; + } + + await this.RefreshDatabaseAccountInternalAsync(forceRefresh: false); + } + catch (Exception ex) + { + if (this.cancellationTokenSource.IsCancellationRequested && (ex is OperationCanceledException || ex is ObjectDisposedException)) + { + return; + } + + DefaultTrace.TraceCritical("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() - Unable to refresh database account from any serviceEndpoint. Exception: {0}", ex.ToString()); + } + + // Call itself to create a loop to continuously do background refresh every 5 minutes + this.StartLocationBackgroundRefreshLoop(); + } + + private Task GetDatabaseAccountAsync(Uri serviceEndpoint) + { + return this.owner.GetDatabaseAccountInternalAsync(serviceEndpoint, this.cancellationTokenSource.Token); + } + + private void OnPreferenceChanged(object sender, NotifyCollectionChangedEventArgs e) + { + this.locationCache.OnLocationPreferenceChanged(new ReadOnlyCollection( + this.connectionPolicy.PreferredLocations)); + } + + /// + /// Thread safe refresh account and serviceEndpoint info. + /// + private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh) + { + if (this.cancellationTokenSource.IsCancellationRequested) + { + return; + } + + if (this.SkipRefresh(forceRefresh)) + { + return; + } + + lock (this.isAccountRefreshInProgressLock) + { + // Check again if should refresh after obtaining the lock + if (this.SkipRefresh(forceRefresh)) + { + return; + } + + // If the refresh is already in progress just return. No reason to do another refresh. + if (this.isAccountRefreshInProgress) + { + return; + } + + this.isAccountRefreshInProgress = true; + } + + try + { + this.LastBackgroundRefreshUtc = DateTime.UtcNow; + + AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true); + + this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock(); + + try + { + this.locationCache.OnDatabaseAccountRead(accountProperties); + } + finally + { + this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock(); + } + } + catch (Exception ex) + { + DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'", + ex, + System.Diagnostics.Trace.CorrelationManager.ActivityId); + } + finally + { + lock (this.isAccountRefreshInProgressLock) + { + this.isAccountRefreshInProgress = false; + } + } + } + internal async Task GetDatabaseAccountAsync(bool forceRefresh = false) + { +#nullable disable // Needed because AsyncCache does not have nullable enabled + return await this.databaseAccountCache.GetAsync( + key: string.Empty, + obsoleteValue: null, + singleValueInitFunc: () => GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync( + this.defaultEndpoint, + this.GetEffectivePreferredLocations(), + this.connectionPolicy.AccountInitializationCustomEndpoints, + this.GetDatabaseAccountAsync, + this.cancellationTokenSource.Token, + this.locationCacheDatabaseAccountReadWriteLock), + cancellationToken: this.cancellationTokenSource.Token, + forceRefresh: forceRefresh); +#nullable enable + } + + /// + /// If the account is currently refreshing or the last refresh occurred less than the minimum time + /// just return. This is used to avoid refreshing to often and preventing to much pressure on the gateway. + /// + private bool SkipRefresh(bool forceRefresh) + { + TimeSpan timeSinceLastRefresh = DateTime.UtcNow - this.LastBackgroundRefreshUtc; + return (this.isAccountRefreshInProgress || this.MinTimeBetweenAccountRefresh > timeSinceLastRefresh) + && !forceRefresh; + } + + public Collection GetEffectivePreferredLocations() + { + if (this.connectionPolicy.PreferredLocations != null && this.connectionPolicy.PreferredLocations.Count > 0) + { + return this.connectionPolicy.PreferredLocations; + } + + this.locationCacheDatabaseAccountReadWriteLock.EnterReadLock(); + + try + { + return new Collection(this.locationCache.EffectivePreferredLocations); + } + finally + { + this.locationCacheDatabaseAccountReadWriteLock.ExitReadLock(); + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs b/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs index 12d8d29288..c5875e9179 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs @@ -1 +1,954 @@ -//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ namespace Microsoft.Azure.Cosmos.Routing { using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Linq; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Documents; /// /// Implements the abstraction to resolve target location for geo-replicated DatabaseAccount /// with multiple writable and readable locations. /// internal sealed class LocationCache { private const string UnavailableLocationsExpirationTimeInSeconds = "UnavailableLocationsExpirationTimeInSeconds"; private static int DefaultUnavailableLocationsExpirationTimeInSeconds = 5 * 60; private readonly bool enableEndpointDiscovery; private readonly Uri defaultEndpoint; private readonly bool useMultipleWriteLocations; private readonly object lockObject; private readonly TimeSpan unavailableLocationsExpirationTime; private readonly int connectionLimit; private readonly ConcurrentDictionary locationUnavailablityInfoByEndpoint; private readonly RegionNameMapper regionNameMapper; private DatabaseAccountLocationsInfo locationInfo; private DateTime lastCacheUpdateTimestamp; private bool enableMultipleWriteLocations; public LocationCache( ReadOnlyCollection preferredLocations, Uri defaultEndpoint, bool enableEndpointDiscovery, int connectionLimit, bool useMultipleWriteLocations) { this.locationInfo = new DatabaseAccountLocationsInfo(preferredLocations, defaultEndpoint); this.defaultEndpoint = defaultEndpoint; this.enableEndpointDiscovery = enableEndpointDiscovery; this.useMultipleWriteLocations = useMultipleWriteLocations; this.connectionLimit = connectionLimit; this.lockObject = new object(); this.locationUnavailablityInfoByEndpoint = new ConcurrentDictionary(); this.lastCacheUpdateTimestamp = DateTime.MinValue; this.enableMultipleWriteLocations = false; this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(LocationCache.DefaultUnavailableLocationsExpirationTimeInSeconds); this.regionNameMapper = new RegionNameMapper(); #if !(NETSTANDARD15 || NETSTANDARD16) #if NETSTANDARD20 // GetEntryAssembly returns null when loaded from native netstandard2.0 if (System.Reflection.Assembly.GetEntryAssembly() != null) { #endif string unavailableLocationsExpirationTimeInSecondsConfig = System.Configuration.ConfigurationManager.AppSettings[LocationCache.UnavailableLocationsExpirationTimeInSeconds]; if (!string.IsNullOrEmpty(unavailableLocationsExpirationTimeInSecondsConfig)) { int unavailableLocationsExpirationTimeinSecondsConfigValue; if (!int.TryParse(unavailableLocationsExpirationTimeInSecondsConfig, out unavailableLocationsExpirationTimeinSecondsConfigValue)) { this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(LocationCache.DefaultUnavailableLocationsExpirationTimeInSeconds); } else { this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(unavailableLocationsExpirationTimeinSecondsConfigValue); } } #if NETSTANDARD20 } #endif #endif } /// /// Gets list of read endpoints ordered by /// 1. Preferred location /// 2. Endpoint availablity /// public ReadOnlyCollection ReadEndpoints { get { // Hot-path: avoid ConcurrentDictionary methods which acquire locks if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime && this.locationUnavailablityInfoByEndpoint.Any()) { this.UpdateLocationCache(); } return this.locationInfo.ReadEndpoints; } } /// /// Gets list of account level read endpoints. /// public ReadOnlyCollection AccountReadEndpoints => this.locationInfo.AccountReadEndpoints; /// /// Gets list of write endpoints ordered by /// 1. Preferred location /// 2. Endpoint availablity /// public ReadOnlyCollection WriteEndpoints { get { // Hot-path: avoid ConcurrentDictionary methods which acquire locks if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime && this.locationUnavailablityInfoByEndpoint.Any()) { this.UpdateLocationCache(); } return this.locationInfo.WriteEndpoints; } } public ReadOnlyCollection EffectivePreferredLocations => this.locationInfo.EffectivePreferredLocations; /// /// Returns the location corresponding to the endpoint if location specific endpoint is provided. /// For the defaultEndPoint, we will return the first available write location. /// Returns null, in other cases. /// /// /// Today we return null for defaultEndPoint if multiple write locations can be used. /// This needs to be modifed to figure out proper location in such case. /// public string GetLocation(Uri endpoint) { string location = this.locationInfo.AvailableWriteEndpointByLocation.FirstOrDefault(uri => uri.Value == endpoint).Key ?? this.locationInfo.AvailableReadEndpointByLocation.FirstOrDefault(uri => uri.Value == endpoint).Key; if (location == null && endpoint == this.defaultEndpoint && !this.CanUseMultipleWriteLocations()) { if (this.locationInfo.AvailableWriteEndpointByLocation.Any()) { return this.locationInfo.AvailableWriteEndpointByLocation.First().Key; } } return location; } /// /// Set region name for a location if present in the locationcache otherwise set region name as null. /// If endpoint's hostname is same as default endpoint hostname, set regionName as null. /// /// /// /// true if region found else false public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName) { if (Uri.Compare( endpoint, this.defaultEndpoint, UriComponents.Host, UriFormat.SafeUnescaped, StringComparison.OrdinalIgnoreCase) == 0) { regionName = null; return false; } regionName = this.GetLocation(endpoint); return true; } /// /// Marks the current location unavailable for read /// public void MarkEndpointUnavailableForRead(Uri endpoint) { this.MarkEndpointUnavailable(endpoint, OperationType.Read); } /// /// Marks the current location unavailable for write /// public void MarkEndpointUnavailableForWrite(Uri endpoint) { this.MarkEndpointUnavailable(endpoint, OperationType.Write); } /// /// Invoked when is read /// /// Read DatabaseAccoaunt public void OnDatabaseAccountRead(AccountProperties databaseAccount) { this.UpdateLocationCache( databaseAccount.WritableRegions, databaseAccount.ReadableRegions, preferenceList: null, enableMultipleWriteLocations: databaseAccount.EnableMultipleWriteLocations); } /// /// Invoked when changes /// /// public void OnLocationPreferenceChanged(ReadOnlyCollection preferredLocations) { this.UpdateLocationCache( preferenceList: preferredLocations); } public bool IsMetaData(DocumentServiceRequest request) { return (request.OperationType != Documents.OperationType.ExecuteJavaScript && request.ResourceType == ResourceType.StoredProcedure) || request.ResourceType != ResourceType.Document; } public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request) { return !request.IsReadOnlyRequest && this.locationInfo.AvailableWriteLocations.Count > 1 && this.IsMetaData(request) && this.CanUseMultipleWriteLocations(); } /// /// Gets the default endpoint of the account /// /// the default endpoint. public Uri GetDefaultEndpoint() { return this.defaultEndpoint; } /// /// Gets the mapping of available write region names to the respective endpoints /// public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation() { return this.locationInfo.AvailableWriteEndpointByLocation; } /// /// Gets the mapping of available read region names to the respective endpoints /// public ReadOnlyDictionary GetAvailableReadEndpointsByLocation() { return this.locationInfo.AvailableReadEndpointByLocation; } public Uri GetHubUri() { DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; string writeLocation = currentLocationInfo.AvailableWriteLocations[0]; Uri locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation]; return locationEndpointToRoute; } /// /// Gets account-level read locations. /// public ReadOnlyCollection GetAvailableReadLocations() { return this.locationInfo.AvailableReadLocations; } /// /// Gets account-level write locations. /// public ReadOnlyCollection GetAvailableWriteLocations() { return this.locationInfo.AvailableWriteLocations; } /// /// Resolves request to service endpoint. /// 1. If this is a write request /// (a) If UseMultipleWriteLocations = true /// (i) For document writes, resolve to most preferred and available write endpoint. /// Once the endpoint is marked unavailable, it is moved to the end of available write endpoint. Current request will /// be retried on next preferred available write endpoint. /// (ii) For all other resources, always resolve to first/second (regardless of preferred locations) /// write endpoint in . /// Endpoint of first write location in is the only endpoint that supports /// write operation on all resource types (except during that region's failover). /// Only during manual failover, client would retry write on second write location in . /// (b) Else resolve the request to first write endpoint in OR /// second write endpoint in in case of manual failover of that location. /// 2. Else resolve the request to most preferred available read endpoint (automatic failover for read requests) /// /// Request for which endpoint is to be resolved /// Resolved endpoint public Uri ResolveServiceEndpoint(DocumentServiceRequest request) { if (request.RequestContext != null && request.RequestContext.LocationEndpointToRoute != null) { return request.RequestContext.LocationEndpointToRoute; } int locationIndex = request.RequestContext.LocationIndexToRoute.GetValueOrDefault(0); Uri locationEndpointToRoute = this.defaultEndpoint; if (!request.RequestContext.UsePreferredLocations.GetValueOrDefault(true) // Should not use preferred location ? || (request.OperationType.IsWriteOperation() && !this.CanUseMultipleWriteLocations(request))) { // For non-document resource types in case of client can use multiple write locations // or when client cannot use multiple write locations, flip-flop between the // first and the second writable region in DatabaseAccount (for manual failover) DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; if (this.enableEndpointDiscovery && currentLocationInfo.AvailableWriteLocations.Count > 0) { locationIndex = Math.Min(locationIndex % 2, currentLocationInfo.AvailableWriteLocations.Count - 1); string writeLocation = currentLocationInfo.AvailableWriteLocations[locationIndex]; locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation]; } } else { ReadOnlyCollection endpoints = this.GetApplicableEndpoints(request, !request.OperationType.IsWriteOperation()); locationEndpointToRoute = endpoints[locationIndex % endpoints.Count]; } request.RequestContext.RouteToLocation(locationEndpointToRoute); return locationEndpointToRoute; } public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest) { if (request.RequestContext.ExcludeRegions == null || request.RequestContext.ExcludeRegions.Count == 0) { return isReadRequest ? this.ReadEndpoints : this.WriteEndpoints; } DatabaseAccountLocationsInfo databaseAccountLocationsInfoSnapshot = this.locationInfo; ReadOnlyCollection effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.PreferredLocations; if (effectivePreferredLocations == null || effectivePreferredLocations.Count == 0) { effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.EffectivePreferredLocations; } return this.GetApplicableEndpoints( isReadRequest ? this.locationInfo.AvailableReadEndpointByLocation : this.locationInfo.AvailableWriteEndpointByLocation, effectivePreferredLocations, this.defaultEndpoint, request.RequestContext.ExcludeRegions); } public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest) { bool isPreferredLocationsEmpty = this.locationInfo.PreferredLocations == null || this.locationInfo.PreferredLocations.Count == 0; DatabaseAccountLocationsInfo databaseAccountLocationsInfoSnapshot = this.locationInfo; ReadOnlyCollection effectivePreferredLocations = this.locationInfo.PreferredLocations; if (effectivePreferredLocations == null || effectivePreferredLocations.Count == 0) { effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.EffectivePreferredLocations; } Uri firstEffectivePreferredEndpoint = isReadRequest ? databaseAccountLocationsInfoSnapshot.ReadEndpoints[0] : databaseAccountLocationsInfoSnapshot.WriteEndpoints[0]; if (isReadRequest) { databaseAccountLocationsInfoSnapshot.AvailableReadLocationByEndpoint.TryGetValue(firstEffectivePreferredEndpoint, out string firstEffectivePreferredLocation); return this.GetApplicableRegions( databaseAccountLocationsInfoSnapshot.AvailableReadLocations, effectivePreferredLocations, isPreferredLocationsEmpty ? firstEffectivePreferredLocation : databaseAccountLocationsInfoSnapshot.PreferredLocations[0], excludeRegions); } else { databaseAccountLocationsInfoSnapshot.AvailableWriteLocationByEndpoint.TryGetValue(firstEffectivePreferredEndpoint, out string firstEffectivePreferredLocation); return this.GetApplicableRegions( databaseAccountLocationsInfoSnapshot.AvailableWriteLocations, effectivePreferredLocations, isPreferredLocationsEmpty ? firstEffectivePreferredLocation : databaseAccountLocationsInfoSnapshot.PreferredLocations[0], excludeRegions); } } /// /// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint /// /// /// /// /// /// a list of applicable endpoints for a request private ReadOnlyCollection GetApplicableEndpoints( ReadOnlyDictionary regionNameByEndpoint, ReadOnlyCollection effectivePreferredLocations, Uri fallbackEndpoint, IEnumerable excludeRegions) { List applicableEndpoints = new List(regionNameByEndpoint.Count); HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions); if (excludeRegions != null) { foreach (string region in effectivePreferredLocations) { if (!excludeRegionsHash.Contains(region) && regionNameByEndpoint.TryGetValue(region, out Uri endpoint)) { applicableEndpoints.Add(endpoint); } } } else { foreach (string region in effectivePreferredLocations) { if (regionNameByEndpoint.TryGetValue(region, out Uri endpoint)) { applicableEndpoints.Add(endpoint); } } } if (applicableEndpoints.Count == 0) { applicableEndpoints.Add(fallbackEndpoint); } return new ReadOnlyCollection(applicableEndpoints); } /// /// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint /// /// /// /// /// /// a list of applicable endpoints for a request private ReadOnlyCollection GetApplicableRegions( ReadOnlyCollection availableLocations, ReadOnlyCollection effectivePreferredLocations, string fallbackRegion, IEnumerable excludeRegions) { List applicableRegions = new List(availableLocations.Count); HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions); if (excludeRegions != null) { foreach (string region in effectivePreferredLocations) { if (availableLocations.Contains(region) && !excludeRegionsHash.Contains(region)) { applicableRegions.Add(region); } } } else { foreach (string region in effectivePreferredLocations) { if (availableLocations.Contains(region)) { applicableRegions.Add(region); } } } if (applicableRegions.Count == 0) { applicableRegions.Add(fallbackRegion); } return new ReadOnlyCollection(applicableRegions); } public bool ShouldRefreshEndpoints(out bool canRefreshInBackground) { canRefreshInBackground = true; DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; string mostPreferredLocation = currentLocationInfo.PreferredLocations.FirstOrDefault(); if (currentLocationInfo.PreferredLocations == null || currentLocationInfo.PreferredLocations.Count == 0) { mostPreferredLocation = currentLocationInfo.EffectivePreferredLocations.FirstOrDefault(); } // we should schedule refresh in background if we are unable to target the user's most preferredLocation. if (this.enableEndpointDiscovery) { // Refresh if client opts-in to useMultipleWriteLocations but server-side setting is disabled bool shouldRefresh = this.useMultipleWriteLocations && !this.enableMultipleWriteLocations; ReadOnlyCollection readLocationEndpoints = currentLocationInfo.ReadEndpoints; if (this.IsEndpointUnavailable(readLocationEndpoints[0], OperationType.Read)) { canRefreshInBackground = readLocationEndpoints.Count > 1; DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since the first read endpoint {0} is not available for read. canRefreshInBackground = {1}", readLocationEndpoints[0], canRefreshInBackground); return true; } if (!string.IsNullOrEmpty(mostPreferredLocation)) { Uri mostPreferredReadEndpoint; if (currentLocationInfo.AvailableReadEndpointByLocation.TryGetValue(mostPreferredLocation, out mostPreferredReadEndpoint)) { if (mostPreferredReadEndpoint != readLocationEndpoints[0]) { // For reads, we can always refresh in background as we can alternate to // other available read endpoints DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not available for read.", mostPreferredLocation); return true; } } else { DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not in available read locations.", mostPreferredLocation); return true; } } Uri mostPreferredWriteEndpoint; ReadOnlyCollection writeLocationEndpoints = currentLocationInfo.WriteEndpoints; if (!this.CanUseMultipleWriteLocations()) { if (this.IsEndpointUnavailable(writeLocationEndpoints[0], OperationType.Write)) { // Since most preferred write endpoint is unavailable, we can only refresh in background if // we have an alternate write endpoint canRefreshInBackground = writeLocationEndpoints.Count > 1; DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} endpoint {1} is not available for write. canRefreshInBackground = {2}", mostPreferredLocation, writeLocationEndpoints[0], canRefreshInBackground); return true; } else { return shouldRefresh; } } else if (!string.IsNullOrEmpty(mostPreferredLocation)) { if (currentLocationInfo.AvailableWriteEndpointByLocation.TryGetValue(mostPreferredLocation, out mostPreferredWriteEndpoint)) { shouldRefresh |= mostPreferredWriteEndpoint != writeLocationEndpoints[0]; DefaultTrace.TraceInformation("ShouldRefreshEndpoints = {0} since most preferred location {1} is not available for write.", shouldRefresh, mostPreferredLocation); return shouldRefresh; } else { DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not in available write locations", mostPreferredLocation); return true; } } else { return shouldRefresh; } } else { return false; } } public bool CanUseMultipleWriteLocations(DocumentServiceRequest request) { return this.CanUseMultipleWriteLocations() && (request.ResourceType == ResourceType.Document || (request.ResourceType == ResourceType.StoredProcedure && request.OperationType == Documents.OperationType.ExecuteJavaScript)); } private void ClearStaleEndpointUnavailabilityInfo() { if (this.locationUnavailablityInfoByEndpoint.Any()) { List unavailableEndpoints = this.locationUnavailablityInfoByEndpoint.Keys.ToList(); foreach (Uri unavailableEndpoint in unavailableEndpoints) { LocationUnavailabilityInfo unavailabilityInfo; LocationUnavailabilityInfo removed; if (this.locationUnavailablityInfoByEndpoint.TryGetValue(unavailableEndpoint, out unavailabilityInfo) && DateTime.UtcNow - unavailabilityInfo.LastUnavailabilityCheckTimeStamp > this.unavailableLocationsExpirationTime && this.locationUnavailablityInfoByEndpoint.TryRemove(unavailableEndpoint, out removed)) { DefaultTrace.TraceInformation( "Removed endpoint {0} unavailable for operations {1} from unavailableEndpoints", unavailableEndpoint, unavailabilityInfo.UnavailableOperations); } } } } private bool IsEndpointUnavailable(Uri endpoint, OperationType expectedAvailableOperations) { LocationUnavailabilityInfo unavailabilityInfo; if (expectedAvailableOperations == OperationType.None || !this.locationUnavailablityInfoByEndpoint.TryGetValue(endpoint, out unavailabilityInfo) || !unavailabilityInfo.UnavailableOperations.HasFlag(expectedAvailableOperations)) { return false; } else { if (DateTime.UtcNow - unavailabilityInfo.LastUnavailabilityCheckTimeStamp > this.unavailableLocationsExpirationTime) { return false; } else { DefaultTrace.TraceInformation( "Endpoint {0} unavailable for operations {1} present in unavailableEndpoints", endpoint, unavailabilityInfo.UnavailableOperations); // Unexpired entry present. Endpoint is unavailable return true; } } } private void MarkEndpointUnavailable( Uri unavailableEndpoint, OperationType unavailableOperationType) { DateTime currentTime = DateTime.UtcNow; LocationUnavailabilityInfo updatedInfo = this.locationUnavailablityInfoByEndpoint.AddOrUpdate( unavailableEndpoint, (Uri endpoint) => { return new LocationUnavailabilityInfo() { LastUnavailabilityCheckTimeStamp = currentTime, UnavailableOperations = unavailableOperationType, }; }, (Uri endpoint, LocationUnavailabilityInfo info) => { info.LastUnavailabilityCheckTimeStamp = currentTime; info.UnavailableOperations |= unavailableOperationType; return info; }); this.UpdateLocationCache(); DefaultTrace.TraceInformation( "Endpoint {0} unavailable for {1} added/updated to unavailableEndpoints with timestamp {2}", unavailableEndpoint, unavailableOperationType, updatedInfo.LastUnavailabilityCheckTimeStamp); } private void UpdateLocationCache( IEnumerable writeLocations = null, IEnumerable readLocations = null, ReadOnlyCollection preferenceList = null, bool? enableMultipleWriteLocations = null) { lock (this.lockObject) { DatabaseAccountLocationsInfo nextLocationInfo = new DatabaseAccountLocationsInfo(this.locationInfo); if (preferenceList != null) { nextLocationInfo.PreferredLocations = preferenceList; } if (enableMultipleWriteLocations.HasValue) { this.enableMultipleWriteLocations = enableMultipleWriteLocations.Value; } this.ClearStaleEndpointUnavailabilityInfo(); if (readLocations != null) { nextLocationInfo.AvailableReadEndpointByLocation = this.GetEndpointByLocation( readLocations, out ReadOnlyCollection availableReadLocations, out ReadOnlyDictionary availableReadLocationsByEndpoint); nextLocationInfo.AvailableReadLocations = availableReadLocations; nextLocationInfo.AccountReadEndpoints = nextLocationInfo.AvailableReadEndpointByLocation.Select(x => x.Value).ToList().AsReadOnly(); nextLocationInfo.AvailableReadLocationByEndpoint = availableReadLocationsByEndpoint; } if (writeLocations != null) { nextLocationInfo.AvailableWriteEndpointByLocation = this.GetEndpointByLocation( writeLocations, out ReadOnlyCollection availableWriteLocations, out ReadOnlyDictionary availableWriteLocationsByEndpoint); nextLocationInfo.AvailableWriteLocations = availableWriteLocations; nextLocationInfo.AvailableWriteLocationByEndpoint = availableWriteLocationsByEndpoint; } nextLocationInfo.WriteEndpoints = this.GetPreferredAvailableEndpoints( endpointsByLocation: nextLocationInfo.AvailableWriteEndpointByLocation, orderedLocations: nextLocationInfo.AvailableWriteLocations, expectedAvailableOperation: OperationType.Write, fallbackEndpoint: this.defaultEndpoint); nextLocationInfo.ReadEndpoints = this.GetPreferredAvailableEndpoints( endpointsByLocation: nextLocationInfo.AvailableReadEndpointByLocation, orderedLocations: nextLocationInfo.AvailableReadLocations, expectedAvailableOperation: OperationType.Read, fallbackEndpoint: nextLocationInfo.WriteEndpoints[0]); if (nextLocationInfo.PreferredLocations == null || nextLocationInfo.PreferredLocations.Count == 0) { if (!nextLocationInfo.AvailableReadLocationByEndpoint.TryGetValue(this.defaultEndpoint, out string regionForDefaultEndpoint)) { nextLocationInfo.EffectivePreferredLocations = nextLocationInfo.AvailableReadLocations; } else { List locations = new () { regionForDefaultEndpoint }; nextLocationInfo.EffectivePreferredLocations = new ReadOnlyCollection(locations); } } this.lastCacheUpdateTimestamp = DateTime.UtcNow; DefaultTrace.TraceInformation("Current WriteEndpoints = ({0}) ReadEndpoints = ({1})", string.Join(", ", nextLocationInfo.WriteEndpoints.Select(endpoint => endpoint.ToString())), string.Join(", ", nextLocationInfo.ReadEndpoints.Select(endpoint => endpoint.ToString()))); this.locationInfo = nextLocationInfo; } } private ReadOnlyCollection GetPreferredAvailableEndpoints(ReadOnlyDictionary endpointsByLocation, ReadOnlyCollection orderedLocations, OperationType expectedAvailableOperation, Uri fallbackEndpoint) { List endpoints = new List(); DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; // if enableEndpointDiscovery is false, we always use the defaultEndpoint that user passed in during documentClient init if (this.enableEndpointDiscovery) { if (this.CanUseMultipleWriteLocations() || expectedAvailableOperation.HasFlag(OperationType.Read)) { List unavailableEndpoints = new List(); // When client can not use multiple write locations, preferred locations list should only be used // determining read endpoints order. // If client can use multiple write locations, preferred locations list should be used for determining // both read and write endpoints order. if (currentLocationInfo.PreferredLocations != null && currentLocationInfo.PreferredLocations.Count >= 1) { foreach (string location in currentLocationInfo.PreferredLocations) { if (endpointsByLocation.TryGetValue(location, out Uri endpoint)) { if (this.IsEndpointUnavailable(endpoint, expectedAvailableOperation)) { unavailableEndpoints.Add(endpoint); } else { endpoints.Add(endpoint); } } } } else { foreach (string location in orderedLocations) { if (endpointsByLocation.TryGetValue(location, out Uri endpoint)) { if (this.defaultEndpoint.Equals(endpoint)) { endpoints = new List(); break; } if (this.IsEndpointUnavailable(endpoint, expectedAvailableOperation)) { unavailableEndpoints.Add(endpoint); } else { endpoints.Add(endpoint); } } } } if (endpoints.Count == 0) { endpoints.Add(fallbackEndpoint); unavailableEndpoints.Remove(fallbackEndpoint); } endpoints.AddRange(unavailableEndpoints); } else { foreach (string location in orderedLocations) { if (!string.IsNullOrEmpty(location) && // location is empty during manual failover endpointsByLocation.TryGetValue(location, out Uri endpoint)) { endpoints.Add(endpoint); } } } } if (endpoints.Count == 0) { endpoints.Add(fallbackEndpoint); } return endpoints.AsReadOnly(); } private ReadOnlyDictionary GetEndpointByLocation(IEnumerable locations, out ReadOnlyCollection orderedLocations, out ReadOnlyDictionary availableLocationsByEndpoint) { Dictionary endpointsByLocation = new Dictionary(StringComparer.OrdinalIgnoreCase); Dictionary mutableAvailableLocationsByEndpoint = new Dictionary(); List parsedLocations = new List(); foreach (AccountRegion location in locations) { Uri endpoint; if (!string.IsNullOrEmpty(location.Name) && Uri.TryCreate(location.Endpoint, UriKind.Absolute, out endpoint)) { endpointsByLocation[location.Name] = endpoint; parsedLocations.Add(location.Name); mutableAvailableLocationsByEndpoint[endpoint] = location.Name; this.SetServicePointConnectionLimit(endpoint); } else { DefaultTrace.TraceInformation("GetAvailableEndpointsByLocation() - skipping add for location = {0} as it is location name is either empty or endpoint is malformed {1}", location.Name, location.Endpoint); } } orderedLocations = parsedLocations.AsReadOnly(); availableLocationsByEndpoint = new ReadOnlyDictionary(mutableAvailableLocationsByEndpoint); return new ReadOnlyDictionary(endpointsByLocation); } private bool CanUseMultipleWriteLocations() { return this.useMultipleWriteLocations && this.enableMultipleWriteLocations; } private void SetServicePointConnectionLimit(Uri endpoint) { #if !NETSTANDARD16 ServicePointAccessor servicePoint = ServicePointAccessor.FindServicePoint(endpoint); servicePoint.ConnectionLimit = this.connectionLimit; #endif } private sealed class LocationUnavailabilityInfo { public DateTime LastUnavailabilityCheckTimeStamp { get; set; } public OperationType UnavailableOperations { get; set; } } private sealed class DatabaseAccountLocationsInfo { public DatabaseAccountLocationsInfo(ReadOnlyCollection preferredLocations, Uri defaultEndpoint) { this.PreferredLocations = preferredLocations; this.AvailableWriteLocations = new List().AsReadOnly(); this.AvailableReadLocations = new List().AsReadOnly(); this.AvailableWriteEndpointByLocation = new ReadOnlyDictionary(new Dictionary(StringComparer.OrdinalIgnoreCase)); this.AvailableReadEndpointByLocation = new ReadOnlyDictionary(new Dictionary(StringComparer.OrdinalIgnoreCase)); this.AvailableWriteLocationByEndpoint = new ReadOnlyDictionary(new Dictionary()); this.AvailableReadLocationByEndpoint = new ReadOnlyDictionary(new Dictionary()); this.WriteEndpoints = new List() { defaultEndpoint }.AsReadOnly(); this.AccountReadEndpoints = new List() { defaultEndpoint }.AsReadOnly(); this.ReadEndpoints = new List() { defaultEndpoint }.AsReadOnly(); this.EffectivePreferredLocations = new List().AsReadOnly(); } public DatabaseAccountLocationsInfo(DatabaseAccountLocationsInfo other) { this.PreferredLocations = other.PreferredLocations; this.AvailableWriteLocations = other.AvailableWriteLocations; this.AvailableReadLocations = other.AvailableReadLocations; this.AvailableWriteEndpointByLocation = other.AvailableWriteEndpointByLocation; this.AvailableReadEndpointByLocation = other.AvailableReadEndpointByLocation; this.AvailableReadLocationByEndpoint = other.AvailableReadLocationByEndpoint; this.AvailableWriteLocationByEndpoint = other.AvailableWriteLocationByEndpoint; this.WriteEndpoints = other.WriteEndpoints; this.AccountReadEndpoints = other.AccountReadEndpoints; this.ReadEndpoints = other.ReadEndpoints; this.EffectivePreferredLocations = other.EffectivePreferredLocations; } public ReadOnlyCollection PreferredLocations { get; set; } public ReadOnlyCollection AvailableWriteLocations { get; set; } public ReadOnlyCollection AvailableReadLocations { get; set; } public ReadOnlyDictionary AvailableWriteEndpointByLocation { get; set; } public ReadOnlyDictionary AvailableReadEndpointByLocation { get; set; } public ReadOnlyDictionary AvailableWriteLocationByEndpoint { get; set; } public ReadOnlyDictionary AvailableReadLocationByEndpoint { get; set; } public ReadOnlyCollection WriteEndpoints { get; set; } public ReadOnlyCollection ReadEndpoints { get; set; } public ReadOnlyCollection AccountReadEndpoints { get; set; } public ReadOnlyCollection EffectivePreferredLocations { get; set; } } [Flags] private enum OperationType { None = 0x0, Read = 0x1, Write = 0x2 } } } \ No newline at end of file +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Routing +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Linq; + using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Documents; + + /// + /// Implements the abstraction to resolve target location for geo-replicated DatabaseAccount + /// with multiple writable and readable locations. + /// + internal sealed class LocationCache + { + private const string UnavailableLocationsExpirationTimeInSeconds = "UnavailableLocationsExpirationTimeInSeconds"; + private static int DefaultUnavailableLocationsExpirationTimeInSeconds = 5 * 60; + + private readonly bool enableEndpointDiscovery; + private readonly Uri defaultEndpoint; + private readonly bool useMultipleWriteLocations; + private readonly object lockObject; + private readonly TimeSpan unavailableLocationsExpirationTime; + private readonly int connectionLimit; + private readonly ConcurrentDictionary locationUnavailablityInfoByEndpoint; + private readonly RegionNameMapper regionNameMapper; + + private DatabaseAccountLocationsInfo locationInfo; + private DateTime lastCacheUpdateTimestamp; + private bool enableMultipleWriteLocations; + + public LocationCache( + ReadOnlyCollection preferredLocations, + Uri defaultEndpoint, + bool enableEndpointDiscovery, + int connectionLimit, + bool useMultipleWriteLocations) + { + this.locationInfo = new DatabaseAccountLocationsInfo(preferredLocations, defaultEndpoint); + this.defaultEndpoint = defaultEndpoint; + this.enableEndpointDiscovery = enableEndpointDiscovery; + this.useMultipleWriteLocations = useMultipleWriteLocations; + this.connectionLimit = connectionLimit; + + this.lockObject = new object(); + this.locationUnavailablityInfoByEndpoint = new ConcurrentDictionary(); + this.lastCacheUpdateTimestamp = DateTime.MinValue; + this.enableMultipleWriteLocations = false; + this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(LocationCache.DefaultUnavailableLocationsExpirationTimeInSeconds); + this.regionNameMapper = new RegionNameMapper(); + +#if !(NETSTANDARD15 || NETSTANDARD16) +#if NETSTANDARD20 + // GetEntryAssembly returns null when loaded from native netstandard2.0 + if (System.Reflection.Assembly.GetEntryAssembly() != null) + { +#endif + string unavailableLocationsExpirationTimeInSecondsConfig = System.Configuration.ConfigurationManager.AppSettings[LocationCache.UnavailableLocationsExpirationTimeInSeconds]; + if (!string.IsNullOrEmpty(unavailableLocationsExpirationTimeInSecondsConfig)) + { + int unavailableLocationsExpirationTimeinSecondsConfigValue; + + if (!int.TryParse(unavailableLocationsExpirationTimeInSecondsConfig, out unavailableLocationsExpirationTimeinSecondsConfigValue)) + { + this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(LocationCache.DefaultUnavailableLocationsExpirationTimeInSeconds); + } + else + { + this.unavailableLocationsExpirationTime = TimeSpan.FromSeconds(unavailableLocationsExpirationTimeinSecondsConfigValue); + } + } +#if NETSTANDARD20 + } +#endif +#endif + } + + /// + /// Gets list of read endpoints ordered by + /// 1. Preferred location + /// 2. Endpoint availablity + /// + public ReadOnlyCollection ReadEndpoints + { + get + { + // Hot-path: avoid ConcurrentDictionary methods which acquire locks + if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime + && this.locationUnavailablityInfoByEndpoint.Any()) + { + this.UpdateLocationCache(); + } + + return this.locationInfo.ReadEndpoints; + } + } + + /// + /// Gets list of account level read endpoints. + /// + public ReadOnlyCollection AccountReadEndpoints => this.locationInfo.AccountReadEndpoints; + + /// + /// Gets list of write endpoints ordered by + /// 1. Preferred location + /// 2. Endpoint availablity + /// + public ReadOnlyCollection WriteEndpoints + { + get + { + // Hot-path: avoid ConcurrentDictionary methods which acquire locks + if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime + && this.locationUnavailablityInfoByEndpoint.Any()) + { + this.UpdateLocationCache(); + } + + return this.locationInfo.WriteEndpoints; + } + } + + public ReadOnlyCollection EffectivePreferredLocations => this.locationInfo.EffectivePreferredLocations; + + /// + /// Returns the location corresponding to the endpoint if location specific endpoint is provided. + /// For the defaultEndPoint, we will return the first available write location. + /// Returns null, in other cases. + /// + /// + /// Today we return null for defaultEndPoint if multiple write locations can be used. + /// This needs to be modifed to figure out proper location in such case. + /// + public string GetLocation(Uri endpoint) + { + string location = this.locationInfo.AvailableWriteEndpointByLocation.FirstOrDefault(uri => uri.Value == endpoint).Key ?? this.locationInfo.AvailableReadEndpointByLocation.FirstOrDefault(uri => uri.Value == endpoint).Key; + + if (location == null && endpoint == this.defaultEndpoint && !this.CanUseMultipleWriteLocations()) + { + if (this.locationInfo.AvailableWriteEndpointByLocation.Any()) + { + return this.locationInfo.AvailableWriteEndpointByLocation.First().Key; + } + } + + return location; + } + + /// + /// Set region name for a location if present in the locationcache otherwise set region name as null. + /// If endpoint's hostname is same as default endpoint hostname, set regionName as null. + /// + /// + /// + /// true if region found else false + public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName) + { + if (Uri.Compare( + endpoint, + this.defaultEndpoint, + UriComponents.Host, + UriFormat.SafeUnescaped, + StringComparison.OrdinalIgnoreCase) == 0) + { + regionName = null; + return false; + } + + regionName = this.GetLocation(endpoint); + return true; + } + + /// + /// Marks the current location unavailable for read + /// + public void MarkEndpointUnavailableForRead(Uri endpoint) + { + this.MarkEndpointUnavailable(endpoint, OperationType.Read); + } + + /// + /// Marks the current location unavailable for write + /// + public void MarkEndpointUnavailableForWrite(Uri endpoint) + { + this.MarkEndpointUnavailable(endpoint, OperationType.Write); + } + + /// + /// Invoked when is read + /// + /// Read DatabaseAccoaunt + public void OnDatabaseAccountRead(AccountProperties databaseAccount) + { + this.UpdateLocationCache( + databaseAccount.WritableRegions, + databaseAccount.ReadableRegions, + preferenceList: null, + enableMultipleWriteLocations: databaseAccount.EnableMultipleWriteLocations); + } + + /// + /// Invoked when changes + /// + /// + public void OnLocationPreferenceChanged(ReadOnlyCollection preferredLocations) + { + this.UpdateLocationCache( + preferenceList: preferredLocations); + } + + public bool IsMetaData(DocumentServiceRequest request) + { + return (request.OperationType != Documents.OperationType.ExecuteJavaScript && request.ResourceType == ResourceType.StoredProcedure) || + request.ResourceType != ResourceType.Document; + + } + public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request) + { + return !request.IsReadOnlyRequest && this.locationInfo.AvailableWriteLocations.Count > 1 + && this.IsMetaData(request) + && this.CanUseMultipleWriteLocations(); + + } + + /// + /// Gets the default endpoint of the account + /// + /// the default endpoint. + public Uri GetDefaultEndpoint() + { + return this.defaultEndpoint; + } + + /// + /// Gets the mapping of available write region names to the respective endpoints + /// + public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation() + { + return this.locationInfo.AvailableWriteEndpointByLocation; + } + + /// + /// Gets the mapping of available read region names to the respective endpoints + /// + public ReadOnlyDictionary GetAvailableReadEndpointsByLocation() + { + return this.locationInfo.AvailableReadEndpointByLocation; + } + + public Uri GetHubUri() + { + DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; + string writeLocation = currentLocationInfo.AvailableWriteLocations[0]; + Uri locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation]; + return locationEndpointToRoute; + } + + /// + /// Gets account-level read locations. + /// + public ReadOnlyCollection GetAvailableReadLocations() + { + return this.locationInfo.AvailableReadLocations; + } + + /// + /// Gets account-level write locations. + /// + public ReadOnlyCollection GetAvailableWriteLocations() + { + return this.locationInfo.AvailableWriteLocations; + } + + /// + /// Resolves request to service endpoint. + /// 1. If this is a write request + /// (a) If UseMultipleWriteLocations = true + /// (i) For document writes, resolve to most preferred and available write endpoint. + /// Once the endpoint is marked unavailable, it is moved to the end of available write endpoint. Current request will + /// be retried on next preferred available write endpoint. + /// (ii) For all other resources, always resolve to first/second (regardless of preferred locations) + /// write endpoint in . + /// Endpoint of first write location in is the only endpoint that supports + /// write operation on all resource types (except during that region's failover). + /// Only during manual failover, client would retry write on second write location in . + /// (b) Else resolve the request to first write endpoint in OR + /// second write endpoint in in case of manual failover of that location. + /// 2. Else resolve the request to most preferred available read endpoint (automatic failover for read requests) + /// + /// Request for which endpoint is to be resolved + /// Resolved endpoint + public Uri ResolveServiceEndpoint(DocumentServiceRequest request) + { + if (request.RequestContext != null && request.RequestContext.LocationEndpointToRoute != null) + { + return request.RequestContext.LocationEndpointToRoute; + } + + int locationIndex = request.RequestContext.LocationIndexToRoute.GetValueOrDefault(0); + + Uri locationEndpointToRoute = this.defaultEndpoint; + + if (!request.RequestContext.UsePreferredLocations.GetValueOrDefault(true) // Should not use preferred location ? + || (request.OperationType.IsWriteOperation() && !this.CanUseMultipleWriteLocations(request))) + { + // For non-document resource types in case of client can use multiple write locations + // or when client cannot use multiple write locations, flip-flop between the + // first and the second writable region in DatabaseAccount (for manual failover) + DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; + + if (this.enableEndpointDiscovery && currentLocationInfo.AvailableWriteLocations.Count > 0) + { + locationIndex = Math.Min(locationIndex % 2, currentLocationInfo.AvailableWriteLocations.Count - 1); + string writeLocation = currentLocationInfo.AvailableWriteLocations[locationIndex]; + locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation]; + } + } + else + { + ReadOnlyCollection endpoints = this.GetApplicableEndpoints(request, !request.OperationType.IsWriteOperation()); + locationEndpointToRoute = endpoints[locationIndex % endpoints.Count]; + } + + request.RequestContext.RouteToLocation(locationEndpointToRoute); + return locationEndpointToRoute; + } + + public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest) + { + if (request.RequestContext.ExcludeRegions == null || request.RequestContext.ExcludeRegions.Count == 0) + { + return isReadRequest ? this.ReadEndpoints : this.WriteEndpoints; + } + + DatabaseAccountLocationsInfo databaseAccountLocationsInfoSnapshot = this.locationInfo; + + ReadOnlyCollection effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.PreferredLocations; + + if (effectivePreferredLocations == null || effectivePreferredLocations.Count == 0) + { + effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.EffectivePreferredLocations; + } + + return this.GetApplicableEndpoints( + isReadRequest ? this.locationInfo.AvailableReadEndpointByLocation : this.locationInfo.AvailableWriteEndpointByLocation, + effectivePreferredLocations, + this.defaultEndpoint, + request.RequestContext.ExcludeRegions); + } + + public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest) + { + bool isPreferredLocationsEmpty = this.locationInfo.PreferredLocations == null || this.locationInfo.PreferredLocations.Count == 0; + + DatabaseAccountLocationsInfo databaseAccountLocationsInfoSnapshot = this.locationInfo; + + ReadOnlyCollection effectivePreferredLocations = this.locationInfo.PreferredLocations; + + if (effectivePreferredLocations == null || effectivePreferredLocations.Count == 0) + { + effectivePreferredLocations = databaseAccountLocationsInfoSnapshot.EffectivePreferredLocations; + } + + Uri firstEffectivePreferredEndpoint = isReadRequest ? databaseAccountLocationsInfoSnapshot.ReadEndpoints[0] : databaseAccountLocationsInfoSnapshot.WriteEndpoints[0]; + + if (isReadRequest) + { + databaseAccountLocationsInfoSnapshot.AvailableReadLocationByEndpoint.TryGetValue(firstEffectivePreferredEndpoint, out string firstEffectivePreferredLocation); + + return this.GetApplicableRegions( + databaseAccountLocationsInfoSnapshot.AvailableReadLocations, + effectivePreferredLocations, + isPreferredLocationsEmpty ? firstEffectivePreferredLocation : databaseAccountLocationsInfoSnapshot.PreferredLocations[0], + excludeRegions); + } + else + { + databaseAccountLocationsInfoSnapshot.AvailableWriteLocationByEndpoint.TryGetValue(firstEffectivePreferredEndpoint, out string firstEffectivePreferredLocation); + + return this.GetApplicableRegions( + databaseAccountLocationsInfoSnapshot.AvailableWriteLocations, + effectivePreferredLocations, + isPreferredLocationsEmpty ? firstEffectivePreferredLocation : databaseAccountLocationsInfoSnapshot.PreferredLocations[0], + excludeRegions); + } + } + + /// + /// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint + /// + /// + /// + /// + /// + /// a list of applicable endpoints for a request + private ReadOnlyCollection GetApplicableEndpoints( + ReadOnlyDictionary regionNameByEndpoint, + ReadOnlyCollection effectivePreferredLocations, + Uri fallbackEndpoint, + IEnumerable excludeRegions) + { + List applicableEndpoints = new List(regionNameByEndpoint.Count); + HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions); + + if (excludeRegions != null) + { + foreach (string region in effectivePreferredLocations) + { + if (!excludeRegionsHash.Contains(region) + && regionNameByEndpoint.TryGetValue(region, out Uri endpoint)) + { + applicableEndpoints.Add(endpoint); + } + } + } + else + { + foreach (string region in effectivePreferredLocations) + { + if (regionNameByEndpoint.TryGetValue(region, out Uri endpoint)) + { + applicableEndpoints.Add(endpoint); + } + } + } + + if (applicableEndpoints.Count == 0) + { + applicableEndpoints.Add(fallbackEndpoint); + } + + return new ReadOnlyCollection(applicableEndpoints); + } + + /// + /// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint + /// + /// + /// + /// + /// + /// a list of applicable endpoints for a request + private ReadOnlyCollection GetApplicableRegions( + ReadOnlyCollection availableLocations, + ReadOnlyCollection effectivePreferredLocations, + string fallbackRegion, + IEnumerable excludeRegions) + { + List applicableRegions = new List(availableLocations.Count); + HashSet excludeRegionsHash = excludeRegions == null ? null : new HashSet(excludeRegions); + + if (excludeRegions != null) + { + foreach (string region in effectivePreferredLocations) + { + if (availableLocations.Contains(region) + && !excludeRegionsHash.Contains(region)) + { + applicableRegions.Add(region); + } + } + } + else + { + foreach (string region in effectivePreferredLocations) + { + if (availableLocations.Contains(region)) + { + applicableRegions.Add(region); + } + } + } + + if (applicableRegions.Count == 0) + { + applicableRegions.Add(fallbackRegion); + } + + return new ReadOnlyCollection(applicableRegions); + } + + public bool ShouldRefreshEndpoints(out bool canRefreshInBackground) + { + canRefreshInBackground = true; + DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; + + string mostPreferredLocation = currentLocationInfo.PreferredLocations.FirstOrDefault(); + + if (currentLocationInfo.PreferredLocations == null || currentLocationInfo.PreferredLocations.Count == 0) + { + mostPreferredLocation = currentLocationInfo.EffectivePreferredLocations.FirstOrDefault(); + } + + // we should schedule refresh in background if we are unable to target the user's most preferredLocation. + if (this.enableEndpointDiscovery) + { + // Refresh if client opts-in to useMultipleWriteLocations but server-side setting is disabled + bool shouldRefresh = this.useMultipleWriteLocations && !this.enableMultipleWriteLocations; + + ReadOnlyCollection readLocationEndpoints = currentLocationInfo.ReadEndpoints; + + if (this.IsEndpointUnavailable(readLocationEndpoints[0], OperationType.Read)) + { + canRefreshInBackground = readLocationEndpoints.Count > 1; + DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since the first read endpoint {0} is not available for read. canRefreshInBackground = {1}", + readLocationEndpoints[0], + canRefreshInBackground); + + return true; + } + + if (!string.IsNullOrEmpty(mostPreferredLocation)) + { + Uri mostPreferredReadEndpoint; + + if (currentLocationInfo.AvailableReadEndpointByLocation.TryGetValue(mostPreferredLocation, out mostPreferredReadEndpoint)) + { + if (mostPreferredReadEndpoint != readLocationEndpoints[0]) + { + // For reads, we can always refresh in background as we can alternate to + // other available read endpoints + DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not available for read.", mostPreferredLocation); + return true; + } + } + else + { + DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not in available read locations.", mostPreferredLocation); + return true; + } + } + + Uri mostPreferredWriteEndpoint; + ReadOnlyCollection writeLocationEndpoints = currentLocationInfo.WriteEndpoints; + + if (!this.CanUseMultipleWriteLocations()) + { + if (this.IsEndpointUnavailable(writeLocationEndpoints[0], OperationType.Write)) + { + // Since most preferred write endpoint is unavailable, we can only refresh in background if + // we have an alternate write endpoint + canRefreshInBackground = writeLocationEndpoints.Count > 1; + DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} endpoint {1} is not available for write. canRefreshInBackground = {2}", + mostPreferredLocation, + writeLocationEndpoints[0], + canRefreshInBackground); + + return true; + } + else + { + return shouldRefresh; + } + } + else if (!string.IsNullOrEmpty(mostPreferredLocation)) + { + if (currentLocationInfo.AvailableWriteEndpointByLocation.TryGetValue(mostPreferredLocation, out mostPreferredWriteEndpoint)) + { + shouldRefresh |= mostPreferredWriteEndpoint != writeLocationEndpoints[0]; + DefaultTrace.TraceInformation("ShouldRefreshEndpoints = {0} since most preferred location {1} is not available for write.", shouldRefresh, mostPreferredLocation); + return shouldRefresh; + } + else + { + DefaultTrace.TraceInformation("ShouldRefreshEndpoints = true since most preferred location {0} is not in available write locations", mostPreferredLocation); + return true; + } + } + else + { + return shouldRefresh; + } + } + else + { + return false; + } + } + + public bool CanUseMultipleWriteLocations(DocumentServiceRequest request) + { + return this.CanUseMultipleWriteLocations() && + (request.ResourceType == ResourceType.Document || + (request.ResourceType == ResourceType.StoredProcedure && request.OperationType == Documents.OperationType.ExecuteJavaScript)); + } + + private void ClearStaleEndpointUnavailabilityInfo() + { + if (this.locationUnavailablityInfoByEndpoint.Any()) + { + List unavailableEndpoints = this.locationUnavailablityInfoByEndpoint.Keys.ToList(); + + foreach (Uri unavailableEndpoint in unavailableEndpoints) + { + LocationUnavailabilityInfo unavailabilityInfo; + LocationUnavailabilityInfo removed; + + if (this.locationUnavailablityInfoByEndpoint.TryGetValue(unavailableEndpoint, out unavailabilityInfo) + && DateTime.UtcNow - unavailabilityInfo.LastUnavailabilityCheckTimeStamp > this.unavailableLocationsExpirationTime + && this.locationUnavailablityInfoByEndpoint.TryRemove(unavailableEndpoint, out removed)) + { + DefaultTrace.TraceInformation( + "Removed endpoint {0} unavailable for operations {1} from unavailableEndpoints", + unavailableEndpoint, + unavailabilityInfo.UnavailableOperations); + } + } + } + } + + private bool IsEndpointUnavailable(Uri endpoint, OperationType expectedAvailableOperations) + { + LocationUnavailabilityInfo unavailabilityInfo; + + if (expectedAvailableOperations == OperationType.None + || !this.locationUnavailablityInfoByEndpoint.TryGetValue(endpoint, out unavailabilityInfo) + || !unavailabilityInfo.UnavailableOperations.HasFlag(expectedAvailableOperations)) + { + return false; + } + else + { + if (DateTime.UtcNow - unavailabilityInfo.LastUnavailabilityCheckTimeStamp > this.unavailableLocationsExpirationTime) + { + return false; + } + else + { + DefaultTrace.TraceInformation( + "Endpoint {0} unavailable for operations {1} present in unavailableEndpoints", + endpoint, + unavailabilityInfo.UnavailableOperations); + // Unexpired entry present. Endpoint is unavailable + return true; + } + } + } + + private void MarkEndpointUnavailable( + Uri unavailableEndpoint, + OperationType unavailableOperationType) + { + DateTime currentTime = DateTime.UtcNow; + LocationUnavailabilityInfo updatedInfo = this.locationUnavailablityInfoByEndpoint.AddOrUpdate( + unavailableEndpoint, + (Uri endpoint) => + { + return new LocationUnavailabilityInfo() + { + LastUnavailabilityCheckTimeStamp = currentTime, + UnavailableOperations = unavailableOperationType, + }; + }, + (Uri endpoint, LocationUnavailabilityInfo info) => + { + info.LastUnavailabilityCheckTimeStamp = currentTime; + info.UnavailableOperations |= unavailableOperationType; + return info; + }); + + this.UpdateLocationCache(); + + DefaultTrace.TraceInformation( + "Endpoint {0} unavailable for {1} added/updated to unavailableEndpoints with timestamp {2}", + unavailableEndpoint, + unavailableOperationType, + updatedInfo.LastUnavailabilityCheckTimeStamp); + } + + private void UpdateLocationCache( + IEnumerable writeLocations = null, + IEnumerable readLocations = null, + ReadOnlyCollection preferenceList = null, + bool? enableMultipleWriteLocations = null) + { + lock (this.lockObject) + { + DatabaseAccountLocationsInfo nextLocationInfo = new DatabaseAccountLocationsInfo(this.locationInfo); + + if (preferenceList != null) + { + nextLocationInfo.PreferredLocations = preferenceList; + } + + if (enableMultipleWriteLocations.HasValue) + { + this.enableMultipleWriteLocations = enableMultipleWriteLocations.Value; + } + + this.ClearStaleEndpointUnavailabilityInfo(); + + if (readLocations != null) + { + nextLocationInfo.AvailableReadEndpointByLocation = this.GetEndpointByLocation( + readLocations, + out ReadOnlyCollection availableReadLocations, + out ReadOnlyDictionary availableReadLocationsByEndpoint); + + nextLocationInfo.AvailableReadLocations = availableReadLocations; + nextLocationInfo.AccountReadEndpoints = nextLocationInfo.AvailableReadEndpointByLocation.Select(x => x.Value).ToList().AsReadOnly(); + nextLocationInfo.AvailableReadLocationByEndpoint = availableReadLocationsByEndpoint; + } + + if (writeLocations != null) + { + nextLocationInfo.AvailableWriteEndpointByLocation = this.GetEndpointByLocation( + writeLocations, + out ReadOnlyCollection availableWriteLocations, + out ReadOnlyDictionary availableWriteLocationsByEndpoint); + + nextLocationInfo.AvailableWriteLocations = availableWriteLocations; + nextLocationInfo.AvailableWriteLocationByEndpoint = availableWriteLocationsByEndpoint; + } + + nextLocationInfo.WriteEndpoints = this.GetPreferredAvailableEndpoints( + endpointsByLocation: nextLocationInfo.AvailableWriteEndpointByLocation, + orderedLocations: nextLocationInfo.AvailableWriteLocations, + expectedAvailableOperation: OperationType.Write, + fallbackEndpoint: this.defaultEndpoint); + + nextLocationInfo.ReadEndpoints = this.GetPreferredAvailableEndpoints( + endpointsByLocation: nextLocationInfo.AvailableReadEndpointByLocation, + orderedLocations: nextLocationInfo.AvailableReadLocations, + expectedAvailableOperation: OperationType.Read, + fallbackEndpoint: nextLocationInfo.WriteEndpoints[0]); + + if (nextLocationInfo.PreferredLocations == null || nextLocationInfo.PreferredLocations.Count == 0) + { + if (!nextLocationInfo.AvailableReadLocationByEndpoint.TryGetValue(this.defaultEndpoint, out string regionForDefaultEndpoint)) + { + nextLocationInfo.EffectivePreferredLocations = nextLocationInfo.AvailableReadLocations; + } + else + { + List locations = new () + { + regionForDefaultEndpoint + }; + + nextLocationInfo.EffectivePreferredLocations = new ReadOnlyCollection(locations); + } + } + + this.lastCacheUpdateTimestamp = DateTime.UtcNow; + + DefaultTrace.TraceInformation("Current WriteEndpoints = ({0}) ReadEndpoints = ({1})", + string.Join(", ", nextLocationInfo.WriteEndpoints.Select(endpoint => endpoint.ToString())), + string.Join(", ", nextLocationInfo.ReadEndpoints.Select(endpoint => endpoint.ToString()))); + + this.locationInfo = nextLocationInfo; + } + } + + private ReadOnlyCollection GetPreferredAvailableEndpoints(ReadOnlyDictionary endpointsByLocation, ReadOnlyCollection orderedLocations, OperationType expectedAvailableOperation, Uri fallbackEndpoint) + { + List endpoints = new List(); + DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; + + // if enableEndpointDiscovery is false, we always use the defaultEndpoint that user passed in during documentClient init + if (this.enableEndpointDiscovery) + { + if (this.CanUseMultipleWriteLocations() || expectedAvailableOperation.HasFlag(OperationType.Read)) + { + List unavailableEndpoints = new List(); + + // When client can not use multiple write locations, preferred locations list should only be used + // determining read endpoints order. + // If client can use multiple write locations, preferred locations list should be used for determining + // both read and write endpoints order. + + if (currentLocationInfo.PreferredLocations != null && currentLocationInfo.PreferredLocations.Count >= 1) + { + foreach (string location in currentLocationInfo.PreferredLocations) + { + if (endpointsByLocation.TryGetValue(location, out Uri endpoint)) + { + if (this.IsEndpointUnavailable(endpoint, expectedAvailableOperation)) + { + unavailableEndpoints.Add(endpoint); + } + else + { + endpoints.Add(endpoint); + } + } + } + } + else + { + foreach (string location in orderedLocations) + { + if (endpointsByLocation.TryGetValue(location, out Uri endpoint)) + { + if (this.defaultEndpoint.Equals(endpoint)) + { + endpoints = new List(); + break; + } + + if (this.IsEndpointUnavailable(endpoint, expectedAvailableOperation)) + { + unavailableEndpoints.Add(endpoint); + } + else + { + endpoints.Add(endpoint); + } + } + } + } + + if (endpoints.Count == 0) + { + endpoints.Add(fallbackEndpoint); + unavailableEndpoints.Remove(fallbackEndpoint); + } + + endpoints.AddRange(unavailableEndpoints); + } + else + { + foreach (string location in orderedLocations) + { + if (!string.IsNullOrEmpty(location) && // location is empty during manual failover + endpointsByLocation.TryGetValue(location, out Uri endpoint)) + { + endpoints.Add(endpoint); + } + } + } + } + + if (endpoints.Count == 0) + { + endpoints.Add(fallbackEndpoint); + } + + return endpoints.AsReadOnly(); + } + + private ReadOnlyDictionary GetEndpointByLocation(IEnumerable locations, out ReadOnlyCollection orderedLocations, out ReadOnlyDictionary availableLocationsByEndpoint) + { + Dictionary endpointsByLocation = new Dictionary(StringComparer.OrdinalIgnoreCase); + Dictionary mutableAvailableLocationsByEndpoint = new Dictionary(); + + List parsedLocations = new List(); + + foreach (AccountRegion location in locations) + { + Uri endpoint; + if (!string.IsNullOrEmpty(location.Name) + && Uri.TryCreate(location.Endpoint, UriKind.Absolute, out endpoint)) + { + endpointsByLocation[location.Name] = endpoint; + parsedLocations.Add(location.Name); + + mutableAvailableLocationsByEndpoint[endpoint] = location.Name; + + this.SetServicePointConnectionLimit(endpoint); + } + else + { + DefaultTrace.TraceInformation("GetAvailableEndpointsByLocation() - skipping add for location = {0} as it is location name is either empty or endpoint is malformed {1}", + location.Name, + location.Endpoint); + } + } + + orderedLocations = parsedLocations.AsReadOnly(); + availableLocationsByEndpoint = new ReadOnlyDictionary(mutableAvailableLocationsByEndpoint); + + return new ReadOnlyDictionary(endpointsByLocation); + } + + private bool CanUseMultipleWriteLocations() + { + return this.useMultipleWriteLocations && this.enableMultipleWriteLocations; + } + + private void SetServicePointConnectionLimit(Uri endpoint) + { +#if !NETSTANDARD16 + ServicePointAccessor servicePoint = ServicePointAccessor.FindServicePoint(endpoint); + servicePoint.ConnectionLimit = this.connectionLimit; +#endif + } + + private sealed class LocationUnavailabilityInfo + { + public DateTime LastUnavailabilityCheckTimeStamp { get; set; } + public OperationType UnavailableOperations { get; set; } + } + + private sealed class DatabaseAccountLocationsInfo + { + public DatabaseAccountLocationsInfo(ReadOnlyCollection preferredLocations, Uri defaultEndpoint) + { + this.PreferredLocations = preferredLocations; + this.AvailableWriteLocations = new List().AsReadOnly(); + this.AvailableReadLocations = new List().AsReadOnly(); + this.AvailableWriteEndpointByLocation = new ReadOnlyDictionary(new Dictionary(StringComparer.OrdinalIgnoreCase)); + this.AvailableReadEndpointByLocation = new ReadOnlyDictionary(new Dictionary(StringComparer.OrdinalIgnoreCase)); + this.AvailableWriteLocationByEndpoint = new ReadOnlyDictionary(new Dictionary()); + this.AvailableReadLocationByEndpoint = new ReadOnlyDictionary(new Dictionary()); + this.WriteEndpoints = new List() { defaultEndpoint }.AsReadOnly(); + this.AccountReadEndpoints = new List() { defaultEndpoint }.AsReadOnly(); + this.ReadEndpoints = new List() { defaultEndpoint }.AsReadOnly(); + this.EffectivePreferredLocations = new List().AsReadOnly(); + } + + public DatabaseAccountLocationsInfo(DatabaseAccountLocationsInfo other) + { + this.PreferredLocations = other.PreferredLocations; + this.AvailableWriteLocations = other.AvailableWriteLocations; + this.AvailableReadLocations = other.AvailableReadLocations; + this.AvailableWriteEndpointByLocation = other.AvailableWriteEndpointByLocation; + this.AvailableReadEndpointByLocation = other.AvailableReadEndpointByLocation; + this.AvailableReadLocationByEndpoint = other.AvailableReadLocationByEndpoint; + this.AvailableWriteLocationByEndpoint = other.AvailableWriteLocationByEndpoint; + this.WriteEndpoints = other.WriteEndpoints; + this.AccountReadEndpoints = other.AccountReadEndpoints; + this.ReadEndpoints = other.ReadEndpoints; + this.EffectivePreferredLocations = other.EffectivePreferredLocations; + } + + public ReadOnlyCollection PreferredLocations { get; set; } + public ReadOnlyCollection AvailableWriteLocations { get; set; } + public ReadOnlyCollection AvailableReadLocations { get; set; } + public ReadOnlyDictionary AvailableWriteEndpointByLocation { get; set; } + public ReadOnlyDictionary AvailableReadEndpointByLocation { get; set; } + public ReadOnlyDictionary AvailableWriteLocationByEndpoint { get; set; } + public ReadOnlyDictionary AvailableReadLocationByEndpoint { get; set; } + + public ReadOnlyCollection WriteEndpoints { get; set; } + public ReadOnlyCollection ReadEndpoints { get; set; } + public ReadOnlyCollection AccountReadEndpoints { get; set; } + public ReadOnlyCollection EffectivePreferredLocations { get; set; } + } + + [Flags] + private enum OperationType + { + None = 0x0, + Read = 0x1, + Write = 0x2 + } + } +}