Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement v3 main features #409

Merged
merged 8 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
catch (Exception exception)
{
context.ConsumerContext.ShouldStoreOffset = false;
context.ConsumerContext.AutoMessageCompletion = false;
joelfoliveira marked this conversation as resolved.
Show resolved Hide resolved
this.logHandler.Error("Error handling message", exception,
new
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace KafkaFlow.Configuration
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Used to build the consumer configuration
Expand Down Expand Up @@ -91,6 +92,24 @@ public interface IConsumerConfigurationBuilder
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkersCount(int workersCount);

/// <summary>
/// Configures a custom function to dynamically calculate the number of workers.
/// </summary>
/// <param name="calculator">A function that takes a WorkersCountContext object and returns a Task yielding the new workers count</param>
/// <param name="evaluationInterval">The interval that the calculator will be called</param>
/// <returns>The IConsumerConfigurationBuilder instance for method chaining</returns>
IConsumerConfigurationBuilder WithWorkersCount(
Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator,
TimeSpan evaluationInterval);

/// <summary>
/// Configures a custom function to dynamically calculate the number of workers.
filipeesch marked this conversation as resolved.
Show resolved Hide resolved
/// By default, this function is called every 5 minutes.
/// </summary>
/// <param name="calculator">A function that takes a WorkersCountContext object and returns a Task yielding the new workers count</param>
/// <returns>The IConsumerConfigurationBuilder instance for method chaining</returns>
IConsumerConfigurationBuilder WithWorkersCount(Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator);

/// <summary>
/// Sets how many messages will be buffered for each worker
/// </summary>
Expand Down Expand Up @@ -132,16 +151,11 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
where T : class, IDistributionStrategy;

/// <summary>
/// Offsets will be stored after the execution of the handler and middlewares automatically, this is the default behaviour
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithAutoStoreOffsets();

/// <summary>
/// The client should call the <see cref="IConsumerContext.StoreOffset()"/>
/// Configures the consumer for manual message completion.
/// The client should call the <see cref="IConsumerContext.Complete"/> to mark the message processing as finished
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithManualStoreOffsets();
IConsumerConfigurationBuilder WithManualMessageCompletion();

/// <summary>
/// No offsets will be stored on Kafka
Expand Down
41 changes: 41 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace KafkaFlow.Configuration
{
using System.Collections.Generic;

/// <summary>
/// A metadata class with some context information help to calculate the number of workers
/// </summary>
public class WorkersCountContext
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkersCountContext"/> class.
/// </summary>
/// <param name="consumerName">The consumer's name</param>
/// <param name="consumerGroupId">The consumer's group id</param>
/// <param name="assignedTopicsPartitions">The consumer's assigned partition</param>
public WorkersCountContext(
string consumerName,
string consumerGroupId,
IReadOnlyCollection<TopicPartitions> assignedTopicsPartitions)
{
this.ConsumerName = consumerName;
this.ConsumerGroupId = consumerGroupId;
this.AssignedTopicsPartitions = assignedTopicsPartitions;
}

/// <summary>
/// Gets the consumer's name
/// </summary>
public string ConsumerName { get; }

/// <summary>
/// Gets the consumer's group id
/// </summary>
public string ConsumerGroupId { get; }

/// <summary>
/// Gets the assigned partitions to the consumer
/// </summary>
public IReadOnlyCollection<TopicPartitions> AssignedTopicsPartitions { get; }
}
}
filipeesch marked this conversation as resolved.
Show resolved Hide resolved
28 changes: 28 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/IWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace KafkaFlow
{
/// <summary>
/// Represents the interface of a internal worker
/// </summary>
public interface IWorker
{
/// <summary>
/// Gets worker's id
/// </summary>
int Id { get; }

/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
IEvent WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
IEvent WorkerStopped { get; }

/// <summary>
/// Gets the subject for worker consumption completed events where observers can subscribe to receive notifications.
/// </summary>
IEvent<IMessageContext> WorkerProcessingEnded { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ public static IDependencyConfigurator AddSingleton<TService>(
InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(this IDependencyConfigurator configurator)
where TService : class
{
return configurator.Add<TService>(InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="factory">A factory to create new instances of the service implementation</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(
this IDependencyConfigurator configurator,
Func<IDependencyResolver, TService> factory)
{
return configurator.Add(
typeof(TService),
factory,
InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a transient type mapping
/// </summary>
Expand Down
41 changes: 38 additions & 3 deletions src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace KafkaFlow
{
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Represents the message consumer
Expand Down Expand Up @@ -38,6 +39,11 @@ public interface IConsumerContext
/// </summary>
long Offset { get; }

/// <summary>
/// Gets the <see cref="TopicPartitionOffset"/> object associated with the message
/// </summary>
TopicPartitionOffset TopicPartitionOffset { get; }

/// <summary>
/// Gets the consumer group id from kafka consumer that received the message
/// </summary>
Expand All @@ -49,14 +55,43 @@ public interface IConsumerContext
DateTime MessageTimestamp { get; }

/// <summary>
/// Gets or sets a value indicating whether if the framework should store the current offset in the end when auto store offset is used
/// Gets or sets a value indicating whether if the framework should invoke the <see cref="Complete"/> method after the message has been processed
/// </summary>
bool AutoMessageCompletion { get; set; }

/// <summary>
/// Gets or sets a value indicating whether if the message offset must be stored when the message is marked as completed
/// </summary>
bool ShouldStoreOffset { get; set; }

/// <summary>
/// Store the message offset when manual store option is used
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the consumer scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single consumer.
/// </summary>
IDependencyResolver ConsumerDependencyResolver { get; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the worker scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single worker.
/// </summary>
IDependencyResolver WorkerDependencyResolver { get; }
filipeesch marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Gets a Task that completes when the <see cref="Complete"/> method is invoked,
/// indicating the end of message processing. This allows async operations
/// to wait for the message to be fully processed and its offset stored.
/// </summary>
Task<TopicPartitionOffset> Completion { get; }

/// <summary>
/// Signals the completion of message processing and stores the message offset to eventually be committed.
/// After this call, the framework marks the message processing as finished and releases resources associated with the message.
/// By default, this method is automatically invoked when message processing concludes, unless
/// the consumer is configured for manual message completion or the <see cref="AutoMessageCompletion"/> flag is set to false.
/// </summary>
void StoreOffset();
void Complete();

/// <summary>
/// Get offset watermark data
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow.Abstractions/IDateTimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ namespace KafkaFlow
/// </summary>
public interface IDateTimeProvider
{
/// <inheritdoc cref="DateTime.Now"/>
DateTime Now { get; }
/// <inheritdoc cref="DateTime.UtcNow"/>
DateTime UtcNow { get; }
filipeesch marked this conversation as resolved.
Show resolved Hide resolved

/// <inheritdoc cref="DateTime.MinValue"/>
DateTime MinValue { get; }
Expand Down
32 changes: 32 additions & 0 deletions src/KafkaFlow.Abstractions/IEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
public interface IEvent
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<Task> handler);
}

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
/// <typeparam name="TArg">The argument expected by the event.</typeparam>
public interface IEvent<TArg>
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<TArg, Task> handler);
}
}
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/IEventSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow;

/// <summary>
/// Represents an Event subscription.
/// </summary>
public interface IEventSubscription
{
/// <summary>
/// Cancels the subscription to the event.
/// </summary>
void Cancel();
}
17 changes: 7 additions & 10 deletions src/KafkaFlow.Abstractions/IMessageContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace KafkaFlow
{
using System;

/// <summary>
/// A context that contains the message and metadata
/// </summary>
Expand All @@ -27,20 +25,19 @@ public interface IMessageContext
/// </summary>
IProducerContext ProducerContext { get; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the message scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single processed message.
/// </summary>
IDependencyResolver DependencyResolver { get; }

/// <summary>
/// Creates a new <see cref="IMessageContext"/> with the new message
/// </summary>
/// <param name="key">The new message key</param>
/// <param name="value">The new message value</param>
/// <returns>A new message context containing the new values</returns>
IMessageContext SetMessage(object key, object value);

/// <summary>
/// Deprecated
/// </summary>
/// <param name="message">key</param>
/// <returns></returns>
[Obsolete("This method should no longer be used, use the " + nameof(SetMessage) + "() instead.", true)]
IMessageContext TransformMessage(object message);
}
}
40 changes: 23 additions & 17 deletions src/KafkaFlow.Abstractions/IProducerContext.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
namespace KafkaFlow
namespace KafkaFlow;

/// <summary>
/// Some producer metadata
/// </summary>
public interface IProducerContext
{
/// <summary>
/// Some producer metadata
/// Gets the topic associated with the message
/// </summary>
string Topic { get; }

/// <summary>
/// Gets the partition associated with the message
/// </summary>
public interface IProducerContext
{
/// <summary>
/// Gets the topic associated with the message
/// </summary>
string Topic { get; }
int? Partition { get; }

/// <summary>
/// Gets the partition associated with the message
/// </summary>
int? Partition { get; }
/// <summary>
/// Gets the partition offset associated with the message
/// </summary>
long? Offset { get; }

/// <summary>
/// Gets the partition offset associated with the message
/// </summary>
long? Offset { get; }
}
/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the producer scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single producer.
/// </summary>
IDependencyResolver DependencyResolver { get; }
}
21 changes: 0 additions & 21 deletions src/KafkaFlow.Abstractions/IWorker.cs

This file was deleted.

Loading
Loading