Skip to content

Commit

Permalink
Cosmos DB: Correctly handle DrainMode configuration (#870)
Browse files Browse the repository at this point in the history
* Wiring DrainModeManager

* Adding cancellation detection for DrainMode manager

* Tests

* Missing this

* Undo other tests

* add logs

* Test

* Renaming leases

* Adding cleanup
  • Loading branch information
ealsur authored Sep 21, 2023
1 parent 4fb877c commit 4d9b059
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Bindings;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Extensions.Logging;
Expand All @@ -27,19 +28,22 @@ internal class CosmosDBExtensionConfigProvider : IExtensionConfigProvider
private readonly ICosmosDBSerializerFactory _cosmosSerializerFactory;
private readonly INameResolver _nameResolver;
private readonly CosmosDBOptions _options;
private readonly IDrainModeManager _drainModeManager;
private readonly ILoggerFactory _loggerFactory;

public CosmosDBExtensionConfigProvider(
IOptions<CosmosDBOptions> options,
ICosmosDBServiceFactory cosmosDBServiceFactory,
ICosmosDBSerializerFactory cosmosSerializerFactory,
INameResolver nameResolver,
IDrainModeManager drainModeManager,
ILoggerFactory loggerFactory)
{
_cosmosDBServiceFactory = cosmosDBServiceFactory;
_cosmosSerializerFactory = cosmosSerializerFactory;
_nameResolver = nameResolver;
_options = options.Value;
_drainModeManager = drainModeManager;
_loggerFactory = loggerFactory;
}

Expand Down Expand Up @@ -75,7 +79,7 @@ public void Initialize(ExtensionConfigContext context)

// Trigger
var rule2 = context.AddBindingRule<CosmosDBTriggerAttribute>();
rule2.BindToTrigger(new CosmosDBTriggerAttributeBindingProviderGenerator(_nameResolver, _options, this, _loggerFactory));
rule2.BindToTrigger(new CosmosDBTriggerAttributeBindingProviderGenerator(_nameResolver, _options, this, _drainModeManager, _loggerFactory));
}

internal void ValidateConnection(CosmosDBAttribute attribute, Type paramType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ internal class CosmosDBTriggerAttributeBindingProvider<T>
private readonly CosmosDBOptions _options;
private readonly ILogger _logger;
private readonly CosmosDBExtensionConfigProvider _configProvider;
private readonly IDrainModeManager _drainModeManager;

public CosmosDBTriggerAttributeBindingProvider(INameResolver nameResolver, CosmosDBOptions options,
CosmosDBExtensionConfigProvider configProvider, ILoggerFactory loggerFactory)
public CosmosDBTriggerAttributeBindingProvider(INameResolver nameResolver,
CosmosDBOptions options,
CosmosDBExtensionConfigProvider configProvider,
IDrainModeManager drainModeManager,
ILoggerFactory loggerFactory)
{
_nameResolver = nameResolver;
_options = options;
_configProvider = configProvider;
_drainModeManager = drainModeManager;
_logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("CosmosDB"));
}

Expand Down Expand Up @@ -118,6 +123,7 @@ public async Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext
monitoredContainer,
leasesContainer,
attribute,
_drainModeManager,
_logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
Expand All @@ -18,14 +19,19 @@ internal class CosmosDBTriggerAttributeBindingProviderGenerator : ITriggerBindin
private readonly INameResolver _nameResolver;
private readonly CosmosDBOptions _options;
private readonly ILoggerFactory _loggerFactory;
private readonly IDrainModeManager _drainModeManager;
private readonly CosmosDBExtensionConfigProvider _configProvider;

public CosmosDBTriggerAttributeBindingProviderGenerator(INameResolver nameResolver, CosmosDBOptions options,
CosmosDBExtensionConfigProvider configProvider, ILoggerFactory loggerFactory)
public CosmosDBTriggerAttributeBindingProviderGenerator(INameResolver nameResolver,
CosmosDBOptions options,
CosmosDBExtensionConfigProvider configProvider,
IDrainModeManager drainModeManager,
ILoggerFactory loggerFactory)
{
_nameResolver = nameResolver;
_options = options;
_configProvider = configProvider;
_drainModeManager = drainModeManager;
_loggerFactory = loggerFactory;
}

Expand Down Expand Up @@ -57,11 +63,11 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex

Type genericBindingType = baseType.MakeGenericType(documentType);

Type[] typeArgs = { typeof(INameResolver), typeof(CosmosDBOptions), typeof(CosmosDBExtensionConfigProvider), typeof(ILoggerFactory) };
Type[] typeArgs = { typeof(INameResolver), typeof(CosmosDBOptions), typeof(CosmosDBExtensionConfigProvider), typeof(IDrainModeManager), typeof(ILoggerFactory) };

ConstructorInfo constructor = genericBindingType.GetConstructor(typeArgs);

object[] constructorParameterValues = { _nameResolver, _options, _configProvider, _loggerFactory };
object[] constructorParameterValues = { _nameResolver, _options, _configProvider, _drainModeManager, _loggerFactory };

object cosmosDBTriggerAttributeBindingProvider = constructor.Invoke(constructorParameterValues);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@ internal class CosmosDBTriggerBinding<T> : ITriggerBinding
private readonly Container _monitoredContainer;
private readonly Container _leaseContainer;
private readonly CosmosDBTriggerAttribute _cosmosDBAttribute;
private readonly IDrainModeManager _drainModeManager;

public CosmosDBTriggerBinding(
ParameterInfo parameter,
string processorName,
Container monitoredContainer,
Container leaseContainer,
CosmosDBTriggerAttribute cosmosDBAttribute,
IDrainModeManager drainModeManager,
ILogger logger)
{
_monitoredContainer = monitoredContainer;
_leaseContainer = leaseContainer;
_cosmosDBAttribute = cosmosDBAttribute;
_parameter = parameter;
_processorName = processorName;
_drainModeManager = drainModeManager;
_logger = logger;
}

Expand Down Expand Up @@ -85,6 +88,7 @@ public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
this._monitoredContainer,
this._leaseContainer,
this._cosmosDBAttribute,
this._drainModeManager,
this._logger));
}

Expand Down
19 changes: 15 additions & 4 deletions src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Scale;
Expand All @@ -34,6 +35,8 @@ internal class CosmosDBTriggerListener<T> : IListener, IScaleMonitorProvider, IT
private readonly string _listenerLogDetails;
private readonly IScaleMonitor<CosmosDBTriggerMetrics> _cosmosDBScaleMonitor;
private readonly ITargetScaler _cosmosDBTargetScaler;
private readonly CancellationTokenSource _functionExecutionCancellationTokenSource;
private readonly IDrainModeManager _drainModeManager;
private ChangeFeedProcessor _host;
private ChangeFeedProcessorBuilder _hostBuilder;
private int _listenerStatus;
Expand All @@ -45,10 +48,13 @@ public CosmosDBTriggerListener(
Container monitoredContainer,
Container leaseContainer,
CosmosDBTriggerAttribute cosmosDBAttribute,
IDrainModeManager drainModeManager,
ILogger logger)
{
this._logger = logger;
this._executor = executor;
this._drainModeManager = drainModeManager;
this._functionExecutionCancellationTokenSource = new CancellationTokenSource();
this._processorName = processorName;
this._hostName = Guid.NewGuid().ToString();
this._functionId = functionId;
Expand All @@ -73,7 +79,7 @@ public void Cancel()

public void Dispose()
{
//Nothing to dispose
_functionExecutionCancellationTokenSource.Cancel();
}

public async Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -118,6 +124,11 @@ public async Task StartAsync(CancellationToken cancellationToken)

public async Task StopAsync(CancellationToken cancellationToken)
{
if (!this._drainModeManager.IsDrainModeEnabled)
{
this._functionExecutionCancellationTokenSource.Cancel();
}

try
{
if (this._host != null)
Expand Down Expand Up @@ -211,7 +222,7 @@ internal virtual void InitializeBuilder()
private async Task ProcessChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<T> docs, CancellationToken cancellationToken)
{
this._healthMonitor.OnChangesDelivered(context);
FunctionResult result = await this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken);
FunctionResult result = await this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, this._functionExecutionCancellationTokenSource.Token);
if (result != null // TryExecuteAsync when using RetryPolicies can return null
&& !result.Succeeded
&& result.Exception != null)
Expand All @@ -220,8 +231,8 @@ private async Task ProcessChangesAsync(ChangeFeedProcessorContext context, IRead
await this._healthMonitor.OnErrorAsync(context.LeaseToken, userException);
}

// Prevent the change feed lease from being checkpointed if cancellation was requested
cancellationToken.ThrowIfCancellationRequested();
// Prevent the change feed lease from being checkpointed if cancellation was requested when not in Drain mode
this._functionExecutionCancellationTokenSource.Token.ThrowIfCancellationRequested();
}

public IScaleMonitor GetMonitor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Tests.Common;
using Microsoft.Azure.WebJobs.Extensions.Tests.Extensions.CosmosDB.Models;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging.Abstractions;
Expand All @@ -29,7 +30,7 @@ public async Task Configuration_Caches_Clients()
{
// Arrange
var options = new CosmosDBOptions();
var config = new CosmosDBExtensionConfigProvider(new OptionsWrapper<CosmosDBOptions>(options), new DefaultCosmosDBServiceFactory(_baseConfig, Mock.Of<AzureComponentFactory>()), new DefaultCosmosDBSerializerFactory(), new TestNameResolver(), NullLoggerFactory.Instance);
var config = new CosmosDBExtensionConfigProvider(new OptionsWrapper<CosmosDBOptions>(options), new DefaultCosmosDBServiceFactory(_baseConfig, Mock.Of<AzureComponentFactory>()), new DefaultCosmosDBSerializerFactory(), new TestNameResolver(), Mock.Of<IDrainModeManager>(), NullLoggerFactory.Instance);
var attribute = new CosmosDBAttribute { Id = "abcdef" };

// Act
Expand Down Expand Up @@ -104,7 +105,7 @@ private CosmosDBExtensionConfigProvider InitializeExtensionConfigProvider()
var options = new CosmosDBOptions();
var factory = new DefaultCosmosDBServiceFactory(_baseConfig, Mock.Of<AzureComponentFactory>());
var nameResolver = new TestNameResolver();
var configProvider = new CosmosDBExtensionConfigProvider(new OptionsWrapper<CosmosDBOptions>(options), factory, new DefaultCosmosDBSerializerFactory(), nameResolver, NullLoggerFactory.Instance);
var configProvider = new CosmosDBExtensionConfigProvider(new OptionsWrapper<CosmosDBOptions>(options), factory, new DefaultCosmosDBSerializerFactory(), nameResolver, Mock.Of<IDrainModeManager>(), NullLoggerFactory.Instance);

var context = TestHelpers.CreateExtensionConfigContext(nameResolver);

Expand Down
36 changes: 32 additions & 4 deletions test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBEndToEndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -26,6 +27,7 @@ public class CosmosDBEndToEndTests
{
private const string DatabaseName = "E2EDb";
private const string CollectionName = "E2ECollection";
private const string LeaseCollectionName = "leases";
private readonly TestLoggerProvider _loggerProvider = new TestLoggerProvider();

[Fact]
Expand Down Expand Up @@ -73,6 +75,18 @@ await TestHelpers.Await(() =>
.FormattedMessage;
JObject loggedOptions = JObject.Parse(optionsMessage.Substring(optionsMessage.IndexOf(Environment.NewLine)));
Assert.Null(loggedOptions["ConnectionMode"].Value<string>());

// Clean-up leases
Container leaseContainer = client.GetContainer(DatabaseName, LeaseCollectionName);
using FeedIterator<JObject> leaseIterator = leaseContainer.GetItemQueryIterator<JObject>();
while (leaseIterator.HasMoreResults)
{
FeedResponse<JObject> leaseIteratorResponse = await leaseIterator.ReadNextAsync();
foreach (JObject lease in leaseIteratorResponse)
{
await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), new PartitionKey(lease.Value<string>("id")));
}
}
}
}

Expand All @@ -94,6 +108,8 @@ public async Task CosmosDBEndToEndCancellation()
// Start the host again and wait for the logs to show the cancelled item was reprocessed
using (var host = await StartHostAsync(typeof(EndToEndCancellationTestClass)))
{
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>(), DatabaseName, CollectionName);

await TestHelpers.Await(() =>
{
var logMessages = _loggerProvider.GetAllLogMessages();
Expand All @@ -102,6 +118,18 @@ await TestHelpers.Await(() =>
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Saw the first document again!")) == 1
&& logMessages.Count(p => p.Exception is TaskCanceledException) > 0;
});

// Clean-up leases
Container leaseContainer = client.GetContainer(DatabaseName, LeaseCollectionName);
using FeedIterator<JObject> leaseIterator = leaseContainer.GetItemQueryIterator<JObject>();
while (leaseIterator.HasMoreResults)
{
FeedResponse<JObject> leaseIteratorResponse = await leaseIterator.ReadNextAsync();
foreach (JObject lease in leaseIteratorResponse)
{
await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), new PartitionKey(lease.Value<string>("id")));
}
}
}
}

Expand Down Expand Up @@ -188,7 +216,7 @@ public static void Inputs(
}

public static void Trigger(
[CosmosDBTrigger(DatabaseName, CollectionName, CreateLeaseContainerIfNotExists = true)]IReadOnlyList<Item> documents,
[CosmosDBTrigger(DatabaseName, CollectionName, CreateLeaseContainerIfNotExists = true, LeaseContainerPrefix = "ciTrigger")]IReadOnlyList<Item> documents,
ILogger log)
{
foreach (var document in documents)
Expand All @@ -198,7 +226,7 @@ public static void Trigger(
}

public static void TriggerWithString(
[CosmosDBTrigger(DatabaseName, CollectionName, CreateLeaseContainerIfNotExists = true, LeaseContainerPrefix = "withstring")] string documents,
[CosmosDBTrigger(DatabaseName, CollectionName, CreateLeaseContainerIfNotExists = true, LeaseContainerPrefix = "ciTriggerWithString")] string documents,
ILogger log)
{
foreach (var document in JArray.Parse(documents))
Expand All @@ -209,7 +237,7 @@ public static void TriggerWithString(

[FixedDelayRetry(5, "00:00:01")]
public static void TriggerWithRetry(
[CosmosDBTrigger(DatabaseName, CollectionName, CreateLeaseContainerIfNotExists = true, LeaseContainerPrefix = "retry")] IReadOnlyList<Item> documents,
[CosmosDBTrigger(DatabaseName, CollectionName, CreateLeaseContainerIfNotExists = true, LeaseContainerPrefix = "ciTriggerWithRetry")] IReadOnlyList<Item> documents,
ILogger log)
{
foreach (var document in documents)
Expand All @@ -234,7 +262,7 @@ public static async Task Trigger(
DatabaseName,
CollectionName,
CreateLeaseContainerIfNotExists = true,
LeaseContainerPrefix = "cancellation",
LeaseContainerPrefix = "ciTriggerWithCancellation",
LeaseExpirationInterval = 20 * 1000,
LeaseRenewInterval = 5 * 1000,
FeedPollDelay = 500,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs.Extensions.Tests.Common;
using Microsoft.Azure.WebJobs.Extensions.Tests.Extensions.CosmosDB.Models;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -221,7 +222,7 @@ private static CosmosDBEnumerableBuilder<T> CreateBuilder<T>(out Mock<CosmosClie
var options = new OptionsWrapper<CosmosDBOptions>(new CosmosDBOptions
{
});
var configProvider = new CosmosDBExtensionConfigProvider(options, mockServiceFactory.Object, new DefaultCosmosDBSerializerFactory(), new TestNameResolver(), NullLoggerFactory.Instance);
var configProvider = new CosmosDBExtensionConfigProvider(options, mockServiceFactory.Object, new DefaultCosmosDBSerializerFactory(), new TestNameResolver(), Mock.Of<IDrainModeManager>(), NullLoggerFactory.Instance);

return new CosmosDBEnumerableBuilder<T>(configProvider);
}
Expand Down
Loading

0 comments on commit 4d9b059

Please sign in to comment.