Skip to content

Commit

Permalink
Consumer API: Correlation IDs are retained across domain event bounda…
Browse files Browse the repository at this point in the history
…ries (#809)

* fix: repair reference errors

* feat: propagate correlation id across events via rabbitmq

* feat: propagate correlation id across events via azure service bus

* feat: propagate correlation id across events via pub sub

* feat: add missing assembly reference

* fix: update object instantiation

* fix: repair failing tests

* fix: repair failing tests

* fix: repair failing licence tests

* fix: missing correlation Ids for Azure EventBus

* fix: EventHandlerService not postinng to seq

* chore: fix formatting

* fix: make CorrelationIds work without http context by introducing CustomLogContext

* chore: remove unused classes

* feat: don't use hyphens in generated correlation ids

* chore: remove leftovers from former solution

* chore: delete weird code from Messages.Infrastructure.csproj

* chore: remove unused nuget package

* feat: add RequestResponseTimeMiddleware, ResponseDurationMiddleware, TraceIdMiddleware and CorrelationIdMiddleware to Admin API

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Daniel Silva <[email protected]>
Co-authored-by: Timo Notheisen <[email protected]>
  • Loading branch information
4 people authored Aug 30, 2024
1 parent 715afeb commit 7b307dd
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 107 deletions.
5 changes: 1 addition & 4 deletions .ci/ignoredPackages.json
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
[
"Serilog.Enrichers.Demystifier",
"SpecFlow.NUnit"
]
["Serilog.Enrichers.Demystifier", "SpecFlow.NUnit"]
6 changes: 6 additions & 0 deletions Applications/AdminApi/src/AdminApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Backbone.AdminApi.Infrastructure.Persistence;
using Backbone.AdminApi.Infrastructure.Persistence.Database;
using Backbone.BuildingBlocks.API.Extensions;
using Backbone.BuildingBlocks.API.Mvc.Middleware;
using Backbone.BuildingBlocks.API.Serilog;
using Backbone.BuildingBlocks.Application.QuotaCheck;
using Backbone.BuildingBlocks.Infrastructure.Persistence.Database;
Expand Down Expand Up @@ -168,6 +169,11 @@ static void Configure(WebApplication app)

app.UseForwardedHeaders();

app.UseMiddleware<RequestResponseTimeMiddleware>()
.UseMiddleware<ResponseDurationMiddleware>()
.UseMiddleware<TraceIdMiddleware>()
.UseMiddleware<CorrelationIdMiddleware>();

var configuration = app.Services.GetRequiredService<IOptions<AdminConfiguration>>().Value;

app.UseSerilogRequestLogging(opts =>
Expand Down
3 changes: 2 additions & 1 deletion Applications/ConsumerApi/src/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ static void Configure(WebApplication app)

app.UseMiddleware<RequestResponseTimeMiddleware>()
.UseMiddleware<ResponseDurationMiddleware>()
.UseMiddleware<TraceIdMiddleware>();
.UseMiddleware<TraceIdMiddleware>()
.UseMiddleware<CorrelationIdMiddleware>();

app.UseSecurityHeaders(policies =>
policies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,29 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="9.0.0" />
<PackageReference Include="Serilog.Enrichers.Demystifier" Version="1.0.2" />
<PackageReference Include="Serilog.Enrichers.Sensitive" Version="1.7.3" />
<PackageReference Include="Serilog.Exceptions" Version="8.4.0" />
<PackageReference Include="Serilog.Exceptions.EntityFrameworkCore" Version="8.4.0" />
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="9.0.0"/>
<PackageReference Include="Serilog.Enrichers.Demystifier" Version="1.0.2"/>
<PackageReference Include="Serilog.Enrichers.Sensitive" Version="1.7.3"/>
<PackageReference Include="Serilog.Exceptions" Version="8.4.0"/>
<PackageReference Include="Serilog.Exceptions.EntityFrameworkCore" Version="8.4.0"/>
<PackageReference Include="Serilog.Sinks.Seq" Version="8.0.0"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\BuildingBlocks\src\BuildingBlocks.Infrastructure\BuildingBlocks.Infrastructure.csproj" />
<ProjectReference Include="..\..\..\..\Infrastructure\Infrastructure.csproj" />
<ProjectReference Include="..\..\..\..\Modules\Challenges\src\Challenges.ConsumerApi\Challenges.ConsumerApi.csproj" />
<ProjectReference Include="..\..\..\..\Modules\Devices\src\Devices.ConsumerApi\Devices.ConsumerApi.csproj" />
<ProjectReference Include="..\..\..\..\Modules\Files\src\Files.ConsumerApi\Files.ConsumerApi.csproj" />
<ProjectReference Include="..\..\..\..\Modules\Messages\src\Messages.ConsumerApi\Messages.ConsumerApi.csproj" />
<ProjectReference Include="..\..\..\..\Modules\Quotas\src\Quotas.ConsumerApi\Quotas.ConsumerApi.csproj" />
<ProjectReference Include="..\..\..\..\Modules\Relationships\src\Relationships.ConsumerApi\Relationships.ConsumerApi.csproj" />
<ProjectReference Include="..\..\..\..\Modules\Synchronization\src\Synchronization.ConsumerApi\Synchronization.ConsumerApi.csproj" />
<ProjectReference Include="..\..\..\..\Modules\Tokens\src\Tokens.ConsumerApi\Tokens.ConsumerApi.csproj" />
<ProjectReference Include="..\..\..\..\BuildingBlocks\src\BuildingBlocks.Infrastructure\BuildingBlocks.Infrastructure.csproj"/>
<ProjectReference Include="..\..\..\..\Infrastructure\Infrastructure.csproj"/>
<ProjectReference Include="..\..\..\..\Modules\Challenges\src\Challenges.ConsumerApi\Challenges.ConsumerApi.csproj"/>
<ProjectReference Include="..\..\..\..\Modules\Devices\src\Devices.ConsumerApi\Devices.ConsumerApi.csproj"/>
<ProjectReference Include="..\..\..\..\Modules\Files\src\Files.ConsumerApi\Files.ConsumerApi.csproj"/>
<ProjectReference Include="..\..\..\..\Modules\Messages\src\Messages.ConsumerApi\Messages.ConsumerApi.csproj"/>
<ProjectReference Include="..\..\..\..\Modules\Quotas\src\Quotas.ConsumerApi\Quotas.ConsumerApi.csproj"/>
<ProjectReference Include="..\..\..\..\Modules\Relationships\src\Relationships.ConsumerApi\Relationships.ConsumerApi.csproj"/>
<ProjectReference Include="..\..\..\..\Modules\Synchronization\src\Synchronization.ConsumerApi\Synchronization.ConsumerApi.csproj"/>
<ProjectReference Include="..\..\..\..\Modules\Tokens\src\Tokens.ConsumerApi\Tokens.ConsumerApi.csproj"/>
</ItemGroup>

<Target Name="PreBuild" BeforeTargets="Build" Condition="$(Configuration) == Debug">
<Delete Files="$(ProjectDir)appsettings.override.json" />
<Copy SourceFiles="..\..\..\..\appsettings.override.json" DestinationFolder="$(ProjectDir)" UseHardlinksIfPossible="true" />
<Delete Files="$(ProjectDir)appsettings.override.json"/>
<Copy SourceFiles="..\..\..\..\appsettings.override.json" DestinationFolder="$(ProjectDir)" UseHardlinksIfPossible="true"/>
</Target>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using Backbone.BuildingBlocks.Infrastructure.CorrelationIds;
using Microsoft.AspNetCore.Http;

namespace Backbone.BuildingBlocks.API.Mvc.Middleware;

public class CorrelationIdMiddleware
{
private readonly RequestDelegate _next;

public CorrelationIdMiddleware(RequestDelegate next)
{
_next = next;
}

public async Task InvokeAsync(HttpContext context)
{
var correlationId = context.Request.Headers["X-Correlation-ID"].FirstOrDefault();

if (string.IsNullOrEmpty(correlationId))
{
correlationId = CustomLogContext.GenerateCorrelationId();
}

context.Response.Headers["X-Correlation-ID"] = correlationId;

using (CustomLogContext.SetCorrelationId(correlationId))
{
await _next(context);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<ItemGroup>
<PackageReference Include="Autofac" Version="8.0.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageReference Include="Autofac" Version="8.0.0"/>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1"/>
<PackageReference Include="Azure.Storage.Blobs" Version="12.21.2"/>
<PackageReference Include="Google.Cloud.PubSub.V1" Version="3.16.0"/>
<PackageReference Include="Google.Cloud.Storage.V1" Version="4.10.0"/>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.8"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.Abstractions" Version="8.0.8"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.8"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.8"/>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Identity.Core" Version="8.0.8" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.4" />
<PackageReference Include="Polly" Version="8.4.1" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="System.Interactive.Async" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0"/>
<PackageReference Include="Microsoft.Extensions.Identity.Core" Version="8.0.8"/>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3"/>
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0"/>
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.4"/>
<PackageReference Include="Polly" Version="8.4.1"/>
<PackageReference Include="RabbitMQ.Client" Version="6.8.1"/>
<PackageReference Include="Serilog" Version="4.0.1"/>
<PackageReference Include="System.Interactive.Async" Version="6.0.1"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\BuildingBlocks.Application.Abstractions\BuildingBlocks.Application.Abstractions.csproj" />
<ProjectReference Include="..\BuildingBlocks.Application.Abstractions\BuildingBlocks.Application.Abstractions.csproj"/>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Serilog.Context;

namespace Backbone.BuildingBlocks.Infrastructure.CorrelationIds;

public static class CustomLogContext
{
private static readonly AsyncLocal<string> CORRELATION_ID = new();

public static IDisposable SetCorrelationId(string correlationId)
{
CORRELATION_ID.Value = correlationId;
return LogContext.PushProperty("CorrelationId", correlationId);
}

public static string GetCorrelationId()
{
return CORRELATION_ID.Value ?? "";
}

public static string GenerateCorrelationId()
{
return Guid.NewGuid().ToString("N");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
using Azure.Messaging.ServiceBus.Administration;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventBus;
using Backbone.BuildingBlocks.Domain.Events;
using Backbone.BuildingBlocks.Infrastructure.CorrelationIds;
using Backbone.BuildingBlocks.Infrastructure.EventBus.Json;
using Microsoft.Azure.Amqp;
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Tokens;
using Newtonsoft.Json;

namespace Backbone.BuildingBlocks.Infrastructure.EventBus.AzureServiceBus;
Expand All @@ -27,8 +28,7 @@ public class EventBusAzureServiceBus : IEventBus, IDisposable

public EventBusAzureServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection,
ILogger<EventBusAzureServiceBus> logger, IEventBusSubscriptionsManager subscriptionManager,
ILifetimeScope autofac,
HandlerRetryBehavior handlerRetryBehavior,
ILifetimeScope autofac, HandlerRetryBehavior handlerRetryBehavior,
string subscriptionClientName)
{
_serviceBusPersisterConnection = serviceBusPersisterConnection;
Expand All @@ -38,9 +38,7 @@ public EventBusAzureServiceBus(IServiceBusPersisterConnection serviceBusPersiste
_subscriptionName = subscriptionClientName;
_sender = _serviceBusPersisterConnection.TopicClient.CreateSender(TOPIC_NAME);
var options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false };
_processor =
_serviceBusPersisterConnection.TopicClient.CreateProcessor(TOPIC_NAME, _subscriptionName, options);

_processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(TOPIC_NAME, _subscriptionName, options);
_handlerRetryBehavior = handlerRetryBehavior;
}

Expand All @@ -63,7 +61,8 @@ public async void Publish(DomainEvent @event)
{
MessageId = @event.DomainEventId,
Body = new BinaryData(body),
Subject = eventName
Subject = eventName,
CorrelationId = CustomLogContext.GetCorrelationId()
};

_logger.SendingDomainEvent(message.MessageId);
Expand Down Expand Up @@ -117,12 +116,18 @@ private async Task RegisterSubscriptionClientMessageHandlerAsync()
{
var eventName = $"{args.Message.Subject}{DOMAIN_EVENT_SUFFIX}";
var messageData = args.Message.Body.ToString();
var correlationId = args.Message.CorrelationId;

correlationId = correlationId.IsNullOrEmpty() ? CustomLogContext.GenerateCorrelationId() : correlationId;

// Complete the message so that it is not received again.
if (await ProcessEvent(eventName, messageData))
await args.CompleteMessageAsync(args.Message);
else
_logger.EventWasNotProcessed(args.Message.MessageId);
using (CustomLogContext.SetCorrelationId(correlationId))
{
// Complete the message so that it is not received again.
if (await ProcessEvent(eventName, messageData))
await args.CompleteMessageAsync(args.Message);
else
_logger.EventWasNotProcessed(args.Message.MessageId);
}
};

_processor.ProcessErrorAsync += ErrorHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
using Autofac;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventBus;
using Backbone.BuildingBlocks.Domain.Events;
using Backbone.BuildingBlocks.Infrastructure.CorrelationIds;
using Backbone.BuildingBlocks.Infrastructure.EventBus.Json;
using Backbone.Tooling.Extensions;
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using Microsoft.Extensions.Logging;
Expand All @@ -15,6 +17,7 @@ public class EventBusGoogleCloudPubSub : IEventBus, IDisposable
private static class PubSubMessageAttributes
{
public const string EVENT_NAME = "Subject";
public const string CORRELATION_ID = "CorrelationId";
}

private const string DOMAIN_EVENT_SUFFIX = "DomainEvent";
Expand Down Expand Up @@ -58,7 +61,8 @@ public async void Publish(DomainEvent @event)
Data = ByteString.CopyFromUtf8(jsonMessage),
Attributes =
{
{ PubSubMessageAttributes.EVENT_NAME, eventName }
{ PubSubMessageAttributes.EVENT_NAME, eventName },
{ PubSubMessageAttributes.CORRELATION_ID, CustomLogContext.GetCorrelationId() }
}
};

Expand Down Expand Up @@ -91,13 +95,19 @@ private static string RemoveDomainEventSuffix(string typeName)

private async Task<SubscriberClient.Reply> OnIncomingEvent(PubsubMessage @event, CancellationToken _)
{
var eventNameFromAttributes =
$"{@event.Attributes[PubSubMessageAttributes.EVENT_NAME]}{DOMAIN_EVENT_SUFFIX}";
var eventNameFromAttributes = $"{@event.Attributes[PubSubMessageAttributes.EVENT_NAME]}{DOMAIN_EVENT_SUFFIX}";
var eventData = @event.Data.ToStringUtf8();

try
{
await ProcessEvent(eventNameFromAttributes, eventData);
@event.Attributes.TryGetValue(PubSubMessageAttributes.CORRELATION_ID, out var correlationId);

correlationId = correlationId.IsNullOrEmpty() ? CustomLogContext.GenerateCorrelationId() : correlationId;

using (CustomLogContext.SetCorrelationId(correlationId))
{
await ProcessEvent(eventNameFromAttributes, eventData);
}
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Autofac;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventBus;
using Backbone.BuildingBlocks.Domain.Events;
using Backbone.BuildingBlocks.Infrastructure.CorrelationIds;
using Backbone.BuildingBlocks.Infrastructure.EventBus.Json;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
Expand Down Expand Up @@ -96,6 +97,8 @@ public void Publish(DomainEvent @event)
properties.DeliveryMode = 2; // persistent
properties.MessageId = @event.DomainEventId;

properties.CorrelationId = CustomLogContext.GetCorrelationId();

channel.BasicPublish(BROKER_NAME,
eventName,
true,
Expand Down Expand Up @@ -161,11 +164,18 @@ private IModel CreateConsumerChannel()
{
var eventName = eventArgs.RoutingKey;
var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());

try
{
await ProcessEvent(eventName, message);
var correlationId = eventArgs.BasicProperties.CorrelationId;
correlationId = string.IsNullOrEmpty(correlationId) ? Guid.NewGuid().ToString() : correlationId;

using (CustomLogContext.SetCorrelationId(correlationId))
{
await ProcessEvent(eventName, message);

channel.BasicAck(eventArgs.DeliveryTag, false);
channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public static void AddRabbitMq(this IServiceCollection services, Action<RabbitMq
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMq>>();
var eventBusSubscriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

return new EventBusRabbitMq(rabbitMqPersistentConnection, logger, iLifetimeScope,
eventBusSubscriptionsManager, options.HandlerRetryBehavior, subscriptionClientName, options.ConnectionRetryCount);
return new EventBusRabbitMq(rabbitMqPersistentConnection, logger, iLifetimeScope, eventBusSubscriptionsManager,
options.HandlerRetryBehavior, subscriptionClientName, options.ConnectionRetryCount);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<IsPackable>false</IsPackable>
</PropertyGroup>
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Autofac" Version="8.0.0" />
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="9.0.0" />
<PackageReference Include="Divergic.Logging.Xunit" Version="4.3.1" />
<PackageReference Include="FluentAssertions" Version="6.12.0" />
<ItemGroup>
<PackageReference Include="Autofac" Version="8.0.0" />
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="9.0.0" />
<PackageReference Include="Divergic.Logging.Xunit" Version="4.3.1" />
<PackageReference Include="FluentAssertions" Version="6.12.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
Expand All @@ -17,14 +18,18 @@
<PackageReference Include="Polly" Version="8.4.1" />
<PackageReference Include="xunit" Version="2.9.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\BuildingBlocks.Infrastructure\BuildingBlocks.Infrastructure.csproj" />
<ProjectReference Include="..\..\src\UnitTestTools\UnitTestTools.csproj" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\BuildingBlocks.Infrastructure\BuildingBlocks.Infrastructure.csproj" />
<ProjectReference Include="..\..\src\UnitTestTools\UnitTestTools.csproj" />
</ItemGroup>

</Project>
Loading

0 comments on commit 7b307dd

Please sign in to comment.