Skip to content

Commit

Permalink
Prevent operations from getting stuck in the Starting state (#6686)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmittjoseph authored May 21, 2024
1 parent 798744e commit 3cb7701
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public partial class DiagController : DiagnosticsControllerBase
private readonly IStacksOperationFactory _stacksOperationFactory;

public DiagController(IServiceProvider serviceProvider, ILogger<DiagController> logger)
: base(serviceProvider.GetRequiredService<IDiagnosticServices>(), serviceProvider.GetRequiredService<EgressOperationStore>(), logger)
: base(serviceProvider.GetRequiredService<IDiagnosticServices>(), serviceProvider.GetRequiredService<IEgressOperationStore>(), logger)
{
_diagnosticPortOptions = serviceProvider.GetService<IOptions<DiagnosticPortOptions>>();
_callStacksOptions = serviceProvider.GetRequiredService<IOptions<CallStacksOptions>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi.Controllers
public abstract class DiagnosticsControllerBase : ControllerBase
{
protected DiagnosticsControllerBase(IServiceProvider serviceProvider, ILogger logger) :
this(serviceProvider.GetRequiredService<IDiagnosticServices>(), serviceProvider.GetRequiredService<EgressOperationStore>(), logger)
this(serviceProvider.GetRequiredService<IDiagnosticServices>(), serviceProvider.GetRequiredService<IEgressOperationStore>(), logger)
{ }

private protected DiagnosticsControllerBase(IDiagnosticServices diagnosticServices, EgressOperationStore operationStore, ILogger logger)
private protected DiagnosticsControllerBase(IDiagnosticServices diagnosticServices, IEgressOperationStore operationStore, ILogger logger)
{
DiagnosticServices = diagnosticServices ?? throw new ArgumentNullException(nameof(diagnosticServices));
OperationStore = operationStore ?? throw new ArgumentNullException(nameof(operationStore));
Expand Down Expand Up @@ -129,7 +129,7 @@ private async Task<ActionResult> SendToEgress(IEgressOperation egressOperation,

private protected IDiagnosticServices DiagnosticServices { get; }

private protected EgressOperationStore OperationStore { get; }
private protected IEgressOperationStore OperationStore { get; }

protected ILogger Logger { get; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi.Controllers
public class OperationsController : ControllerBase
{
private readonly ILogger<OperationsController> _logger;
private readonly EgressOperationStore _operationsStore;
private readonly IEgressOperationStore _operationsStore;

public const string ControllerName = "operations";

public OperationsController(ILogger<OperationsController> logger, IServiceProvider serviceProvider)
{
_logger = logger;
_operationsStore = serviceProvider.GetRequiredService<EgressOperationStore>();
_operationsStore = serviceProvider.GetRequiredService<IEgressOperationStore>();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
Expand All @@ -11,14 +10,15 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi
{
internal sealed class EgressOperationService : BackgroundService
{
private IEgressOperationQueue _queue;
private IServiceProvider _serviceProvider;
private EgressOperationStore _operationsStore;
private readonly IEgressOperationQueue _queue;
private readonly IServiceProvider _serviceProvider;
private readonly IEgressOperationStore _operationsStore;

public EgressOperationService(IServiceProvider serviceProvider,
EgressOperationStore operationStore)
IEgressOperationQueue operationQueue,
IEgressOperationStore operationStore)
{
_queue = serviceProvider.GetRequiredService<IEgressOperationQueue>();
_queue = operationQueue;
_serviceProvider = serviceProvider;
_operationsStore = operationStore;
}
Expand All @@ -30,11 +30,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
EgressRequest egressRequest = await _queue.DequeueAsync(stoppingToken);

//Note we do not await these tasks, but we do limit how many can be executed at the same time
_ = Task.Run(() => ExecuteEgressOperation(egressRequest, stoppingToken), stoppingToken);
_ = Task.Run(() => ExecuteEgressOperationAsync(egressRequest, stoppingToken), stoppingToken);
}
}

private async Task ExecuteEgressOperation(EgressRequest egressRequest, CancellationToken stoppingToken)
// Internal for testing.
internal async Task ExecuteEgressOperationAsync(EgressRequest egressRequest, CancellationToken stoppingToken)
{
//We have two stopping tokens, one per item that can be triggered via Delete
//and if we are stopping the service
Expand All @@ -47,9 +48,13 @@ private async Task ExecuteEgressOperation(EgressRequest egressRequest, Cancellat
try
{
Task<ExecutionResult<EgressResult>> executeTask = egressRequest.EgressOperation.ExecuteAsync(_serviceProvider, token);
Task startTask = egressRequest.EgressOperation.Started;

await egressRequest.EgressOperation.Started.WaitAsync(token).ConfigureAwait(false);
_operationsStore.MarkOperationAsRunning(egressRequest.OperationId);
await Task.WhenAny(startTask, executeTask).Unwrap().WaitAsync(token).ConfigureAwait(false);
if (startTask.IsCompleted)
{
_operationsStore.MarkOperationAsRunning(egressRequest.OperationId);
}

ExecutionResult<EgressResult> result = await executeTask.WaitAsync(token).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Microsoft.Diagnostics.Monitoring.WebApi
{
internal sealed class EgressOperationStore
internal sealed class EgressOperationStore : IEgressOperationStore
{
private sealed class EgressEntry
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.Monitoring.WebApi
{
internal interface IEgressOperationStore
{
Task<ExecutionResult<EgressResult>> ExecuteOperation(IEgressOperation egressOperation);

Task<Guid> AddOperation(IEgressOperation egressOperation, string limitKey);

void StopOperation(Guid operationId, Action<Exception> onStopException);

void MarkOperationAsRunning(Guid operationId);

void CancelOperation(Guid operationId);

void CompleteOperation(Guid operationId, ExecutionResult<EgressResult> result);

IEnumerable<Models.OperationSummary> GetOperations(ProcessKey? processKey, string tags);

Models.OperationStatus GetOperationStatus(Guid operationId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ internal static class OperationExtensions
public static IServiceCollection ConfigureOperationStore(this IServiceCollection services)
{
services.AddSingleton<IEgressOperationQueue, EgressOperationQueue>();
services.AddSingleton<EgressOperationStore>();
services.AddSingleton<IEgressOperationStore, EgressOperationStore>();
services.AddHostedService<EgressOperationService>();
return services;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Diagnostics.Monitoring.TestCommon;
using Moq;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace Microsoft.Diagnostics.Monitoring.WebApi.UnitTests.Operation
{
public class EgressOperationServiceTests
{
private sealed class TestEgressOperation(Func<TaskCompletionSource, CancellationToken, Task<ExecutionResult<EgressResult>>> executeFunc) : IEgressOperation
{
public bool IsStoppable => false;

public ISet<string> Tags => new HashSet<string>();

public string EgressProviderName => "NA";

public EgressProcessInfo ProcessInfo => new EgressProcessInfo("dotnet", processId: 1, Guid.NewGuid());

public Task Started => _startedCompletionSource.Task;

private readonly TaskCompletionSource _startedCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);

public Task<ExecutionResult<EgressResult>> ExecuteAsync(IServiceProvider serviceProvider, CancellationToken token)
{
return executeFunc(_startedCompletionSource, token);
}

public Task StopAsync(CancellationToken token)
{
throw new NotImplementedException();
}

public void Validate(IServiceProvider serviceProvider)
{
}
}

private static EgressRequest CreateEgressRequest(Func<TaskCompletionSource, CancellationToken, Task<ExecutionResult<EgressResult>>> operationExecuteFunc)
=> new EgressRequest(Guid.NewGuid(), new TestEgressOperation(operationExecuteFunc), Mock.Of<IDisposable>());

private static EgressRequest CreateEgressRequest(ExecutionResult<EgressResult> result)
=> CreateEgressRequest((startCompletionSource, token) =>
{
startCompletionSource.SetResult();
return Task.FromResult(result);
});


private static EgressOperationService CreateEgressOperationService(IEgressOperationStore operationStore)
=> new EgressOperationService(Mock.Of<IServiceProvider>(), Mock.Of<IEgressOperationQueue>(), operationStore);


[Fact]
public async Task ExecuteAsync_Successful_TransitionsState_ToRunning()
{
// Arrange
Mock<IEgressOperationStore> mockStore = new();
using EgressOperationService service = CreateEgressOperationService(mockStore.Object);
using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout);

EgressRequest request = CreateEgressRequest(ExecutionResult<EgressResult>.Empty());

// Act
await service.ExecuteEgressOperationAsync(request, cts.Token);


// Assert
mockStore.Verify(
m => m.MarkOperationAsRunning(request.OperationId),
Times.Once());
}

[Fact]
public async Task ExecuteAsync_Successful_TransitionsState_ToCompleted()
{
// Arrange
Mock<IEgressOperationStore> mockStore = new();
using EgressOperationService service = CreateEgressOperationService(mockStore.Object);
using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout);

EgressRequest request = CreateEgressRequest(ExecutionResult<EgressResult>.Empty());

// Act
await service.ExecuteEgressOperationAsync(request, cts.Token);


// Assert
mockStore.Verify(
m => m.CompleteOperation(request.OperationId, It.IsAny<ExecutionResult<EgressResult>>()),
Times.Once());
}

[Fact]
public async Task ExecuteAsync_CompletesWithoutStarting_TransitionsState_ToCompleted()
{
// Arrange
Mock<IEgressOperationStore> mockStore = new();
using EgressOperationService service = CreateEgressOperationService(mockStore.Object);
using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout);

EgressRequest request = CreateEgressRequest((startCompletionSource, token) =>
{
// Don't signal the start completion source
return Task.FromResult(ExecutionResult<EgressResult>.Empty());
});

// Act
await service.ExecuteEgressOperationAsync(request, cts.Token);


// Assert
mockStore.Verify(
m => m.CompleteOperation(request.OperationId, It.IsAny<ExecutionResult<EgressResult>>()),
Times.Once());
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ExecuteAsync_Failure_TransitionsState_ToFaulted(bool isStarted)
{
// Arrange
Exception testException = new InvalidOperationException("test");
Mock<IEgressOperationStore> mockStore = new();
EgressRequest request = CreateEgressRequest((startCompletionSource, token) =>
{
if (isStarted)
{
_ = startCompletionSource.TrySetResult();
}
throw testException;
});

TaskCompletionSource<ExecutionResult<EgressResult>> egressResultCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);

_ = mockStore
.Setup(m => m.CompleteOperation(request.OperationId, It.IsAny<ExecutionResult<EgressResult>>()))
.Callback((Guid id, ExecutionResult<EgressResult> result) =>
{
egressResultCompletionSource.SetResult(result);
});

using EgressOperationService service = CreateEgressOperationService(mockStore.Object);
using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout);

// Act
await service.ExecuteEgressOperationAsync(request, cts.Token);


// Assert
Assert.True(egressResultCompletionSource.Task.IsCompleted);
ExecutionResult<EgressResult> result = await egressResultCompletionSource.Task;

Assert.Equal(testException, result.Exception);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ExecuteAsync_Cancelled_TransitionsState_ToCancelled(bool isStarted)
{
// Arrange
Mock<IEgressOperationStore> mockStore = new();
using EgressOperationService service = CreateEgressOperationService(mockStore.Object);
using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout);

EgressRequest request = CreateEgressRequest(async (startCompletionSource, token) =>
{
if (isStarted)
{
_ = startCompletionSource.TrySetResult();
}

await cts.CancelAsync();
token.ThrowIfCancellationRequested();

throw new InvalidOperationException("Should never reach here");
});

// Act & Assert
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => service.ExecuteEgressOperationAsync(request, cts.Token));

mockStore.Verify(
m => m.CancelOperation(request.OperationId),
Times.Once());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ internal abstract class CollectionRuleEgressActionBase<TOptions> : CollectionRul
{
protected IServiceProvider ServiceProvider { get; }

protected EgressOperationStore EgressOperationStore { get; }
protected IEgressOperationStore EgressOperationStore { get; }

protected CollectionRuleEgressActionBase(IServiceProvider serviceProvider, IProcessInfo processInfo, TOptions options)
: base(processInfo, options)
{
ServiceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
EgressOperationStore = ServiceProvider.GetRequiredService<EgressOperationStore>();
EgressOperationStore = ServiceProvider.GetRequiredService<IEgressOperationStore>();
}

protected abstract EgressOperation CreateArtifactOperation(CollectionRuleMetadata collectionRuleMetadata);
Expand Down

0 comments on commit 3cb7701

Please sign in to comment.