Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Availability: Adds account-level read regions as effective preferred regions when preferred regions is not set on client. #4709

Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
4597f2d
Initial changes to LocationCache.
jeet1995 Sep 9, 2024
4b2a455
Intial changes to LocationCache.
jeet1995 Sep 9, 2024
7aeaa20
LocationCacheTests changes.
jeet1995 Sep 9, 2024
5a6c044
Update effective preferred regions count in GlobalEndpointManager.cs
jeet1995 Sep 11, 2024
ab14fff
Revert changes.
jeet1995 Sep 11, 2024
45b3f53
Updated LocationCacheTests and CosmosAvailabilityStrategyTests.
jeet1995 Sep 13, 2024
c6758d8
Revert changes.
jeet1995 Sep 13, 2024
8c14ad4
Fixing tests.
jeet1995 Sep 13, 2024
98c0a67
Additional wiring of effective preferred regions.
jeet1995 Sep 13, 2024
49fd41d
Merge branch 'master' of https://github.com/Azure/azure-cosmos-dotnet…
jeet1995 Sep 13, 2024
31744c2
Modified GlobalEndpointManagerTest.cs
jeet1995 Sep 13, 2024
6ca25a7
Modify LocationCacheTests.cs
jeet1995 Sep 16, 2024
0fb4202
Fix LocationCacheTests.cs
jeet1995 Sep 16, 2024
d42b3e3
Merge branch 'master' of https://github.com/Azure/azure-cosmos-dotnet…
jeet1995 Sep 16, 2024
b9860a7
Wiring ExcludeRegions for ReadMany.
jeet1995 Sep 16, 2024
39a4c8c
Attempt at fixing line separators.
jeet1995 Sep 18, 2024
d1be991
Delete global.json.
jeet1995 Sep 19, 2024
bcfc960
Fix PR.
jeet1995 Sep 19, 2024
b43e1e0
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
jeet1995 Sep 19, 2024
1c17033
Fix.
jeet1995 Sep 19, 2024
383be32
Addressing review comments.
jeet1995 Sep 28, 2024
e439379
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
kirankumarkolli Oct 6, 2024
edb0175
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
jeet1995 Oct 9, 2024
cd56556
Fix ClientRetryPolicyTests to assert for cross-region retries even wi…
jeet1995 Oct 9, 2024
0500f1a
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
jeet1995 Oct 9, 2024
4ce40cb
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
jeet1995 Oct 9, 2024
d6c783d
Remove PreferredRegions check for AvailabilityStrategy
NaluTripician Oct 9, 2024
cf23272
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
kirankumarkolli Oct 10, 2024
d83230c
Revert locking changes in GlobalEndpointManager.cs
jeet1995 Oct 10, 2024
154abd0
Merge branch 'users/jeet1995/crossRetryScenariosFixWhenPreferredRegio…
jeet1995 Oct 10, 2024
1e04c5c
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
kirankumarkolli Oct 10, 2024
e0c4f9b
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
kirankumarkolli Oct 11, 2024
b05c3d1
Updated comment to reflect available read / write locations are alos …
jeet1995 Oct 13, 2024
bda28f1
Merge branch 'users/jeet1995/crossRetryScenariosFixWhenPreferredRegio…
jeet1995 Oct 13, 2024
20336f4
Update account-level read / write region API in LocationCache.cs
jeet1995 Oct 13, 2024
9d7b5f9
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
jeet1995 Oct 13, 2024
d1db683
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
kundadebdatta Oct 15, 2024
bb424d4
Addressing review comments.
jeet1995 Oct 16, 2024
fc6c94f
Merge branch 'master' into users/jeet1995/crossRetryScenariosFixWhenP…
jeet1995 Oct 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal sealed class GatewayAccountReader
private readonly CosmosHttpClient httpClient;
private readonly Uri serviceEndpoint;
private readonly CancellationToken cancellationToken;
private readonly ReaderWriterLockSlim readerWriterLock;

// Backlog: Auth abstractions are spilling through. 4 arguments for this CTOR are result of it.
public GatewayAccountReader(Uri serviceEndpoint,
Expand All @@ -35,6 +36,7 @@ public GatewayAccountReader(Uri serviceEndpoint,
this.cosmosAuthorization = cosmosAuthorization ?? throw new ArgumentNullException(nameof(AuthorizationTokenProvider));
this.connectionPolicy = connectionPolicy;
this.cancellationToken = cancellationToken;
this.readerWriterLock = new ReaderWriterLockSlim();
}

private async Task<AccountProperties> GetDatabaseAccountAsync(Uri serviceEndpoint)
Expand Down Expand Up @@ -90,7 +92,8 @@ public async Task<AccountProperties> InitializeReaderAsync()
locations: this.connectionPolicy.PreferredLocations,
accountInitializationCustomEndpoints: this.connectionPolicy.AccountInitializationCustomEndpoints,
getDatabaseAccountFn: this.GetDatabaseAccountAsync,
cancellationToken: this.cancellationToken);
cancellationToken: this.cancellationToken,
accountPropertiesReaderWriterLock: this.readerWriterLock);

return databaseAccount;
}
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/ReadManyRequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ internal QueryRequestOptions ConvertToQueryRequestOptions()
IfMatchEtag = this.IfMatchEtag,
IfNoneMatchEtag = this.IfNoneMatchEtag,
Properties = this.Properties,
AddRequestHeaders = this.AddRequestHeaders
AddRequestHeaders = this.AddRequestHeaders,
ExcludeRegions = this.ExcludeRegions
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public override Task<ResponseMessage> ReadManyItemsStreamAsync(
containerName: this.Id,
databaseName: this.Database.Id,
operationType: Documents.OperationType.Read,
requestOptions: null,
requestOptions: readManyRequestOptions,
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
task: (trace) => base.ReadManyItemsStreamAsync(items, trace, readManyRequestOptions, cancellationToken),
openTelemetry: new (OpenTelemetryConstants.Operations.ReadManyItems, (response) => new OpenTelemetryResponse(responseMessage: response)));
}
Expand All @@ -457,7 +457,7 @@ public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(
containerName: this.Id,
databaseName: this.Database.Id,
operationType: Documents.OperationType.Read,
requestOptions: null,
requestOptions: readManyRequestOptions,
task: (trace) => base.ReadManyItemsAsync<T>(items, trace, readManyRequestOptions, cancellationToken),
openTelemetry: new (OpenTelemetryConstants.Operations.ReadManyItems, (response) => new OpenTelemetryResponse<T>(responseMessage: response)));
}
Expand Down
102 changes: 81 additions & 21 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ internal class GlobalEndpointManager : IGlobalEndpointManager
private readonly TimeSpan MinTimeBetweenAccountRefresh = TimeSpan.FromSeconds(15);
private readonly int backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS;
private readonly object backgroundAccountRefreshLock = new object();
private readonly object isAccountRefreshInProgressLock = new object();
private readonly object isAccountRefreshInProgressLock = new object();
private readonly ReaderWriterLockSlim locationCacheDatabaseAccountReadWriteLock = new ReaderWriterLockSlim();
private bool isAccountRefreshInProgress = false;
private bool isBackgroundAccountRefreshActive = false;
private DateTime LastBackgroundRefreshUtc = DateTime.MinValue;
Expand Down Expand Up @@ -94,8 +95,16 @@ public GlobalEndpointManager(IDocumentClientInternal owner, ConnectionPolicy con
public ReadOnlyCollection<Uri> AccountReadEndpoints => this.locationCache.AccountReadEndpoints;

public ReadOnlyCollection<Uri> WriteEndpoints => this.locationCache.WriteEndpoints;

public int PreferredLocationCount
{
get
{
Collection<string> effectivePreferredLocations = this.GetEffectivePreferredLocations();

public int PreferredLocationCount => this.connectionPolicy.PreferredLocations != null ? this.connectionPolicy.PreferredLocations.Count : 0;
return effectivePreferredLocations.Count;
}
}

public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request)
{
Expand All @@ -119,7 +128,8 @@ public static async Task<AccountProperties> GetDatabaseAccountFromAnyLocationsAs
IList<string>? locations,
IList<Uri>? accountInitializationCustomEndpoints,
Func<Uri, Task<AccountProperties>> getDatabaseAccountFn,
CancellationToken cancellationToken)
CancellationToken cancellationToken,
jeet1995 marked this conversation as resolved.
Show resolved Hide resolved
ReaderWriterLockSlim accountPropertiesReaderWriterLock)
{
using (GetAccountPropertiesHelper threadSafeGetAccountHelper = new GetAccountPropertiesHelper(
defaultEndpoint,
Expand All @@ -128,7 +138,7 @@ public static async Task<AccountProperties> GetDatabaseAccountFromAnyLocationsAs
getDatabaseAccountFn,
cancellationToken))
{
return await threadSafeGetAccountHelper.GetAccountPropertiesAsync();
return await threadSafeGetAccountHelper.GetAccountPropertiesAsync(accountPropertiesReaderWriterLock);
}
}

Expand Down Expand Up @@ -166,15 +176,15 @@ public GetAccountPropertiesHelper(
.GetEnumerator();
}

public async Task<AccountProperties> GetAccountPropertiesAsync()
public async Task<AccountProperties> GetAccountPropertiesAsync(ReaderWriterLockSlim readerWriterLock)
{
// If there are no preferred regions or private endpoints, then just wait for the global endpoint results
if (this.LimitToGlobalEndpointOnly)
{
return await this.GetOnlyGlobalEndpointAsync();
return await this.GetOnlyGlobalEndpointAsync(readerWriterLock);
}

Task globalEndpointTask = this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint);
Task globalEndpointTask = this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock);

// Start a timer to start secondary requests in parallel.
Task timerTask = Task.Delay(TimeSpan.FromSeconds(5));
Expand All @@ -194,8 +204,8 @@ public async Task<AccountProperties> GetAccountPropertiesAsync()
HashSet<Task> tasksToWaitOn = new HashSet<Task>
{
globalEndpointTask,
this.TryGetAccountPropertiesFromAllLocationsAsync(),
this.TryGetAccountPropertiesFromAllLocationsAsync()
this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock),
this.TryGetAccountPropertiesFromAllLocationsAsync(readerWriterLock)
};

while (tasksToWaitOn.Any())
Expand Down Expand Up @@ -227,14 +237,14 @@ public async Task<AccountProperties> GetAccountPropertiesAsync()
throw new AggregateException(this.TransientExceptions);
}

private async Task<AccountProperties> GetOnlyGlobalEndpointAsync()
private async Task<AccountProperties> GetOnlyGlobalEndpointAsync(ReaderWriterLockSlim readerWriterLock)
{
if (!this.LimitToGlobalEndpointOnly)
{
throw new ArgumentException("GetOnlyGlobalEndpointAsync should only be called if there are no other private endpoints or regions");
}

await this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint);
await this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock);

if (this.AccountProperties != null)
{
Expand Down Expand Up @@ -262,7 +272,7 @@ private async Task<AccountProperties> GetOnlyGlobalEndpointAsync()
/// <summary>
/// This is done in a thread safe way to allow multiple tasks to iterate over the list of service endpoints.
/// </summary>
private async Task TryGetAccountPropertiesFromAllLocationsAsync()
private async Task TryGetAccountPropertiesFromAllLocationsAsync(ReaderWriterLockSlim readerWriterLock)
{
while (this.TryMoveNextServiceEndpointhreadSafe(
out Uri? serviceEndpoint))
Expand All @@ -274,7 +284,8 @@ private async Task TryGetAccountPropertiesFromAllLocationsAsync()
}

await this.GetAndUpdateAccountPropertiesAsync(
endpoint: serviceEndpoint);
endpoint: serviceEndpoint,
readerWriterLock);
}
}

Expand Down Expand Up @@ -308,7 +319,7 @@ private bool TryMoveNextServiceEndpointhreadSafe(
}
}

private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint)
private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint, ReaderWriterLockSlim readerWriterLock)
{
try
{
Expand All @@ -326,8 +337,17 @@ private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint)

if (databaseAccount != null)
{
this.AccountProperties = databaseAccount;
this.CancellationTokenSource.Cancel();
readerWriterLock.EnterWriteLock();

try
{
this.AccountProperties = databaseAccount;
this.CancellationTokenSource.Cancel();
}
finally
{
readerWriterLock.ExitWriteLock();
}
}
}
catch (Exception e)
Expand Down Expand Up @@ -495,9 +515,18 @@ public virtual void InitializeAccountPropertiesAndStartBackgroundRefresh(Account
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}
}

this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock();

this.locationCache.OnDatabaseAccountRead(databaseAccount);
try
{
this.locationCache.OnDatabaseAccountRead(databaseAccount);
}
finally
{
this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock();
}

if (this.isBackgroundAccountRefreshActive)
{
Expand Down Expand Up @@ -647,7 +676,18 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
try
{
this.LastBackgroundRefreshUtc = DateTime.UtcNow;
this.locationCache.OnDatabaseAccountRead(await this.GetDatabaseAccountAsync(true));
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);

this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock();

try
{
this.locationCache.OnDatabaseAccountRead(accountProperties);
}
finally
{
this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock();
}
}
catch (Exception ex)
{
Expand All @@ -671,10 +711,11 @@ internal async Task<AccountProperties> GetDatabaseAccountAsync(bool forceRefresh
obsoleteValue: null,
singleValueInitFunc: () => GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync(
this.defaultEndpoint,
this.connectionPolicy.PreferredLocations,
this.GetEffectivePreferredLocations(),
this.connectionPolicy.AccountInitializationCustomEndpoints,
this.GetDatabaseAccountAsync,
this.cancellationTokenSource.Token),
this.cancellationTokenSource.Token,
this.locationCacheDatabaseAccountReadWriteLock),
cancellationToken: this.cancellationTokenSource.Token,
forceRefresh: forceRefresh);
#nullable enable
Expand All @@ -689,6 +730,25 @@ private bool SkipRefresh(bool forceRefresh)
TimeSpan timeSinceLastRefresh = DateTime.UtcNow - this.LastBackgroundRefreshUtc;
return (this.isAccountRefreshInProgress || this.MinTimeBetweenAccountRefresh > timeSinceLastRefresh)
&& !forceRefresh;
}

public Collection<string> GetEffectivePreferredLocations()
jeet1995 marked this conversation as resolved.
Show resolved Hide resolved
{
if (this.connectionPolicy.PreferredLocations != null && this.connectionPolicy.PreferredLocations.Count > 0)
jeet1995 marked this conversation as resolved.
Show resolved Hide resolved
{
return this.connectionPolicy.PreferredLocations;
}

this.locationCacheDatabaseAccountReadWriteLock.EnterReadLock();
jeet1995 marked this conversation as resolved.
Show resolved Hide resolved

try
{
return new Collection<string>(this.locationCache.EffectivePreferredLocations);
jeet1995 marked this conversation as resolved.
Show resolved Hide resolved
}
finally
{
this.locationCacheDatabaseAccountReadWriteLock.ExitReadLock();
}
}
}
}
Loading
Loading