Investigation hanging audit instance #4758

Draft
wants to merge 10 commits into
base: otel-metrics
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace ServiceControl.Audit.Persistence.InMemory
{
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing.BodyStorage;
using ServiceControl.Audit.Persistence.UnitOfWork;
@@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore,
bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings);
}

public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
{
//The batchSize argument is ignored: the in-memory storage implementation doesn't support batching.
return new ValueTask<IAuditIngestionUnitOfWork>(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher));
@@ -13,9 +13,10 @@ class RavenAuditIngestionUnitOfWorkFactory(
MinimumRequiredStorageState customCheckState)
: IAuditIngestionUnitOfWorkFactory
{
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
{
var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout);
var timedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Contributor

It would be good to have a comment here indicating that disposal happens as part of the unit of work.
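
The ownership hand-off this comment asks to document can be sketched in isolation. This is a minimal illustration, not the PR's code: `SketchUnitOfWork` and `SketchFactory` are hypothetical stand-ins, and the assumption (taken from the comment above) is that the unit of work disposes the linked token source when it is itself disposed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the unit of work: it owns the linked token source
// and disposes it when the unit of work itself is disposed.
sealed class SketchUnitOfWork(CancellationTokenSource timedCancellationSource) : IAsyncDisposable
{
    public CancellationToken Token => timedCancellationSource.Token;

    public bool Disposed { get; private set; }

    public ValueTask DisposeAsync()
    {
        timedCancellationSource.Dispose();
        Disposed = true;
        return ValueTask.CompletedTask;
    }
}

static class SketchFactory
{
    public static SketchUnitOfWork StartNew(TimeSpan commitTimeout, CancellationToken cancellationToken)
    {
        // Deliberately not wrapped in `using`: ownership passes to the unit of work,
        // which disposes the source as part of its own disposal.
        var timedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // The linked token now fires on host cancellation or on timeout, whichever comes first.
        timedCancellationSource.CancelAfter(commitTimeout);

        return new SketchUnitOfWork(timedCancellationSource);
    }
}
```

Read the token before disposing the unit of work, since `CancellationTokenSource.Token` throws `ObjectDisposedException` once the source has been disposed.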

timedCancellationSource.CancelAfter(databaseConfiguration.BulkInsertCommitTimeout);
var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token))
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token);

@@ -1,10 +1,11 @@
namespace ServiceControl.Audit.Persistence.UnitOfWork
{
using System.Threading;
using System.Threading.Tasks;

public interface IAuditIngestionUnitOfWorkFactory
{
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken = default); //Throws if not enough space or some other problem preventing from writing data
bool CanIngestMore();
}
}
113 changes: 72 additions & 41 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
@@ -52,15 +52,27 @@ public AuditIngestion(

errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);

watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);

ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
watchdog = new Watchdog(
"audit message ingestion",
EnsureStarted,
EnsureStopped,
ingestionState.ReportError,
ingestionState.Clear,
settings.TimeToRestartAuditIngestionAfterFailure,
logger
);
}

public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());
public async Task StartAsync(CancellationToken cancellationToken)
Member Author

@andreasohlund @danielmarbach An alternative is to use BackgroundService.ExecuteAsync, assuming we can safely terminate ASAP:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await watchdog.Start(() => applicationLifetime.StopApplication());
    await Loop(stoppingToken);
    // Intentionally not invoking the following to shut down ASAP
    // watchdog.Stop();
    // channel.Writer.Complete();
    // if (transportInfrastructure != null)
    // {
    //     await transportInfrastructure.Shutdown(stoppingToken);
    // }
}

Since everything is at-least-once processing and idempotent, this would be the fastest way to shut down.

Contributor

I think it makes sense to switch to a BackgroundService here and inline the loop into the execute method.

Be aware, though, of dotnet/runtime#36063 and https://blog.stephencleary.com/2020/05/backgroundservice-gotcha-startup.html for more context.

Member Author

Yes, I already tested that and it works. Since the loop does IO almost immediately, it's not affected, which is why I removed the Task.Run. I can restore it for safety or add a code comment. Any preference, @danielmarbach?
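
The startup concern discussed in this thread can be illustrated without the hosting package. The sketch below assumes only the general C# rule that an async method runs synchronously on the caller's thread until its first await that is not already complete. `StartupSketch` and its method names are hypothetical and not part of this PR.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class StartupSketch
{
    // Runs on the caller's thread until the first await that is not already complete.
    // Anything slow before that point delays whoever called the method.
    public static async Task LoopWithoutYield(Action beforeFirstAwait, CancellationToken cancellationToken)
    {
        beforeFirstAwait(); // executes before the caller gets the Task back
        await Task.Delay(Timeout.Infinite, cancellationToken);
    }

    // Task.Run moves even the synchronous prefix to the thread pool,
    // so the caller gets its Task back immediately.
    public static Task LoopOnThreadPool(Action beforeFirstAwait, CancellationToken cancellationToken) =>
        Task.Run(() => LoopWithoutYield(beforeFirstAwait, cancellationToken), cancellationToken);
}
```

If the loop reaches a genuinely asynchronous await almost immediately, as described above, the synchronous prefix is short and the first variant should not noticeably hold up startup. The second variant is the more defensive choice when that is uncertain.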

{
stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
ingestionWorker = Loop(stopSource.Token);
await watchdog.Start(() => applicationLifetime.StopApplication());
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await stopSource.CancelAsync();
await watchdog.Stop();
channel.Writer.Complete();
await ingestionWorker;
@@ -117,7 +129,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)

queueIngestor = transportInfrastructure.Receivers[inputEndpoint];

await auditIngestor.VerifyCanReachForwardingAddress();
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);

await queueIngestor.StartReceive(cancellationToken);

@@ -197,56 +209,74 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
await taskCompletionSource.Task;
}

async Task Loop()
async Task Loop(CancellationToken cancellationToken)
{
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);

while (await channel.Reader.WaitToReadAsync())
try
{
// will only enter here if there is something to read.
try
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);

while (await channel.Reader.WaitToReadAsync(cancellationToken))
{
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
// will only enter here if there is something to read.
try
{
contexts.Add(context);
auditMessageSize.Record(context.Body.Length / 1024.0);
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
{
contexts.Add(context);
auditMessageSize.Record(context.Body.Length / 1024D);
}

auditBatchSize.Record(contexts.Count);
var sw = Stopwatch.StartNew();
Contributor

Not relevant for this PR, but we should be using a ValueStopWatch here.
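
For context, a value-type stopwatch avoids allocating a `Stopwatch` object for every batch. The comment does not say which type is meant, so the following is only a sketch of the general idea: `ValueStopwatchSketch` is a hypothetical name, not a type from this repository or a public .NET API.

```csharp
using System;
using System.Diagnostics;

// Hypothetical allocation-free stopwatch: a struct that only stores the start timestamp.
readonly struct ValueStopwatchSketch
{
    // Converts Stopwatch ticks (hardware dependent) to TimeSpan ticks (100 ns).
    static readonly double TimestampToTicks = TimeSpan.TicksPerSecond / (double)Stopwatch.Frequency;

    readonly long startTimestamp;

    ValueStopwatchSketch(long startTimestamp) => this.startTimestamp = startTimestamp;

    public static ValueStopwatchSketch StartNew() => new(Stopwatch.GetTimestamp());

    public TimeSpan Elapsed =>
        TimeSpan.FromTicks((long)((Stopwatch.GetTimestamp() - startTimestamp) * TimestampToTicks));
}
```

In the loop this would replace `Stopwatch.StartNew()` with `ValueStopwatchSketch.StartNew()`, and the recorded value would become `sw.Elapsed.TotalMilliseconds`.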

await auditIngestor.Ingest(contexts, cancellationToken);
auditBatchDuration.Record(sw.ElapsedMilliseconds);
}

auditBatchSize.Record(contexts.Count);
var sw = Stopwatch.StartNew();

await auditIngestor.Ingest(contexts);
auditBatchDuration.Record(sw.ElapsedMilliseconds);
}
catch (OperationCanceledException)
{
//Do nothing as we are shutting down
continue;
}
catch (Exception e) // show must go on
{
if (logger.IsInfoEnabled)
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
Member Author

@andreasohlund please review today's commits separately and then combined.

{
logger.Info("Ingesting messages failed", e);
logger.Debug("Cancelled by host");
return; // No point in continuing as WaitToReadAsync will throw OCE
}

// signal all message handling tasks to terminate
foreach (var context in contexts)
catch (Exception e) // show must go on
{
context.GetTaskCompletionSource().TrySetException(e);
if (logger.IsInfoEnabled)
{
logger.Info("Ingesting messages failed", e);
}

// signal all message handling tasks to terminate
foreach (var context in contexts)
{
if (!context.GetTaskCompletionSource().TrySetException(e))
{
logger.Error("Loop TrySetException failed");
}
}
}
finally
{
contexts.Clear();
}
}
finally
{
contexts.Clear();
}
// will fall out here when writer is completed
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
{
logger.Debug("Cancelled by host");
}
catch (Exception e)
{
// If the inner exception scope itself throws, consider it fatal, since that cannot be an OCE
logger.Fatal("Loop interrupted", e);
applicationLifetime.StopApplication();
throw;
}
// will fall out here when writer is completed
}

TransportInfrastructure transportInfrastructure;
IMessageReceiver queueIngestor;
Task ingestionWorker;

readonly SemaphoreSlim startStopSemaphore = new(1);
readonly string inputEndpoint;
@@ -262,9 +292,10 @@ async Task Loop()
readonly Histogram<double> auditMessageSize = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.audit_message_size", unit: "kilobytes");
readonly Counter<long> receivedAudits = AuditMetrics.Meter.CreateCounter<long>($"{AuditMetrics.Prefix}.received_audits");
readonly Watchdog watchdog;
readonly Task ingestionWorker;
readonly IHostApplicationLifetime applicationLifetime;

CancellationTokenSource stopSource;

static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();
}
}
24 changes: 15 additions & 9 deletions src/ServiceControl.Audit/Auditing/AuditIngestor.cs
@@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure.Settings;
using Monitoring;
@@ -41,14 +42,14 @@ ITransportCustomization transportCustomization
);
}

public async Task Ingest(List<MessageContext> contexts)
public async Task Ingest(List<MessageContext> contexts, CancellationToken cancellationToken)
{
if (Log.IsDebugEnabled)
{
Log.Debug($"Ingesting {contexts.Count} message contexts");
}

var stored = await auditPersister.Persist(contexts);
var stored = await auditPersister.Persist(contexts, cancellationToken);

try
{
@@ -59,7 +60,7 @@ public async Task Ingest(List<MessageContext> contexts)
Log.Debug($"Forwarding {stored.Count} messages");
}

await Forward(stored, logQueueAddress);
await Forward(stored, logQueueAddress, cancellationToken);
if (Log.IsDebugEnabled)
{
Log.Debug("Forwarded messages");
@@ -68,7 +69,10 @@ public async Task Ingest(List<MessageContext> contexts)

foreach (var context in contexts)
{
context.GetTaskCompletionSource().TrySetResult(true);
if (!context.GetTaskCompletionSource().TrySetResult(true))
{
Log.Warn("TrySetResult failed");
}
}
}
catch (Exception e)
@@ -83,7 +87,7 @@ public async Task Ingest(List<MessageContext> contexts)
}
}

Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress)
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress, CancellationToken cancellationToken)
{
var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK
var index = 0;
@@ -100,7 +104,8 @@ Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forward
var outgoingMessage = new OutgoingMessage(
messageContext.NativeMessageId,
messageContext.Headers,
messageContext.Body);
messageContext.Body
);

// Forwarded messages should last as long as possible
outgoingMessage.Headers.Remove(Headers.TimeToBeReceived);
@@ -112,12 +117,13 @@ Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forward
return anyContext != null
? messageDispatcher.Value.Dispatch(
new TransportOperations(transportOperations),
anyContext.TransportTransaction
anyContext.TransportTransaction,
cancellationToken
)
: Task.CompletedTask;
}

public async Task VerifyCanReachForwardingAddress()
public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken)
{
if (!settings.ForwardAuditMessages)
{
@@ -134,7 +140,7 @@ public async Task VerifyCanReachForwardingAddress()
)
);

await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction());
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken);
}
catch (Exception e)
{
15 changes: 11 additions & 4 deletions src/ServiceControl.Audit/Auditing/AuditPersister.cs
@@ -5,6 +5,7 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure;
using Monitoring;
@@ -23,7 +24,7 @@ class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
IMessageSession messageSession,
Lazy<IMessageDispatcher> messageDispatcher)
{
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts)
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts, CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();

@@ -37,7 +38,7 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
try
{
// deliberately not using the using statement because we dispose async explicitly
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count);
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count, cancellationToken);
var inserts = new List<Task>(contexts.Count);
foreach (var context in contexts)
{
@@ -210,7 +211,10 @@ void ProcessSagaAuditMessage(MessageContext context)
}

// releasing the failed message context early so that they can be retried outside the current batch
context.GetTaskCompletionSource().TrySetException(e);
if (!context.GetTaskCompletionSource().TrySetException(e))
{
Logger.Warn("ProcessSagaAuditMessage TrySetException failed");
}
}
}

@@ -279,7 +283,10 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
}

// releasing the failed message context early so that they can be retried outside the current batch
context.GetTaskCompletionSource().TrySetException(e);
if (!context.GetTaskCompletionSource().TrySetException(e))
{
Logger.Warn("ProcessAuditMessage TrySetException failed");
}
}
}

4 changes: 2 additions & 2 deletions src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs
@@ -23,7 +23,7 @@ public ImportFailedAudits(

public async Task Run(CancellationToken cancellationToken = default)
{
await auditIngestor.VerifyCanReachForwardingAddress();
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);

var succeeded = 0;
var failed = 0;
@@ -37,7 +37,7 @@ await failedAuditStore.ProcessFailedMessages(
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);

await auditIngestor.Ingest([messageContext]);
await auditIngestor.Ingest([messageContext], cancellationToken);

await taskCompletionSource.Task;

2 changes: 1 addition & 1 deletion src/ServiceControl.Infrastructure/LoggingConfigurator.cs
@@ -22,7 +22,7 @@ public static void ConfigureLogging(LoggingSettings loggingSettings)
}

var nlogConfig = new LoggingConfiguration();
var simpleLayout = new SimpleLayout("${longdate}|${threadid}|${level}|${logger}|${message}${onexception:|${exception:format=tostring}}");
var simpleLayout = new SimpleLayout("${longdate}|${processtime}|${threadid:padding=2}|${level:padding=5}|${logger:padding=70}|${message}${onexception:|${exception:format=tostring}}");

var fileTarget = new FileTarget
{