From 623ec1a91b330a795df4f030f7ad9ba90e1781f0 Mon Sep 17 00:00:00 2001 From: Joe Schmitt <1146681+schmittjoseph@users.noreply.github.com> Date: Tue, 21 May 2024 11:18:05 -0700 Subject: [PATCH] Prevent operations from getting stuck in the `Starting` state (#6686) --- .../Controllers/DiagController.cs | 2 +- .../Controllers/DiagnosticsControllerBase.cs | 4 +- .../Controllers/OperationsController.cs | 4 +- .../Operation/EgressOperationService.cs | 25 ++- .../Operation/EgressOperationStore.cs | 2 +- .../Operation/IEgressOperationStore.cs | 28 +++ .../Operation/OperationExtensions.cs | 2 +- .../Operation/EgressOperationServiceTests.cs | 195 ++++++++++++++++++ .../Actions/CollectionRuleEgressActionBase.cs | 4 +- 9 files changed, 247 insertions(+), 19 deletions(-) create mode 100644 src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/IEgressOperationStore.cs create mode 100644 src/Tests/Microsoft.Diagnostics.Monitoring.WebApi.UnitTests/Operation/EgressOperationServiceTests.cs diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/DiagController.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/DiagController.cs index 988f547da60..82a0be0026b 100644 --- a/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/DiagController.cs +++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/DiagController.cs @@ -50,7 +50,7 @@ public partial class DiagController : DiagnosticsControllerBase private readonly IStacksOperationFactory _stacksOperationFactory; public DiagController(IServiceProvider serviceProvider, ILogger logger) - : base(serviceProvider.GetRequiredService(), serviceProvider.GetRequiredService(), logger) + : base(serviceProvider.GetRequiredService(), serviceProvider.GetRequiredService(), logger) { _diagnosticPortOptions = serviceProvider.GetService>(); _callStacksOptions = serviceProvider.GetRequiredService>(); diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/DiagnosticsControllerBase.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/DiagnosticsControllerBase.cs index b7770fa8cb0..7903fd5a349 100644 --- a/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/DiagnosticsControllerBase.cs +++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/DiagnosticsControllerBase.cs @@ -10,7 +10,7 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi.Controllers { public abstract class DiagnosticsControllerBase : ControllerBase { - private protected DiagnosticsControllerBase(IDiagnosticServices diagnosticServices, EgressOperationStore operationStore, ILogger logger) + private protected DiagnosticsControllerBase(IDiagnosticServices diagnosticServices, IEgressOperationStore operationStore, ILogger logger) { DiagnosticServices = diagnosticServices ?? throw new ArgumentNullException(nameof(diagnosticServices)); OperationStore = operationStore ?? throw new ArgumentNullException(nameof(operationStore)); @@ -138,7 +138,7 @@ private async Task SendToEgress(IEgressOperation egressOperation, private protected IDiagnosticServices DiagnosticServices { get; } - private protected EgressOperationStore OperationStore { get; } + private protected IEgressOperationStore OperationStore { get; } protected ILogger Logger { get; } } diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/OperationsController.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/OperationsController.cs index 144fac44e7d..9c52a10f02b 100644 --- a/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/OperationsController.cs +++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/Controllers/OperationsController.cs @@ -19,14 +19,14 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi.Controllers public class OperationsController : ControllerBase { private readonly ILogger _logger; - private readonly EgressOperationStore _operationsStore; + private readonly IEgressOperationStore _operationsStore; public const string ControllerName = "operations"; public OperationsController(ILogger logger, IServiceProvider serviceProvider) { _logger = logger; - _operationsStore = serviceProvider.GetRequiredService(); + _operationsStore = serviceProvider.GetRequiredService(); } /// diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/EgressOperationService.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/EgressOperationService.cs index d6315cc2425..60cda1f3788 100644 --- a/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/EgressOperationService.cs +++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/EgressOperationService.cs @@ -1,7 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using System; using System.Threading; @@ -11,14 +10,15 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi { internal sealed class EgressOperationService : BackgroundService { - private IEgressOperationQueue _queue; - private IServiceProvider _serviceProvider; - private EgressOperationStore _operationsStore; + private readonly IEgressOperationQueue _queue; + private readonly IServiceProvider _serviceProvider; + private readonly IEgressOperationStore _operationsStore; public EgressOperationService(IServiceProvider serviceProvider, - EgressOperationStore operationStore) + IEgressOperationQueue operationQueue, + IEgressOperationStore operationStore) { - _queue = serviceProvider.GetRequiredService(); + _queue = operationQueue; _serviceProvider = serviceProvider; _operationsStore = operationStore; } @@ -30,11 +30,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) EgressRequest egressRequest = await _queue.DequeueAsync(stoppingToken); //Note we do not await these tasks, but we do limit how many can be executed at the same time - _ = Task.Run(() => ExecuteEgressOperation(egressRequest, stoppingToken), stoppingToken); + _ = Task.Run(() => ExecuteEgressOperationAsync(egressRequest, stoppingToken), stoppingToken); } } - private async Task ExecuteEgressOperation(EgressRequest egressRequest, CancellationToken stoppingToken) + // Internal for testing. + internal async Task ExecuteEgressOperationAsync(EgressRequest egressRequest, CancellationToken stoppingToken) { //We have two stopping tokens, one per item that can be triggered via Delete //and if we are stopping the service @@ -47,9 +48,13 @@ private async Task ExecuteEgressOperation(EgressRequest egressRequest, Cancellat try { Task> executeTask = egressRequest.EgressOperation.ExecuteAsync(_serviceProvider, token); + Task startTask = egressRequest.EgressOperation.Started; - await egressRequest.EgressOperation.Started.WaitAsync(token).ConfigureAwait(false); - _operationsStore.MarkOperationAsRunning(egressRequest.OperationId); + await Task.WhenAny(startTask, executeTask).Unwrap().WaitAsync(token).ConfigureAwait(false); + if (startTask.IsCompleted) + { + _operationsStore.MarkOperationAsRunning(egressRequest.OperationId); + } ExecutionResult result = await executeTask.WaitAsync(token).ConfigureAwait(false); diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/EgressOperationStore.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/EgressOperationStore.cs index c73cfb1f4f0..e741782d252 100644 --- a/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/EgressOperationStore.cs +++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/EgressOperationStore.cs @@ -11,7 +11,7 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi { - internal sealed class EgressOperationStore + internal sealed class EgressOperationStore : IEgressOperationStore { private sealed class EgressEntry { diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/IEgressOperationStore.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/IEgressOperationStore.cs new file mode 100644 index 00000000000..42920c5371f --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/IEgressOperationStore.cs @@ -0,0 +1,28 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.Diagnostics.Monitoring.WebApi +{ + internal interface IEgressOperationStore + { + Task> ExecuteOperation(IEgressOperation egressOperation); + + Task AddOperation(IEgressOperation egressOperation, string limitKey); + + void StopOperation(Guid operationId, Action onStopException); + + void MarkOperationAsRunning(Guid operationId); + + void CancelOperation(Guid operationId); + + void CompleteOperation(Guid operationId, ExecutionResult result); + + IEnumerable GetOperations(ProcessKey? processKey, string tags); + + Models.OperationStatus GetOperationStatus(Guid operationId); + } +} diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/OperationExtensions.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/OperationExtensions.cs index 69c9eaa025d..ca6fde9c71c 100644 --- a/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/OperationExtensions.cs +++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/Operation/OperationExtensions.cs @@ -10,7 +10,7 @@ internal static class OperationExtensions public static IServiceCollection ConfigureOperationStore(this IServiceCollection services) { services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); services.AddHostedService(); return services; } diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.WebApi.UnitTests/Operation/EgressOperationServiceTests.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.WebApi.UnitTests/Operation/EgressOperationServiceTests.cs new file mode 100644 index 00000000000..11aa7c60b15 --- /dev/null +++ b/src/Tests/Microsoft.Diagnostics.Monitoring.WebApi.UnitTests/Operation/EgressOperationServiceTests.cs @@ -0,0 +1,195 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Microsoft.Diagnostics.Monitoring.TestCommon; +using Moq; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Microsoft.Diagnostics.Monitoring.WebApi.UnitTests.Operation +{ + public class EgressOperationServiceTests + { + private sealed class TestEgressOperation(Func>> executeFunc) : IEgressOperation + { + public bool IsStoppable => false; + + public ISet Tags => new HashSet(); + + public string EgressProviderName => "NA"; + + public EgressProcessInfo ProcessInfo => new EgressProcessInfo("dotnet", processId: 1, Guid.NewGuid()); + + public Task Started => _startedCompletionSource.Task; + + private readonly TaskCompletionSource _startedCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public Task> ExecuteAsync(IServiceProvider serviceProvider, CancellationToken token) + { + return executeFunc(_startedCompletionSource, token); + } + + public Task StopAsync(CancellationToken token) + { + throw new NotImplementedException(); + } + + public void Validate(IServiceProvider serviceProvider) + { + } + } + + private static EgressRequest CreateEgressRequest(Func>> operationExecuteFunc) + => new EgressRequest(Guid.NewGuid(), new TestEgressOperation(operationExecuteFunc), Mock.Of()); + + private static EgressRequest CreateEgressRequest(ExecutionResult result) + => CreateEgressRequest((startCompletionSource, token) => + { + startCompletionSource.SetResult(); + return Task.FromResult(result); + }); + + + private static EgressOperationService CreateEgressOperationService(IEgressOperationStore operationStore) + => new EgressOperationService(Mock.Of(), Mock.Of(), operationStore); + + + [Fact] + public async Task ExecuteAsync_Successful_TransitionsState_ToRunning() + { + // Arrange + Mock mockStore = new(); + using EgressOperationService service = CreateEgressOperationService(mockStore.Object); + using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout); + + EgressRequest request = CreateEgressRequest(ExecutionResult.Empty()); + + // Act + await service.ExecuteEgressOperationAsync(request, cts.Token); + + + // Assert + mockStore.Verify( + m => m.MarkOperationAsRunning(request.OperationId), + Times.Once()); + } + + [Fact] + public async Task ExecuteAsync_Successful_TransitionsState_ToCompleted() + { + // Arrange + Mock mockStore = new(); + using EgressOperationService service = CreateEgressOperationService(mockStore.Object); + using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout); + + EgressRequest request = CreateEgressRequest(ExecutionResult.Empty()); + + // Act + await service.ExecuteEgressOperationAsync(request, cts.Token); + + + // Assert + mockStore.Verify( + m => m.CompleteOperation(request.OperationId, It.IsAny>()), + Times.Once()); + } + + [Fact] + public async Task ExecuteAsync_CompletesWithoutStarting_TransitionsState_ToCompleted() + { + // Arrange + Mock mockStore = new(); + using EgressOperationService service = CreateEgressOperationService(mockStore.Object); + using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout); + + EgressRequest request = CreateEgressRequest((startCompletionSource, token) => + { + // Don't signal the start completion source + return Task.FromResult(ExecutionResult.Empty()); + }); + + // Act + await service.ExecuteEgressOperationAsync(request, cts.Token); + + + // Assert + mockStore.Verify( + m => m.CompleteOperation(request.OperationId, It.IsAny>()), + Times.Once()); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ExecuteAsync_Failure_TransitionsState_ToFaulted(bool isStarted) + { + // Arrange + Exception testException = new InvalidOperationException("test"); + Mock mockStore = new(); + EgressRequest request = CreateEgressRequest((startCompletionSource, token) => + { + if (isStarted) + { + _ = startCompletionSource.TrySetResult(); + } + throw testException; + }); + + TaskCompletionSource> egressResultCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously); + + _ = mockStore + .Setup(m => m.CompleteOperation(request.OperationId, It.IsAny>())) + .Callback((Guid id, ExecutionResult result) => + { + egressResultCompletionSource.SetResult(result); + }); + + using EgressOperationService service = CreateEgressOperationService(mockStore.Object); + using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout); + + // Act + await service.ExecuteEgressOperationAsync(request, cts.Token); + + + // Assert + Assert.True(egressResultCompletionSource.Task.IsCompleted); + ExecutionResult result = await egressResultCompletionSource.Task; + + Assert.Equal(testException, result.Exception); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ExecuteAsync_Cancelled_TransitionsState_ToCancelled(bool isStarted) + { + // Arrange + Mock mockStore = new(); + using EgressOperationService service = CreateEgressOperationService(mockStore.Object); + using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout); + + EgressRequest request = CreateEgressRequest(async (startCompletionSource, token) => + { + if (isStarted) + { + _ = startCompletionSource.TrySetResult(); + } + + await cts.CancelAsync(); + token.ThrowIfCancellationRequested(); + + throw new InvalidOperationException("Should never reach here"); + }); + + // Act & Assert + await Assert.ThrowsAnyAsync(() => service.ExecuteEgressOperationAsync(request, cts.Token)); + + mockStore.Verify( + m => m.CancelOperation(request.OperationId), + Times.Once()); + } + } +} diff --git a/src/Tools/dotnet-monitor/CollectionRules/Actions/CollectionRuleEgressActionBase.cs b/src/Tools/dotnet-monitor/CollectionRules/Actions/CollectionRuleEgressActionBase.cs index c4df93cb6b0..7dc0acaaddb 100644 --- a/src/Tools/dotnet-monitor/CollectionRules/Actions/CollectionRuleEgressActionBase.cs +++ b/src/Tools/dotnet-monitor/CollectionRules/Actions/CollectionRuleEgressActionBase.cs @@ -15,13 +15,13 @@ internal abstract class CollectionRuleEgressActionBase : CollectionRul { protected IServiceProvider ServiceProvider { get; } - protected EgressOperationStore EgressOperationStore { get; } + protected IEgressOperationStore EgressOperationStore { get; } protected CollectionRuleEgressActionBase(IServiceProvider serviceProvider, IProcessInfo processInfo, TOptions options) : base(processInfo, options) { ServiceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); - EgressOperationStore = ServiceProvider.GetRequiredService(); + EgressOperationStore = ServiceProvider.GetRequiredService(); } protected abstract EgressOperation CreateArtifactOperation(CollectionRuleMetadata collectionRuleMetadata);