Skip to content

Commit

Permalink
Code changes to sync up with cosmos.direct release 3.31.2.
Browse files Browse the repository at this point in the history
  • Loading branch information
kundadebdatta committed Jun 20, 2023
1 parent d22ccdf commit 949ce0d
Show file tree
Hide file tree
Showing 8 changed files with 381 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

// This File is copied from Azure.Core repo i.e. https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/core/Azure.Core/src/Shared/AppContextSwitchHelper.cs

#nullable enable
#if NETSTANDARD2_0_OR_GREATER
namespace Azure.Core
{
using System;
/// <summary>
/// Helper for interacting with AppConfig settings and their related Environment variable settings.
/// </summary>
internal static class AppContextSwitchHelper
{
/// <summary>
/// Determines if either an AppContext switch or its corresponding Environment Variable is set
/// </summary>
/// <param name="appContexSwitchName">Name of the AppContext switch.</param>
/// <param name="environmentVariableName">Name of the Environment variable.</param>
/// <returns>If the AppContext switch has been set, returns the value of the switch.
/// If the AppContext switch has not been set, returns the value of the environment variable.
/// False if neither is set.
/// </returns>
public static bool GetConfigValue(string appContexSwitchName, string environmentVariableName)
{
// First check for the AppContext switch, giving it priority over the environment variable.
if (AppContext.TryGetSwitch(appContexSwitchName, out bool value))
{
return value;
}
// AppContext switch wasn't used. Check the environment variable.
string? envVar = Environment.GetEnvironmentVariable(environmentVariableName);
if (envVar != null
&& (envVar.Equals("true", StringComparison.OrdinalIgnoreCase)
|| envVar.Equals("1", StringComparison.OrdinalIgnoreCase)))
{
return true;
}

// Default to false.
return false;
}
}
}
#endif
18 changes: 5 additions & 13 deletions Microsoft.Azure.Cosmos/src/direct/ChannelDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,12 @@ public Task OpenChannelAsync(
bool localRegionRequest,
Guid activityId)
{
// Do not open a new channel, if the channel is
// already a part of the concurrent dictionary.
if (!this.channels.ContainsKey(
new ServerKey(physicalAddress)))
{
this.ThrowIfDisposed();
IChannel channel = this.GetChannel(
physicalAddress,
localRegionRequest);

return channel.OpenChannelAsync(activityId);
}
this.ThrowIfDisposed();
IChannel channel = this.GetChannel(
physicalAddress,
localRegionRequest);

return Task.FromResult(0);
return channel.OpenChannelAsync(activityId);
}

public void Dispose()
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/direct/CustomTypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ internal static class CustomTypeExtensions

#if COSMOSCLIENT
public const string SDKName = "cosmos-netstandard-sdk";
public const string SDKVersion = "3.31.1";
public const string SDKVersion = "3.31.2";
#else
public const string SDKName = "documentdb-netcore-sdk";
public const string SDKVersion = "2.14.0";
Expand Down
Binary file modified Microsoft.Azure.Cosmos/src/direct/HttpConstants.cs
Binary file not shown.
81 changes: 75 additions & 6 deletions Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ internal sealed class LoadBalancingPartition : IDisposable

private readonly SemaphoreSlim concurrentOpeningChannelSlim;

public LoadBalancingPartition(Uri serverUri, ChannelProperties channelProperties, bool localRegionRequest)
// This channel factory delegate is meant for unit testing only and a default implementation is provided.
// However it can be extended to support the main line code path if needed.
private readonly Func<Guid, Uri, ChannelProperties, bool, SemaphoreSlim, IChannel> channelFactory;

public LoadBalancingPartition(
Uri serverUri,
ChannelProperties channelProperties,
bool localRegionRequest,
Func<Guid, Uri, ChannelProperties, bool, SemaphoreSlim, IChannel> channelFactory = null)
{
Debug.Assert(serverUri != null);
this.serverUri = serverUri;
Expand All @@ -44,6 +52,10 @@ public LoadBalancingPartition(Uri serverUri, ChannelProperties channelProperties

this.concurrentOpeningChannelSlim =
new SemaphoreSlim(channelProperties.MaxConcurrentOpeningConnectionCount, channelProperties.MaxConcurrentOpeningConnectionCount);

this.channelFactory = channelFactory != null
? channelFactory
: CreateAndInitializeChannel;
}

public async Task<StoreResponse> RequestAsync(
Expand Down Expand Up @@ -210,9 +222,24 @@ internal Task OpenChannelAsync(Guid activityId)
this.capacityLock.EnterWriteLock();
try
{
return this.OpenChannelAndIncrementCapacity(
activityId: activityId,
waitForBackgroundInitializationComplete: true);
if (this.capacity < this.maxCapacity)
{
return this.OpenChannelAndIncrementCapacity(
activityId: activityId,
waitForBackgroundInitializationComplete: true);

}
else
{
string errorMessage = $"Failed to open channels to server {this.serverUri} because the current channel capacity {this.capacity} has exceeded the maaximum channel capacity limit: {this.maxCapacity}";

// Converting the error into invalid operation exception. Note that the OpenChannelAsync() method is used today, by the open connection flow
// in RntbdOpenConnectionHandler that is primarily used for the replica validation. Because the replica validation is done deterministically
// to open the Rntbd connections with best effort, throwing an exception from this place won't impact the replica validation flow because it
// will be caught and swallowed by the RntbdOpenConnectionHandler and the replica validation flow will continue.
throw new InvalidOperationException(
message: errorMessage);
}
}
finally
{
Expand All @@ -234,7 +261,17 @@ public void Dispose()
{
this.capacityLock.ExitWriteLock();
}
this.capacityLock.Dispose();

try
{
this.capacityLock.Dispose();
}
catch(SynchronizationLockException)
{
// SynchronizationLockException is thrown if there are inflight requests during the disposal of capacityLock
// suspend this exception to avoid crashing disposing other partitions/channels in hierarchical calls
return;
}
}

/// <summary>
Expand All @@ -250,13 +287,21 @@ private async Task OpenChannelAndIncrementCapacity(
bool waitForBackgroundInitializationComplete)
{
Debug.Assert(this.capacityLock.IsWriteLockHeld);
Channel newChannel = new(

IChannel newChannel = this.channelFactory(
activityId,
this.serverUri,
this.channelProperties,
this.localRegionRequest,
this.concurrentOpeningChannelSlim);

if (newChannel == null)
{
throw new ArgumentNullException(
paramName: nameof(newChannel),
message: "Channel can't be null.");
}

if (waitForBackgroundInitializationComplete)
{
await newChannel.OpenChannelAsync(activityId);
Expand All @@ -269,6 +314,30 @@ private async Task OpenChannelAndIncrementCapacity(
this.capacity += this.channelProperties.MaxRequestsPerChannel;
}

/// <summary>
/// Creates and initializes a new instance of rntbd <see cref="Channel"/>.
/// </summary>
/// <param name="activityId">A guid containing the activity id for the operation.</param>
/// <param name="serverUri">An instance of <see cref="Uri"/> containing the physical server uri.</param>
/// <param name="channelProperties">An instance of <see cref="ChannelProperties"/>.</param>
/// <param name="localRegionRequest">A boolean flag indicating if the request is intendent for local region.</param>
/// <param name="concurrentOpeningChannelSlim">An instance of <see cref="SemaphoreSlim"/>.</param>
/// <returns></returns>
private static IChannel CreateAndInitializeChannel(
Guid activityId,
Uri serverUri,
ChannelProperties channelProperties,
bool localRegionRequest,
SemaphoreSlim concurrentOpeningChannelSlim)
{
return new Channel(
activityId,
serverUri,
channelProperties,
localRegionRequest,
concurrentOpeningChannelSlim);
}

private sealed class SequenceGenerator
{
private int current = 0;
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/direct/LocationNames.cs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ static class LocationNames
/// <summary>
/// Name of the Azure Israel Central region in the Azure Cosmos DB service.
/// </summary>
internal const string IsraelCentral = "Israel Central";
public const string IsraelCentral = "Israel Central";

/// <summary>
/// Name of the Azure Mexico Central region in the Azure Cosmos DB service.
Expand Down
Loading

0 comments on commit 949ce0d

Please sign in to comment.