From 1c17033cc484347bc32f1c1afcac39882fb8e642 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Wed, 18 Sep 2024 22:45:19 -0400 Subject: [PATCH] Fix. --- .../src/Routing/GlobalEndpointManager.cs | 256 +++++++++--------- .../src/Routing/LocationCache.cs | 5 - 2 files changed, 128 insertions(+), 133 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs index ebe8d8e7ea..6c1ca825f1 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs @@ -37,7 +37,7 @@ internal class GlobalEndpointManager : IGlobalEndpointManager private readonly TimeSpan MinTimeBetweenAccountRefresh = TimeSpan.FromSeconds(15); private readonly int backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS; private readonly object backgroundAccountRefreshLock = new object(); - private readonly object isAccountRefreshInProgressLock = new object(); + private readonly object isAccountRefreshInProgressLock = new object(); private readonly ReaderWriterLockSlim locationCacheDatabaseAccountReadWriteLock = new ReaderWriterLockSlim(); private bool isAccountRefreshInProgress = false; private bool isBackgroundAccountRefreshActive = false; @@ -90,18 +90,18 @@ public GlobalEndpointManager(IDocumentClientInternal owner, ConnectionPolicy con } } - public ReadOnlyCollection ReadEndpoints => this.locationCache.ReadEndpoints; - + public ReadOnlyCollection ReadEndpoints => this.locationCache.ReadEndpoints; + public ReadOnlyCollection AccountReadEndpoints => this.locationCache.AccountReadEndpoints; - public ReadOnlyCollection WriteEndpoints => this.locationCache.WriteEndpoints; - + public ReadOnlyCollection WriteEndpoints => this.locationCache.WriteEndpoints; + public int PreferredLocationCount { get { Collection effectivePreferredLocations = this.GetEffectivePreferredLocations(); - + return effectivePreferredLocations.Count; } } @@ -113,8 +113,8 @@ public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request) public Uri GetHubUri() { - return this.locationCache.GetHubUri(); - } + return this.locationCache.GetHubUri(); + } /// /// This will get the account information. @@ -125,20 +125,20 @@ public Uri GetHubUri() /// public static async Task GetDatabaseAccountFromAnyLocationsAsync( Uri defaultEndpoint, - IList? locations, + IList? locations, IList? accountInitializationCustomEndpoints, Func> getDatabaseAccountFn, - CancellationToken cancellationToken, + CancellationToken cancellationToken, ReaderWriterLockSlim accountPropertiesReaderWriterLock) - { + { using (GetAccountPropertiesHelper threadSafeGetAccountHelper = new GetAccountPropertiesHelper( defaultEndpoint, - locations, + locations, accountInitializationCustomEndpoints, getDatabaseAccountFn, - cancellationToken)) - { - return await threadSafeGetAccountHelper.GetAccountPropertiesAsync(accountPropertiesReaderWriterLock); + cancellationToken)) + { + return await threadSafeGetAccountHelper.GetAccountPropertiesAsync(accountPropertiesReaderWriterLock); } } @@ -149,30 +149,30 @@ private class GetAccountPropertiesHelper : IDisposable { private readonly CancellationTokenSource CancellationTokenSource; private readonly Uri DefaultEndpoint; - private readonly bool LimitToGlobalEndpointOnly; + private readonly bool LimitToGlobalEndpointOnly; private readonly IEnumerator ServiceEndpointEnumerator; private readonly Func> GetDatabaseAccountFn; private readonly List TransientExceptions = new List(); private AccountProperties? AccountProperties = null; - private Exception? NonRetriableException = null; + private Exception? NonRetriableException = null; private int disposeCounter = 0; public GetAccountPropertiesHelper( Uri defaultEndpoint, - IList? locations, + IList? locations, IList? accountInitializationCustomEndpoints, Func> getDatabaseAccountFn, CancellationToken cancellationToken) { this.DefaultEndpoint = defaultEndpoint; this.LimitToGlobalEndpointOnly = (locations == null || locations.Count == 0) && (accountInitializationCustomEndpoints == null || accountInitializationCustomEndpoints.Count == 0); - this.GetDatabaseAccountFn = getDatabaseAccountFn; - this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - this.ServiceEndpointEnumerator = GetAccountPropertiesHelper - .GetServiceEndpoints( - defaultEndpoint, - locations, - accountInitializationCustomEndpoints) + this.GetDatabaseAccountFn = getDatabaseAccountFn; + this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + this.ServiceEndpointEnumerator = GetAccountPropertiesHelper + .GetServiceEndpoints( + defaultEndpoint, + locations, + accountInitializationCustomEndpoints) .GetEnumerator(); } @@ -183,7 +183,7 @@ public async Task GetAccountPropertiesAsync(ReaderWriterLockS { return await this.GetOnlyGlobalEndpointAsync(readerWriterLock); } - + Task globalEndpointTask = this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock); // Start a timer to start secondary requests in parallel. @@ -273,7 +273,7 @@ private async Task GetOnlyGlobalEndpointAsync(ReaderWriterLoc /// This is done in a thread safe way to allow multiple tasks to iterate over the list of service endpoints. /// private async Task TryGetAccountPropertiesFromAllLocationsAsync(ReaderWriterLockSlim readerWriterLock) - { + { while (this.TryMoveNextServiceEndpointhreadSafe( out Uri? serviceEndpoint)) { @@ -283,20 +283,20 @@ private async Task TryGetAccountPropertiesFromAllLocationsAsync(ReaderWriterLock return; } - await this.GetAndUpdateAccountPropertiesAsync( - endpoint: serviceEndpoint, + await this.GetAndUpdateAccountPropertiesAsync( + endpoint: serviceEndpoint, readerWriterLock); } - } - - /// - /// We first iterate through all the private endpoints to fetch the account information. - /// If all the attempt fails to fetch the metadata from the private endpoints, we will - /// attempt to retrieve the account information from the regional endpoints constructed - /// using the preferred regions list. - /// - /// An instance of that will contain the service endpoint. - /// A boolean flag indicating if the was advanced in a thread safe manner. + } + + /// + /// We first iterate through all the private endpoints to fetch the account information. + /// If all the attempt fails to fetch the metadata from the private endpoints, we will + /// attempt to retrieve the account information from the regional endpoints constructed + /// using the preferred regions list. + /// + /// An instance of that will contain the service endpoint. + /// A boolean flag indicating if the was advanced in a thread safe manner. private bool TryMoveNextServiceEndpointhreadSafe( out Uri? serviceEndpoint) { @@ -317,7 +317,7 @@ private bool TryMoveNextServiceEndpointhreadSafe( serviceEndpoint = this.ServiceEndpointEnumerator.Current; return true; } - } + } private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint, ReaderWriterLockSlim readerWriterLock) { @@ -338,7 +338,7 @@ private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint, ReaderWriter if (databaseAccount != null) { readerWriterLock.EnterWriteLock(); - + try { this.AccountProperties = databaseAccount; @@ -378,81 +378,81 @@ private static bool IsNonRetriableException(Exception exception) } return false; - } - - /// - /// Returns an instance of containing the private and regional service endpoints to iterate over. - /// - /// An instance of containing the default global endpoint. - /// An instance of containing the preferred serviceEndpoint names. - /// An instance of containing the custom private endpoints. - /// An instance of containing the service endpoints. - private static IEnumerable GetServiceEndpoints( - Uri defaultEndpoint, - IList? locations, - IList? accountInitializationCustomEndpoints) - { - // We first iterate over all the private endpoints and yield return them. - if (accountInitializationCustomEndpoints?.Count > 0) - { - foreach (Uri customEndpoint in accountInitializationCustomEndpoints) - { - // Yield return all of the custom private endpoints first. - yield return customEndpoint; - } - } - - // The next step is to iterate over the preferred locations, construct and yield return the regional endpoints one by one. - // The regional endpoints will be constructed by appending the preferred region name as a suffix to the default global endpoint. - if (locations?.Count > 0) - { - foreach (string location in locations) - { - // Yield return all of the regional endpoints once the private custom endpoints are visited. - yield return LocationHelper.GetLocationEndpoint(defaultEndpoint, location); - } - } - } - - public void Dispose() - { - if (Interlocked.Increment(ref this.disposeCounter) == 1) - { - this.CancellationTokenSource?.Cancel(); - this.CancellationTokenSource?.Dispose(); - } - } + } + + /// + /// Returns an instance of containing the private and regional service endpoints to iterate over. + /// + /// An instance of containing the default global endpoint. + /// An instance of containing the preferred serviceEndpoint names. + /// An instance of containing the custom private endpoints. + /// An instance of containing the service endpoints. + private static IEnumerable GetServiceEndpoints( + Uri defaultEndpoint, + IList? locations, + IList? accountInitializationCustomEndpoints) + { + // We first iterate over all the private endpoints and yield return them. + if (accountInitializationCustomEndpoints?.Count > 0) + { + foreach (Uri customEndpoint in accountInitializationCustomEndpoints) + { + // Yield return all of the custom private endpoints first. + yield return customEndpoint; + } + } + + // The next step is to iterate over the preferred locations, construct and yield return the regional endpoints one by one. + // The regional endpoints will be constructed by appending the preferred region name as a suffix to the default global endpoint. + if (locations?.Count > 0) + { + foreach (string location in locations) + { + // Yield return all of the regional endpoints once the private custom endpoints are visited. + yield return LocationHelper.GetLocationEndpoint(defaultEndpoint, location); + } + } + } + + public void Dispose() + { + if (Interlocked.Increment(ref this.disposeCounter) == 1) + { + this.CancellationTokenSource?.Cancel(); + this.CancellationTokenSource?.Dispose(); + } + } } public virtual Uri ResolveServiceEndpoint(DocumentServiceRequest request) { return this.locationCache.ResolveServiceEndpoint(request); - } - - /// - /// Gets the default endpoint of the account - /// - /// the default endpoint. - public Uri GetDefaultEndpoint() - { - return this.locationCache.GetDefaultEndpoint(); - } - - /// - /// Gets the mapping of available write region names to the respective endpoints - /// - public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation() - { - return this.locationCache.GetAvailableWriteEndpointsByLocation(); - } - - /// - /// Gets the mapping of available read region names to the respective endpoints - /// - public ReadOnlyDictionary GetAvailableReadEndpointsByLocation() - { - return this.locationCache.GetAvailableReadEndpointsByLocation(); - } + } + + /// + /// Gets the default endpoint of the account + /// + /// the default endpoint. + public Uri GetDefaultEndpoint() + { + return this.locationCache.GetDefaultEndpoint(); + } + + /// + /// Gets the mapping of available write region names to the respective endpoints + /// + public ReadOnlyDictionary GetAvailableWriteEndpointsByLocation() + { + return this.locationCache.GetAvailableWriteEndpointsByLocation(); + } + + /// + /// Gets the mapping of available read region names to the respective endpoints + /// + public ReadOnlyDictionary GetAvailableReadEndpointsByLocation() + { + return this.locationCache.GetAvailableReadEndpointsByLocation(); + } /// /// Returns serviceEndpoint corresponding to the endpoint @@ -467,11 +467,11 @@ public ReadOnlyCollection GetApplicableEndpoints(DocumentServiceRequest req { return this.locationCache.GetApplicableEndpoints(request, isReadRequest); } - + public ReadOnlyCollection GetApplicableRegions(IEnumerable excludeRegions, bool isReadRequest) { return this.locationCache.GetApplicableRegions(excludeRegions, isReadRequest); - } + } public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName) { @@ -515,9 +515,10 @@ public virtual void InitializeAccountPropertiesAndStartBackgroundRefresh(Account if (this.cancellationTokenSource.IsCancellationRequested) { return; - } + } + + this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock(); - this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock(); try { this.locationCache.OnDatabaseAccountRead(databaseAccount); @@ -526,7 +527,7 @@ public virtual void InitializeAccountPropertiesAndStartBackgroundRefresh(Account { this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock(); } - + if (this.isBackgroundAccountRefreshActive) { return; @@ -675,9 +676,8 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh) try { this.LastBackgroundRefreshUtc = DateTime.UtcNow; - AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true); - + this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock(); try @@ -688,12 +688,12 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh) { this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock(); } - } - catch (Exception ex) - { - DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'", - ex, - System.Diagnostics.Trace.CorrelationManager.ActivityId); + } + catch (Exception ex) + { + DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'", + ex, + System.Diagnostics.Trace.CorrelationManager.ActivityId); } finally { @@ -711,10 +711,10 @@ internal async Task GetDatabaseAccountAsync(bool forceRefresh obsoleteValue: null, singleValueInitFunc: () => GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync( this.defaultEndpoint, - this.GetEffectivePreferredLocations(), + this.GetEffectivePreferredLocations(), this.connectionPolicy.AccountInitializationCustomEndpoints, this.GetDatabaseAccountAsync, - this.cancellationTokenSource.Token, + this.cancellationTokenSource.Token, this.locationCacheDatabaseAccountReadWriteLock), cancellationToken: this.cancellationTokenSource.Token, forceRefresh: forceRefresh); @@ -730,8 +730,8 @@ private bool SkipRefresh(bool forceRefresh) TimeSpan timeSinceLastRefresh = DateTime.UtcNow - this.LastBackgroundRefreshUtc; return (this.isAccountRefreshInProgress || this.MinTimeBetweenAccountRefresh > timeSinceLastRefresh) && !forceRefresh; - } - + } + public Collection GetEffectivePreferredLocations() { if (this.connectionPolicy.PreferredLocations != null && this.connectionPolicy.PreferredLocations.Count > 0) diff --git a/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs b/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs index 549e825e5c..85a11f8d58 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs @@ -275,11 +275,6 @@ public ReadOnlyCollection GetAvailableReadLocations() public ReadOnlyCollection GetAvailableWriteLocations() { return this.locationInfo.AvailableWriteLocations; - } - - public ReadOnlyCollection GetAvailableWriteLocations() - { - return this.locationInfo.AvailableWriteLocations; } ///