Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent operations from getting stuck in the Starting state #6686

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public partial class DiagController : DiagnosticsControllerBase
private readonly IStacksOperationFactory _stacksOperationFactory;

public DiagController(IServiceProvider serviceProvider, ILogger<DiagController> logger)
: base(serviceProvider.GetRequiredService<IDiagnosticServices>(), serviceProvider.GetRequiredService<EgressOperationStore>(), logger)
: base(serviceProvider.GetRequiredService<IDiagnosticServices>(), serviceProvider.GetRequiredService<IEgressOperationStore>(), logger)
{
_diagnosticPortOptions = serviceProvider.GetService<IOptions<DiagnosticPortOptions>>();
_callStacksOptions = serviceProvider.GetRequiredService<IOptions<CallStacksOptions>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi.Controllers
public abstract class DiagnosticsControllerBase : ControllerBase
{
protected DiagnosticsControllerBase(IServiceProvider serviceProvider, ILogger logger) :
this(serviceProvider.GetRequiredService<IDiagnosticServices>(), serviceProvider.GetRequiredService<EgressOperationStore>(), logger)
this(serviceProvider.GetRequiredService<IDiagnosticServices>(), serviceProvider.GetRequiredService<IEgressOperationStore>(), logger)
{ }

private protected DiagnosticsControllerBase(IDiagnosticServices diagnosticServices, EgressOperationStore operationStore, ILogger logger)
private protected DiagnosticsControllerBase(IDiagnosticServices diagnosticServices, IEgressOperationStore operationStore, ILogger logger)
{
DiagnosticServices = diagnosticServices ?? throw new ArgumentNullException(nameof(diagnosticServices));
OperationStore = operationStore ?? throw new ArgumentNullException(nameof(operationStore));
Expand Down Expand Up @@ -129,7 +129,7 @@ private async Task<ActionResult> SendToEgress(IEgressOperation egressOperation,

private protected IDiagnosticServices DiagnosticServices { get; }

private protected EgressOperationStore OperationStore { get; }
private protected IEgressOperationStore OperationStore { get; }

protected ILogger Logger { get; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi.Controllers
public class OperationsController : ControllerBase
{
private readonly ILogger<OperationsController> _logger;
private readonly EgressOperationStore _operationsStore;
private readonly IEgressOperationStore _operationsStore;

public const string ControllerName = "operations";

public OperationsController(ILogger<OperationsController> logger, IServiceProvider serviceProvider)
{
_logger = logger;
_operationsStore = serviceProvider.GetRequiredService<EgressOperationStore>();
_operationsStore = serviceProvider.GetRequiredService<IEgressOperationStore>();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
Expand All @@ -11,14 +10,15 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi
{
internal sealed class EgressOperationService : BackgroundService
{
private IEgressOperationQueue _queue;
private IServiceProvider _serviceProvider;
private EgressOperationStore _operationsStore;
private readonly IEgressOperationQueue _queue;
private readonly IServiceProvider _serviceProvider;
private readonly IEgressOperationStore _operationsStore;

public EgressOperationService(IServiceProvider serviceProvider,
EgressOperationStore operationStore)
IEgressOperationQueue operationQueue,
IEgressOperationStore operationStore)
{
_queue = serviceProvider.GetRequiredService<IEgressOperationQueue>();
_queue = operationQueue;
_serviceProvider = serviceProvider;
_operationsStore = operationStore;
}
Expand All @@ -30,11 +30,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
EgressRequest egressRequest = await _queue.DequeueAsync(stoppingToken);

//Note we do not await these tasks, but we do limit how many can be executed at the same time
_ = Task.Run(() => ExecuteEgressOperation(egressRequest, stoppingToken), stoppingToken);
_ = Task.Run(() => ExecuteEgressOperationAsync(egressRequest, stoppingToken), stoppingToken);
}
}

private async Task ExecuteEgressOperation(EgressRequest egressRequest, CancellationToken stoppingToken)
// Internal for testing.
internal async Task ExecuteEgressOperationAsync(EgressRequest egressRequest, CancellationToken stoppingToken)
{
//We have two stopping tokens, one per item that can be triggered via Delete
//and if we are stopping the service
Expand All @@ -47,9 +48,13 @@ private async Task ExecuteEgressOperation(EgressRequest egressRequest, Cancellat
try
{
Task<ExecutionResult<EgressResult>> executeTask = egressRequest.EgressOperation.ExecuteAsync(_serviceProvider, token);
Task startTask = egressRequest.EgressOperation.Started;

await egressRequest.EgressOperation.Started.WaitAsync(token).ConfigureAwait(false);
_operationsStore.MarkOperationAsRunning(egressRequest.OperationId);
await Task.WhenAny(startTask, executeTask).Unwrap().WaitAsync(token).ConfigureAwait(false);
if (startTask.IsCompleted)
{
_operationsStore.MarkOperationAsRunning(egressRequest.OperationId);
}

ExecutionResult<EgressResult> result = await executeTask.WaitAsync(token).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Microsoft.Diagnostics.Monitoring.WebApi
{
internal sealed class EgressOperationStore
internal sealed class EgressOperationStore : IEgressOperationStore
{
private sealed class EgressEntry
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.Monitoring.WebApi
{
/// <summary>
/// Abstraction over the store that tracks egress operations, so that consumers
/// (controllers, the background egress service, collection rule actions) can be
/// given a substitute implementation, e.g. a mock in unit tests.
/// </summary>
/// <remarks>
/// NOTE(review): the member descriptions below are inferred from names and signatures;
/// confirm each against the concrete EgressOperationStore implementation.
/// </remarks>
internal interface IEgressOperationStore
{
/// <summary>
/// Presumably runs the operation through the store and yields its result once finished - TODO confirm.
/// </summary>
Task<ExecutionResult<EgressResult>> ExecuteOperation(IEgressOperation egressOperation);

/// <summary>
/// Presumably registers the operation and returns the identifier assigned to it - TODO confirm.
/// </summary>
/// <param name="limitKey">Presumably the key used for concurrency limiting - TODO confirm.</param>
Task<Guid> AddOperation(IEgressOperation egressOperation, string limitKey);

/// <summary>
/// Requests that the identified operation be stopped.
/// </summary>
/// <param name="onStopException">Callback that receives an exception; presumably invoked when stopping fails - TODO confirm.</param>
void StopOperation(Guid operationId, Action<Exception> onStopException);

/// <summary>
/// Marks the identified operation as running.
/// </summary>
void MarkOperationAsRunning(Guid operationId);

/// <summary>
/// Cancels the identified operation.
/// </summary>
void CancelOperation(Guid operationId);

/// <summary>
/// Records <paramref name="result"/> as the outcome of the identified operation.
/// </summary>
void CompleteOperation(Guid operationId, ExecutionResult<EgressResult> result);

/// <summary>
/// Returns summaries of known operations; presumably filtered by process and tags when those are supplied - TODO confirm.
/// </summary>
IEnumerable<Models.OperationSummary> GetOperations(ProcessKey? processKey, string tags);

/// <summary>
/// Returns the status of the identified operation.
/// </summary>
Models.OperationStatus GetOperationStatus(Guid operationId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ internal static class OperationExtensions
public static IServiceCollection ConfigureOperationStore(this IServiceCollection services)
{
services.AddSingleton<IEgressOperationQueue, EgressOperationQueue>();
services.AddSingleton<EgressOperationStore>();
services.AddSingleton<IEgressOperationStore, EgressOperationStore>();
services.AddHostedService<EgressOperationService>();
return services;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Diagnostics.Monitoring.TestCommon;
using Moq;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace Microsoft.Diagnostics.Monitoring.WebApi.UnitTests.Operation
{
public class EgressOperationServiceTests
{
/// <summary>
/// Minimal <see cref="IEgressOperation"/> test double whose execution is delegated to a caller-supplied function.
/// </summary>
/// <remarks>
/// The delegate receives the completion source that backs <see cref="Started"/>, so each test
/// decides whether (and when) the operation reports that it has started.
/// </remarks>
private sealed class TestEgressOperation(Func<TaskCompletionSource, CancellationToken, Task<ExecutionResult<EgressResult>>> executeFunc) : IEgressOperation
{
    // Continuations run asynchronously so that completing the source from inside the
    // execute delegate never runs awaiting code inline on the completing thread.
    private readonly TaskCompletionSource _startedCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);

    public bool IsStoppable => false;

    // Initialized once: every read observes the same set instance instead of a fresh empty one.
    public ISet<string> Tags { get; } = new HashSet<string>();

    public string EgressProviderName => "NA";

    // Initialized once: the generated Guid stays stable for the lifetime of this operation
    // rather than changing on every property read.
    public EgressProcessInfo ProcessInfo { get; } = new EgressProcessInfo("dotnet", processId: 1, Guid.NewGuid());

    /// <summary>Completes only if the execute delegate signals the completion source it is handed.</summary>
    public Task Started => _startedCompletionSource.Task;

    /// <summary>
    /// Forwards to the delegate supplied at construction. The service provider is ignored.
    /// </summary>
    public Task<ExecutionResult<EgressResult>> ExecuteAsync(IServiceProvider serviceProvider, CancellationToken token)
    {
        return executeFunc(_startedCompletionSource, token);
    }

    /// <summary>Not implemented; this test operation reports <see cref="IsStoppable"/> as false.</summary>
    public Task StopAsync(CancellationToken token)
    {
        throw new NotImplementedException();
    }

    /// <summary>No validation is performed.</summary>
    public void Validate(IServiceProvider serviceProvider)
    {
    }
}

/// <summary>
/// Builds an <see cref="EgressRequest"/> with a fresh operation id around a
/// <see cref="TestEgressOperation"/> driven by <paramref name="operationExecuteFunc"/>.
/// </summary>
private static EgressRequest CreateEgressRequest(Func<TaskCompletionSource, CancellationToken, Task<ExecutionResult<EgressResult>>> operationExecuteFunc)
{
    Guid operationId = Guid.NewGuid();
    TestEgressOperation operation = new TestEgressOperation(operationExecuteFunc);
    // The request needs some IDisposable; a mocked one is enough for these tests.
    IDisposable placeholderDisposable = Mock.Of<IDisposable>();

    return new EgressRequest(operationId, operation, placeholderDisposable);
}

/// <summary>
/// Builds an <see cref="EgressRequest"/> whose operation signals that it has started
/// and then immediately completes with <paramref name="result"/>.
/// </summary>
private static EgressRequest CreateEgressRequest(ExecutionResult<EgressResult> result)
{
    return CreateEgressRequest(SignalStartThenReturn);

    Task<ExecutionResult<EgressResult>> SignalStartThenReturn(TaskCompletionSource started, CancellationToken cancellationToken)
    {
        // Report "started" before handing back the already-completed result.
        started.SetResult();
        return Task.FromResult(result);
    }
}


/// <summary>
/// Creates the service under test with mocked service provider and queue,
/// wired to the supplied <paramref name="operationStore"/>.
/// </summary>
private static EgressOperationService CreateEgressOperationService(IEgressOperationStore operationStore)
{
    IServiceProvider serviceProvider = Mock.Of<IServiceProvider>();
    IEgressOperationQueue operationQueue = Mock.Of<IEgressOperationQueue>();

    return new EgressOperationService(serviceProvider, operationQueue, operationStore);
}


[Fact]
public async Task ExecuteAsync_Successful_TransitionsState_ToRunning()
{
    // Arrange: an operation that signals start and then completes with an empty result.
    Mock<IEgressOperationStore> storeMock = new Mock<IEgressOperationStore>();
    using EgressOperationService operationService = CreateEgressOperationService(storeMock.Object);
    using CancellationTokenSource timeoutSource = new(CommonTestTimeouts.GeneralTimeout);
    EgressRequest egressRequest = CreateEgressRequest(ExecutionResult<EgressResult>.Empty());

    // Act
    await operationService.ExecuteEgressOperationAsync(egressRequest, timeoutSource.Token);

    // Assert: the store was told exactly once that this operation is running.
    storeMock.Verify(
        store => store.MarkOperationAsRunning(egressRequest.OperationId),
        Times.Once);
}

[Fact]
public async Task ExecuteAsync_Successful_TransitionsState_ToCompleted()
{
    // Arrange: an operation that signals start and then completes with an empty result.
    Mock<IEgressOperationStore> storeMock = new Mock<IEgressOperationStore>();
    using EgressOperationService operationService = CreateEgressOperationService(storeMock.Object);
    using CancellationTokenSource timeoutSource = new(CommonTestTimeouts.GeneralTimeout);
    EgressRequest egressRequest = CreateEgressRequest(ExecutionResult<EgressResult>.Empty());

    // Act
    await operationService.ExecuteEgressOperationAsync(egressRequest, timeoutSource.Token);

    // Assert: the store received exactly one completion for this operation, with any result.
    storeMock.Verify(
        store => store.CompleteOperation(egressRequest.OperationId, It.IsAny<ExecutionResult<EgressResult>>()),
        Times.Once);
}

[Fact]
public async Task ExecuteAsync_CompletesWithoutStarting_TransitionsState_ToCompleted()
{
    // Arrange: the operation finishes without ever signalling the start completion source.
    Mock<IEgressOperationStore> storeMock = new Mock<IEgressOperationStore>();
    using EgressOperationService operationService = CreateEgressOperationService(storeMock.Object);
    using CancellationTokenSource timeoutSource = new(CommonTestTimeouts.GeneralTimeout);
    EgressRequest egressRequest = CreateEgressRequest(
        (_, _) => Task.FromResult(ExecutionResult<EgressResult>.Empty()));

    // Act
    await operationService.ExecuteEgressOperationAsync(egressRequest, timeoutSource.Token);

    // Assert: completion is still recorded exactly once even though "started" never fired.
    storeMock.Verify(
        store => store.CompleteOperation(egressRequest.OperationId, It.IsAny<ExecutionResult<EgressResult>>()),
        Times.Once);
}

/// <summary>
/// Verifies that an exception thrown by the operation is reported to the store through
/// <c>CompleteOperation</c>, whether or not the operation signalled that it had started.
/// </summary>
/// <param name="isStarted">When true, the operation signals start before throwing.</param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ExecuteAsync_Failure_TransitionsState_ToFaulted(bool isStarted)
{
    // Arrange
    Exception testException = new InvalidOperationException("test");
    Mock<IEgressOperationStore> mockStore = new();
    EgressRequest request = CreateEgressRequest((startCompletionSource, token) =>
    {
        if (isStarted)
        {
            _ = startCompletionSource.TrySetResult();
        }
        throw testException;
    });

    // Captures the result handed to CompleteOperation so it can be inspected after the call.
    TaskCompletionSource<ExecutionResult<EgressResult>> egressResultCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);

    _ = mockStore
        .Setup(m => m.CompleteOperation(request.OperationId, It.IsAny<ExecutionResult<EgressResult>>()))
        .Callback((Guid id, ExecutionResult<EgressResult> result) =>
        {
            egressResultCompletionSource.SetResult(result);
        });

    using EgressOperationService service = CreateEgressOperationService(mockStore.Object);
    using CancellationTokenSource cts = new CancellationTokenSource(CommonTestTimeouts.GeneralTimeout);

    // Act
    await service.ExecuteEgressOperationAsync(request, cts.Token);

    // Assert
    // The completion must already have been recorded by the time the call returns.
    Assert.True(egressResultCompletionSource.Task.IsCompleted);
    ExecutionResult<EgressResult> result = await egressResultCompletionSource.Task;

    Assert.Equal(testException, result.Exception);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ExecuteAsync_Cancelled_TransitionsState_ToCancelled(bool isStarted)
{
    // Arrange
    Mock<IEgressOperationStore> storeMock = new Mock<IEgressOperationStore>();
    using EgressOperationService operationService = CreateEgressOperationService(storeMock.Object);
    using CancellationTokenSource cancellationSource = new(CommonTestTimeouts.GeneralTimeout);

    // Operation body: optionally signal start, then cancel the token source that was
    // passed to the service and observe the cancellation on the operation's token.
    async Task<ExecutionResult<EgressResult>> CancelDuringExecutionAsync(TaskCompletionSource started, CancellationToken operationToken)
    {
        if (isStarted)
        {
            _ = started.TrySetResult();
        }

        await cancellationSource.CancelAsync();
        operationToken.ThrowIfCancellationRequested();

        throw new InvalidOperationException("Should never reach here");
    }

    EgressRequest egressRequest = CreateEgressRequest(CancelDuringExecutionAsync);

    Task RunOperationAsync() => operationService.ExecuteEgressOperationAsync(egressRequest, cancellationSource.Token);

    // Act & Assert: the cancellation surfaces to the caller...
    await Assert.ThrowsAnyAsync<OperationCanceledException>(RunOperationAsync);

    // ...and the store is told exactly once that the operation was cancelled.
    storeMock.Verify(
        store => store.CancelOperation(egressRequest.OperationId),
        Times.Once);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ internal abstract class CollectionRuleEgressActionBase<TOptions> : CollectionRul
{
protected IServiceProvider ServiceProvider { get; }

protected EgressOperationStore EgressOperationStore { get; }
protected IEgressOperationStore EgressOperationStore { get; }

protected CollectionRuleEgressActionBase(IServiceProvider serviceProvider, IProcessInfo processInfo, TOptions options)
: base(processInfo, options)
{
ServiceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
EgressOperationStore = ServiceProvider.GetRequiredService<EgressOperationStore>();
EgressOperationStore = ServiceProvider.GetRequiredService<IEgressOperationStore>();
}

protected abstract EgressOperation CreateArtifactOperation(CollectionRuleMetadata collectionRuleMetadata);
Expand Down