Skip to content

Commit

Permalink
Extracted config for better reusability. Reduced wait between queue c…
Browse files Browse the repository at this point in the history
…hecks
  • Loading branch information
TheCakeMonster committed Apr 20, 2022
1 parent 0bf5216 commit baffdff
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 6 deletions.
16 changes: 16 additions & 0 deletions source/OrderService/Messaging/IPublicationConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SummarisationSample.OrderService.Messaging
{

/// <summary>
/// Marker interface for classes implementing publication configuration
/// </summary>
public interface IPublicationConfiguration
{
}
}
13 changes: 13 additions & 0 deletions source/OrderService/Messaging/IPublicationFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SummarisationSample.OrderService.Messaging
{
public interface IPublicationFactory
{
IPublicationConfiguration GetPublicationConfiguration(string publicationName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ public class KafkaMessagePublisher<TKey, TValue> : IMessagePublisher<TKey, TValu
private readonly string _busConnectionString;
private readonly ILogger _logger;
private readonly IMessageQueue<TKey, TValue> _messageQueue;
private readonly IPublicationFactory _publicationFactory;
private Task? _publishingTask;
private int retryDelay = 50;

public KafkaMessagePublisher(IMessageQueue<TKey, TValue> messageQueue, IConfiguration configuration, ILogger<KafkaMessagePublisher<TKey, TValue>> logger)
public KafkaMessagePublisher(IMessageQueue<TKey, TValue> messageQueue, IPublicationFactory publicationFactory,
IConfiguration configuration, ILogger<KafkaMessagePublisher<TKey, TValue>> logger)
{
_messageQueue = messageQueue;
_publicationFactory = publicationFactory;
_busConnectionString = configuration.GetConnectionString("MessageBus");
_logger = logger;
}
Expand All @@ -34,11 +37,19 @@ public KafkaMessagePublisher(IMessageQueue<TKey, TValue> messageQueue, IConfigur
/// <summary>
/// Entrypoint for the long-running operation of publishing messages
/// </summary>
/// <param name="topic">The topic to which messages are being published</param>
/// <param name="publicationName">The name of the publication to which messages are being published</param>
/// <param name="cancellationToken">The cancellation token used to manage execution</param>
public Task StartPublishingAsync(string topic, CancellationToken cancellationToken)
public Task StartPublishingAsync(string publicationName, CancellationToken cancellationToken)
{
_publishingTask = Task.Run(async () => await DoPublishingAsync(topic, cancellationToken));
IPublicationConfiguration untypedConfig;
KafkaPublicationConfiguration? publicationConfig;

untypedConfig = _publicationFactory.GetPublicationConfiguration(publicationName);
if (untypedConfig is null) throw new ArgumentException(nameof(publicationName));
publicationConfig = untypedConfig as KafkaPublicationConfiguration;
if (publicationConfig is null) throw new ArgumentException(nameof(untypedConfig));

_publishingTask = Task.Run(async () => await DoPublishingAsync(publicationConfig.Topic, cancellationToken));

return Task.CompletedTask;
}
Expand Down Expand Up @@ -111,7 +122,7 @@ private async Task PublishMessagesAsync(IProducer<TKey, TValue> producer, string
{
if (!_messageQueue.TryPeek(out queueItem))
{
await Task.Delay(25000, cancellationToken);
await Task.Delay(25, cancellationToken);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SummarisationSample.OrderService.Messaging.Kafka
{
public class KafkaPublicationConfiguration : IPublicationConfiguration
{
public string Topic { get; init; } = string.Empty;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SummarisationSample.OrderService.Messaging.Kafka
{
public class KafkaPublicationFactory : IPublicationFactory
{
/// <summary>
/// Get the configuration for a named publication
/// </summary>
/// <param name="publicationName">The name of the publication for which config is required</param>
/// <returns>The configuration for the publication named</returns>
/// <exception cref="ArgumentNullException"></exception>
public IPublicationConfiguration GetPublicationConfiguration(string publicationName)
{
if (publicationName is null) throw new ArgumentNullException(nameof(publicationName));

// Only one publication for now; extend if required
return new KafkaPublicationConfiguration()
{
Topic = "summarisation.order"
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public static class ServiceCollectionExtensions
public static IServiceCollection AddKafkaMessaging(this IServiceCollection services)
{
services.AddSingleton(typeof(IMessagePublisher<,>), typeof(KafkaMessagePublisher<,>));
services.AddTransient<IPublicationFactory, KafkaPublicationFactory>();
return services;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await EnqueueUnpublishedMessagesAsync(_publishingQueue);

await _publisher.StartPublishingAsync("summarisation.order", stoppingToken);
await _publisher.StartPublishingAsync("Order", stoppingToken);
}

#region Private Helper Methods
Expand Down

0 comments on commit baffdff

Please sign in to comment.