Skip to content

Commit

Permalink
Addition of extra comments to help explain the service; refactoring t…
Browse files Browse the repository at this point in the history
…o separate implementations
  • Loading branch information
TheCakeMonster committed Apr 20, 2022
1 parent 9cc2d69 commit 0bf5216
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public Task StartMessageReceiptAsync(string subscriptionName, CancellationToken
return Task.CompletedTask;
}

/// <summary>
/// Entry point for background execution; creates a consumer and then
/// waits for messages to handle, processing them as they arrive
/// </summary>
/// <param name="topics">The topics to which to subscribe</param>
/// <param name="cancellationToken">The token used to manage the lifetime of the background service</param>
private async Task ReceiveMessagesAsync(string topics, CancellationToken cancellationToken)
{
using IConsumer<TKey, TValue>? consumer = await CreateConsumer(topics, cancellationToken);
Expand All @@ -76,6 +82,11 @@ private async Task ReceiveMessagesAsync(string topics, CancellationToken cancell

}

/// <summary>
/// Create a consumer, with exception handling and basic retries
/// </summary>
/// <param name="topics">The topics to which to subscribe</param>
/// <param name="cancellationToken">The token used to manage the lifetime of the background service</param>
private async Task<IConsumer<TKey, TValue>?> CreateConsumer(string topics, CancellationToken cancellationToken)
{
IConsumer<TKey, TValue>? consumer = null;
Expand Down
4 changes: 1 addition & 3 deletions source/OrderService/Messaging/IMessagePublisher.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@

using SummarisationSample.OrderService.Library;

namespace SummarisationSample.OrderService.Service.Messaging
namespace SummarisationSample.OrderService.Messaging
{
public interface IMessagePublisher<TKey, TValue>
{
Expand Down
2 changes: 1 addition & 1 deletion source/OrderService/Messaging/MessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using System.Text;
using System.Threading.Tasks;

namespace SummarisationSample.OrderService.Service.Messaging
namespace SummarisationSample.OrderService.Messaging
{
public class MessageQueue<TKey, TValue> : IMessageQueue<TKey, TValue>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Library\SummarisationSample.OrderService.Library.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using SummarisationSample.OrderService.Library;
using SummarisationSample.OrderService.Library.DataContracts;
using System.Collections.Concurrent;
using System.Net;

namespace SummarisationSample.OrderService.Service.Messaging
namespace SummarisationSample.OrderService.Messaging.Kafka
{
/// <summary>
/// Kafka implementation of a message publisher
/// </summary>
/// <typeparam name="TKey">The type of key in use</typeparam>
/// <typeparam name="TValue">The type of message being published</typeparam>
public class KafkaMessagePublisher<TKey, TValue> : IMessagePublisher<TKey, TValue>
{
private readonly string _busConnectionString;
Expand All @@ -32,26 +38,32 @@ public KafkaMessagePublisher(IMessageQueue<TKey, TValue> messageQueue, IConfigur
/// <param name="cancellationToken">The cancellation token used to manage execution</param>
public Task StartPublishingAsync(string topic, CancellationToken cancellationToken)
{
TaskFactory taskFactory = new TaskFactory();
TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach;

_publishingTask = taskFactory.StartNew(async () => await DoPublishingAsync(topic, cancellationToken), creationOptions);
_publishingTask = Task.Run(async () => await DoPublishingAsync(topic, cancellationToken));

return Task.CompletedTask;
}

#region Private Helper Methods

/// <summary>
/// Start of background publishing; create a producer and then sit waiting for
/// items in the queue that can be published
/// </summary>
/// <param name="topic">The topic to be used for publishing</param>
/// <param name="cancellationToken">The token used to manage the lifetime of the background service</param>
private async Task DoPublishingAsync(string topic, CancellationToken cancellationToken)
{
await Task.Yield();

using IProducer<TKey, TValue>? producer = await CreateProducerAsync(cancellationToken);
if (producer is null) return;

await PublishMessagesAsync(producer, topic, cancellationToken);
}

/// <summary>
/// Create a producer of messages, with exception handling and basic retries
/// </summary>
/// <param name="cancellationToken">The token used to manage the lifetime of the operation</param>
/// <returns>An instance of a producer that can produce messages</returns>
private async Task<IProducer<TKey, TValue>?> CreateProducerAsync(CancellationToken cancellationToken)
{
IProducer<TKey, TValue>? producer = null;
Expand All @@ -76,13 +88,19 @@ private async Task DoPublishingAsync(string topic, CancellationToken cancellatio
catch (Exception ex)
{
_logger.LogError(ex, "Failed to connect to publisher!");
await Task.Delay(1000);
await Task.Delay(1000, cancellationToken);
}
}

return producer;
}

/// <summary>
/// Long-running loop that checks for messages in the outbound queue and publishes them
/// </summary>
/// <param name="producer">The producer to use for publishing messages</param>
/// <param name="topic">The topic to which to assign the messages</param>
/// <param name="cancellationToken">The token used to manage the lifetime of the background operation</param>
private async Task PublishMessagesAsync(IProducer<TKey, TValue> producer, string topic, CancellationToken cancellationToken)
{
QueueItem<TKey, TValue>? queueItem;
Expand All @@ -93,7 +111,7 @@ private async Task PublishMessagesAsync(IProducer<TKey, TValue> producer, string
{
if (!_messageQueue.TryPeek(out queueItem))
{
await Task.Delay(25);
await Task.Delay(25000, cancellationToken);
continue;
}

Expand All @@ -111,6 +129,14 @@ private async Task PublishMessagesAsync(IProducer<TKey, TValue> producer, string

}

/// <summary>
/// Attempt to publish a single message and inform consumers of the outcome
/// </summary>
/// <param name="producer">The producer through which to attempt to publish the message</param>
/// <param name="topic">The topic to which to assign the message</param>
/// <param name="queueItem">The item from the queue that is to be published</param>
/// <param name="cancellationToken">The token used to manage the lifetime of the background operation</param>
/// <returns>The delivery result from the producer</returns>
private async Task<DeliveryResult<TKey, TValue>?> PublishMessageAsync(
IProducer<TKey, TValue> producer, string topic,
QueueItem<TKey, TValue> queueItem, CancellationToken cancellationToken)
Expand All @@ -131,13 +157,17 @@ private async Task PublishMessagesAsync(IProducer<TKey, TValue> producer, string
{
_logger.LogError(ex, "Error while publishing message!");
await OnPublishingFailureAsync(queueItem);
await Task.Delay(retryDelay);
await Task.Delay(retryDelay, cancellationToken);
if (retryDelay < 10000) retryDelay *= 2;
}

return result;
}

/// <summary>
/// Raise the MessagePublished event to consumers, handling any exceptions they might throw
/// </summary>
/// <param name="message">The queue item that was published</param>
private async Task OnPublishingSuccessAsync(QueueItem<TKey, TValue> message)
{
if (MessagePublished is not null)
Expand All @@ -153,6 +183,10 @@ private async Task OnPublishingSuccessAsync(QueueItem<TKey, TValue> message)
}
}

/// <summary>
/// Raise the MessagePublishingFailure event to consumers, handling any exceptions they might throw
/// </summary>
/// <param name="message">The queue item that failed to be published</param>
private async Task OnPublishingFailureAsync(QueueItem<TKey, TValue>? message)
{
if (message is not null && MessagePublishingFailure is not null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Text;
using System.Text.Json;

namespace SummarisationSample.OrderService.Service.Messaging
namespace SummarisationSample.OrderService.Messaging.Kafka
{
public class MessageSerialiser<TValue> : ISerializer<TValue>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using SummarisationSample.OrderService.Messaging.Kafka;
using SummarisationSample.OrderService.Messaging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Microsoft.Extensions.DependencyInjection
{
public static class ServiceCollectionExtensions
{
/// <summary>
/// Do all of the service registration required to use Kafka
/// </summary>
/// <param name="services">The instance of IServiceCollection that is being extended</param>
/// <returns>The IServiceCollection extended, to support method chaining</returns>
public static IServiceCollection AddKafkaMessaging(this IServiceCollection services)
{
services.AddSingleton(typeof(IMessagePublisher<,>), typeof(KafkaMessagePublisher<,>));
return services;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.8.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Library\SummarisationSample.OrderService.Library.csproj" />
<ProjectReference Include="..\..\Messaging\SummarisationSample.OrderService.Messaging.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace SummarisationSample.OrderService.Service.Contracts
{
internal class PublishedActivityMessage
public class PublishedActivityMessage
{
public string MessageRef { get; set; } = string.Empty;

Expand Down
3 changes: 3 additions & 0 deletions source/OrderService/Service/Controllers/OrdersController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

namespace SummarisationSample.OrderService.Service.Controllers
{
/// <summary>
/// Public API exposed to consumers who are interested in orders
/// </summary>
[ApiController]
[Route("[controller]")]
public class OrdersController : ControllerBase
Expand Down
75 changes: 56 additions & 19 deletions source/OrderService/Service/Messaging/MessagePublishingService.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,46 @@
using SummarisationSample.OrderService.Library;
using SummarisationSample.OrderService.Library.DataContracts;
using SummarisationSample.OrderService.Messaging;
using SummarisationSample.OrderService.Service.Contracts;

namespace SummarisationSample.OrderService.Service.Messaging
{
/// <summary>
/// Background service that is responsible for publishing messages
/// </summary>
public class MessagePublishingService : BackgroundService
{
private readonly IServiceScope _scope;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger _logger;
private readonly IOrderRepository _orderRepository;
private readonly IMessageQueue<string, ActivityMessage> _sourceQueue;
private readonly IMessageQueue<string, PublishedActivityMessage> _publishingQueue;
private readonly IMessagePublisher<string, PublishedActivityMessage> _publisher;

public MessagePublishingService(IServiceScopeFactory serviceScopeFactory)
public MessagePublishingService(IServiceScopeFactory serviceScopeFactory,
IMessageQueue<string, ActivityMessage> sourceQueue,
IMessageQueue<string, PublishedActivityMessage> publishingQueue,
IMessagePublisher<string, PublishedActivityMessage> publisher,
ILogger<MessagePublishingService> logger)
{
_scope = serviceScopeFactory.CreateScope();
_logger = _scope.ServiceProvider.GetRequiredService<ILogger<MessagePublishingService>>();
_orderRepository = _scope.ServiceProvider.GetRequiredService<IOrderRepository>();
_sourceQueue = _scope.ServiceProvider.GetRequiredService<IMessageQueue<string, ActivityMessage>>();
_scopeFactory = serviceScopeFactory;
_logger = logger;
_sourceQueue = sourceQueue;
_sourceQueue.Enqueued += Source_OnEnqueued;
_publishingQueue = _scope.ServiceProvider.GetRequiredService<IMessageQueue<string, PublishedActivityMessage>>();
_publishingQueue = publishingQueue;
_publisher = publisher;
_publisher.MessagePublished += Publisher_OnMessagePublished;
_publisher.MessagePublishingFailure += Publisher_OnMessagePublishingFailure;
}

/// <summary>
/// Entry point of the background service, which initiates the work to be done
/// </summary>
/// <param name="stoppingToken">The token used to control the lifetime of the background service</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
IMessagePublisher<string, PublishedActivityMessage> publisher;

publisher = _scope.ServiceProvider.GetRequiredService<IMessagePublisher<string, PublishedActivityMessage>>();
publisher.MessagePublished += Publisher_OnMessagePublished;
publisher.MessagePublishingFailure += Publisher_OnMessagePublishingFailure;
await EnqueueUnpublishedMessagesAsync(_publishingQueue, _orderRepository);
await EnqueueUnpublishedMessagesAsync(_publishingQueue);

await publisher.StartPublishingAsync("summarisation.order", stoppingToken);
await _publisher.StartPublishingAsync("summarisation.order", stoppingToken);
}

#region Private Helper Methods
Expand All @@ -42,9 +51,13 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
/// <param name="messagePublisher">The publisher of messages</param>
/// <param name="orderRepository">The repository from which unpublished messages can be retrieved</param>
/// <returns></returns>
private async Task EnqueueUnpublishedMessagesAsync(IMessageQueue<string, PublishedActivityMessage> messagePublisher, IOrderRepository orderRepository)
private async Task EnqueueUnpublishedMessagesAsync(IMessageQueue<string, PublishedActivityMessage> messagePublisher)
{
IList<Library.ActivityMessage> unpublishedMessages;
IOrderRepository orderRepository;

using IServiceScope serviceScope = _scopeFactory.CreateScope();
orderRepository = serviceScope.ServiceProvider.GetRequiredService<IOrderRepository>();

unpublishedMessages = await orderRepository.GetUnpublishedActivityMessagesAsync();
foreach (Library.ActivityMessage unpublishedMessage in unpublishedMessages)
Expand All @@ -53,25 +66,49 @@ private async Task EnqueueUnpublishedMessagesAsync(IMessageQueue<string, Publish
}
}

/// <summary>
/// Handler for the Enqueued event in the source message queue
/// Translates items from the source and puts them in the publishing queue
/// </summary>
private void Source_OnEnqueued()
{
QueueItem<string, ActivityMessage>? queueItem;

while (_sourceQueue.TryDequeue(out queueItem))
{
if (queueItem is null) continue;
if (queueItem is null) break;
_publishingQueue.Enqueue(queueItem.Key, queueItem.Value.ToPublishedActivityMessage());
}
}

/// <summary>
/// Handler for the publisher's MessagePublished event, using the repository to
/// mark an activity message as having been published
/// </summary>
/// <param name="key">The key of the message published</param>
/// <param name="message">The message that was published</param>
private async Task Publisher_OnMessagePublished(string key, PublishedActivityMessage message)
{
await _orderRepository.MarkActivityMessagePublishedAsync(message.MessageRef);
IOrderRepository orderRepository;

using IServiceScope serviceScope = _scopeFactory.CreateScope();
orderRepository = serviceScope.ServiceProvider.GetRequiredService<IOrderRepository>();
await orderRepository.MarkActivityMessagePublishedAsync(message.MessageRef);
}

/// <summary>
/// Handler for the publisher's MessagePublishingFailure event, using the repository to
/// mark an activity message as having failed publishing (to control poison messages)
/// </summary>
/// <param name="key">The key of the message that failed to publish</param>
/// <param name="message">The message that failed to publish</param>
private async Task Publisher_OnMessagePublishingFailure(string key, PublishedActivityMessage message)
{
await _orderRepository.RecordActivityMessagePublishingFailureAsync(message.MessageRef);
IOrderRepository orderRepository;

using IServiceScope serviceScope = _scopeFactory.CreateScope();
orderRepository = serviceScope.ServiceProvider.GetRequiredService<IOrderRepository>();
await orderRepository.RecordActivityMessagePublishingFailureAsync(message.MessageRef);
}

#endregion
Expand Down
Loading

0 comments on commit 0bf5216

Please sign in to comment.