diff --git a/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj b/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj
index 970ff0ec6..f51dbe58e 100644
--- a/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj
+++ b/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj
@@ -17,7 +17,7 @@
-
+
diff --git a/samples/KafkaFlow.Sample.BatchOperations/Program.cs b/samples/KafkaFlow.Sample.BatchOperations/Program.cs
index a47f7a704..1515286ed 100644
--- a/samples/KafkaFlow.Sample.BatchOperations/Program.cs
+++ b/samples/KafkaFlow.Sample.BatchOperations/Program.cs
@@ -30,7 +30,7 @@
.AddConsumer(
consumerBuilder => consumerBuilder
.Topic(batchTestTopic)
- .WithGroupId("kafka-flow-sample")
+ .WithGroupId("kafkaflow-sample")
.WithBufferSize(10000)
.WithWorkersCount(1)
.AddMiddlewares(
diff --git a/samples/KafkaFlow.Sample.Dashboard/README.md b/samples/KafkaFlow.Sample.Dashboard/README.md
index 88ac8c9e7..a5c37c4e1 100644
--- a/samples/KafkaFlow.Sample.Dashboard/README.md
+++ b/samples/KafkaFlow.Sample.Dashboard/README.md
@@ -27,4 +27,4 @@ Using your terminal of choice, start the sample for the sample folder.
dotnet run
```
-The dashboard UI will be available at `/kafka-flow`.
+The dashboard UI will be available at `/kafkaflow`.
diff --git a/samples/KafkaFlow.Sample.Dashboard/Startup.cs b/samples/KafkaFlow.Sample.Dashboard/Startup.cs
index cd7090457..289b2bef6 100644
--- a/samples/KafkaFlow.Sample.Dashboard/Startup.cs
+++ b/samples/KafkaFlow.Sample.Dashboard/Startup.cs
@@ -20,8 +20,8 @@ public void ConfigureServices(IServiceCollection services)
const string topicName = "topic-dashboard";
cluster
.WithBrokers(new[] { "localhost:9092" })
- .EnableAdminMessages("kafka-flow.admin", "kafka-flow.admin.group.id")
- .EnableTelemetry("kafka-flow.admin", "kafka-flow.telemetry.group.id")
+ .EnableAdminMessages("kafkaflow.admin", "kafkaflow.admin.group.id")
+ .EnableTelemetry("kafkaflow.admin", "kafkaflow.telemetry.group.id")
.CreateTopicIfNotExists(topicName, 3, 1)
.AddConsumer(
consumer =>
diff --git a/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj b/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj
index c0be1644d..e4a1914e1 100644
--- a/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj
+++ b/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj
@@ -5,7 +5,7 @@
net6.0falsefalse
- 9
+ 10true
@@ -18,7 +18,7 @@
-
+
diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj b/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj
index 1520146c4..f6f6d1e42 100644
--- a/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj
+++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj
@@ -20,7 +20,7 @@
-
+
diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs b/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs
index d78811488..b45b0ec4b 100644
--- a/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs
+++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs
@@ -21,7 +21,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
catch (Exception exception)
{
- context.ConsumerContext.ShouldStoreOffset = false;
+ context.ConsumerContext.AutoMessageCompletion = false;
this.logHandler.Error("Error handling message", exception,
new
{
diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj b/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj
index d03e43ef4..220139cdf 100644
--- a/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj
+++ b/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj
@@ -30,7 +30,7 @@
-
+
diff --git a/samples/KafkaFlow.Sample.WebApi/Program.cs b/samples/KafkaFlow.Sample.WebApi/Program.cs
index 62e03129a..d50287655 100644
--- a/samples/KafkaFlow.Sample.WebApi/Program.cs
+++ b/samples/KafkaFlow.Sample.WebApi/Program.cs
@@ -10,7 +10,7 @@
.AddCluster(
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
- .EnableAdminMessages("kafka-flow.admin")
+ .EnableAdminMessages("kafkaflow.admin")
)
);
@@ -19,11 +19,11 @@
c =>
{
c.SwaggerDoc(
- "kafka-flow",
+ "kafkaflow",
new OpenApiInfo
{
Title = "KafkaFlow Admin",
- Version = "kafka-flow",
+ Version = "kafkaflow",
});
})
.AddControllers();
@@ -37,7 +37,7 @@
app.UseSwagger();
app.UseSwaggerUI(c =>
{
- c.SwaggerEndpoint("/swagger/kafka-flow/swagger.json", "KafkaFlow Admin");
+ c.SwaggerEndpoint("/swagger/kafkaflow/swagger.json", "KafkaFlow Admin");
});
var kafkaBus = app.Services.CreateKafkaBus();
diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
index 843c6fea2..1dfe8a114 100644
--- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
+++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
@@ -30,7 +30,7 @@
-
+
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index b50a98807..673cb04c4 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -1,6 +1,6 @@
- 9.0
+ 10.0truetruesnupkg
diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
index 0e85e8654..e20bf6258 100644
--- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
@@ -2,6 +2,7 @@ namespace KafkaFlow.Configuration
{
using System;
using System.Collections.Generic;
+ using System.Threading.Tasks;
///
/// Used to build the consumer configuration
@@ -91,6 +92,24 @@ public interface IConsumerConfigurationBuilder
///
IConsumerConfigurationBuilder WithWorkersCount(int workersCount);
+ ///
+ /// Configures a custom function to dynamically calculate the number of workers.
+ ///
+ /// A function that takes a WorkersCountContext object and returns a Task yielding the new workers count
+ /// The interval that the calculator will be called
+ /// The IConsumerConfigurationBuilder instance for method chaining
+ IConsumerConfigurationBuilder WithWorkersCount(
+ Func> calculator,
+ TimeSpan evaluationInterval);
+
+ ///
+ /// Configures a custom function to dynamically calculate the number of workers.
+ /// By default, this function is called every 5 minutes.
+ ///
+ /// A function that takes a WorkersCountContext object and returns a Task yielding the new workers count
+ /// The IConsumerConfigurationBuilder instance for method chaining
+ IConsumerConfigurationBuilder WithWorkersCount(Func> calculator);
+
///
/// Sets how many messages will be buffered for each worker
///
@@ -132,16 +151,11 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy()
where T : class, IDistributionStrategy;
///
- /// Offsets will be stored after the execution of the handler and middlewares automatically, this is the default behaviour
- ///
- ///
- IConsumerConfigurationBuilder WithAutoStoreOffsets();
-
- ///
- /// The client should call the
+ /// Configures the consumer for manual message completion.
+ /// The client should call the to mark the message processing as finished
///
///
- IConsumerConfigurationBuilder WithManualStoreOffsets();
+ IConsumerConfigurationBuilder WithManualMessageCompletion();
///
/// No offsets will be stored on Kafka
diff --git a/src/KafkaFlow.Abstractions/Configuration/IGlobalEvents.cs b/src/KafkaFlow.Abstractions/Configuration/IGlobalEvents.cs
new file mode 100644
index 000000000..e5daf6cbf
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/Configuration/IGlobalEvents.cs
@@ -0,0 +1,23 @@
+namespace KafkaFlow.Configuration
+{
+ ///
+ /// Provides access to events fired by the internals of the library
+ ///
+ public interface IGlobalEvents
+ {
+ ///
+ /// Gets the message consume started event
+ ///
+ IEvent MessageConsumeStarted { get; }
+
+ ///
+ /// Gets the message consume ended event
+ ///
+ IEvent MessageConsumeEnded { get; }
+
+ ///
+ /// Gets the message produce started event
+ ///
+ IEvent MessageProduceStarted { get; }
+ }
+}
diff --git a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs
index a29a6cfdb..865c96ff4 100644
--- a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs
@@ -1,25 +1,31 @@
-namespace KafkaFlow.Configuration
+namespace KafkaFlow.Configuration;
+
+using System;
+
+///
+/// A builder to configure KafkaFlow
+///
+public interface IKafkaConfigurationBuilder
{
- using System;
+ ///
+ /// Adds a new Cluster
+ ///
+ /// A handle to configure the cluster
+ ///
+ IKafkaConfigurationBuilder AddCluster(Action cluster);
///
- /// A builder to configure KafkaFlow
+ /// Set the log handler to be used by the Framework, if none is provided the will be used
///
- public interface IKafkaConfigurationBuilder
- {
- ///
- /// Adds a new Cluster
- ///
- /// A handle to configure the cluster
- ///
- IKafkaConfigurationBuilder AddCluster(Action cluster);
+ /// A class that implements the interface
+ ///
+ IKafkaConfigurationBuilder UseLogHandler()
+ where TLogHandler : ILogHandler;
- ///
- /// Set the log handler to be used by the Framework, if none is provided the will be used
- ///
- /// A class that implements the interface
- ///
- IKafkaConfigurationBuilder UseLogHandler()
- where TLogHandler : ILogHandler;
- }
+ ///
+ /// Subscribe the global events defined in
+ ///
+ /// A handle to subscribe the events
+ ///
+ IKafkaConfigurationBuilder SubscribeGlobalEvents(Action observers);
}
diff --git a/src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs b/src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs
new file mode 100644
index 000000000..a54a4b82f
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs
@@ -0,0 +1,41 @@
+namespace KafkaFlow.Configuration
+{
+ using System.Collections.Generic;
+
+ ///
+ /// A metadata class with some context information help to calculate the number of workers
+ ///
+ public class WorkersCountContext
+ {
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The consumer's name
+ /// The consumer's group id
+ /// The consumer's assigned partition
+ public WorkersCountContext(
+ string consumerName,
+ string consumerGroupId,
+ IReadOnlyCollection assignedTopicsPartitions)
+ {
+ this.ConsumerName = consumerName;
+ this.ConsumerGroupId = consumerGroupId;
+ this.AssignedTopicsPartitions = assignedTopicsPartitions;
+ }
+
+ ///
+ /// Gets the consumer's name
+ ///
+ public string ConsumerName { get; }
+
+ ///
+ /// Gets the consumer's group id
+ ///
+ public string ConsumerGroupId { get; }
+
+ ///
+ /// Gets the assigned partitions to the consumer
+ ///
+ public IReadOnlyCollection AssignedTopicsPartitions { get; }
+ }
+}
diff --git a/src/KafkaFlow.Abstractions/Consumers/IWorker.cs b/src/KafkaFlow.Abstractions/Consumers/IWorker.cs
new file mode 100644
index 000000000..f54a374ac
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/Consumers/IWorker.cs
@@ -0,0 +1,28 @@
+namespace KafkaFlow
+{
+ ///
+ /// Represents the interface of a internal worker
+ ///
+ public interface IWorker
+ {
+ ///
+ /// Gets worker's id
+ ///
+ int Id { get; }
+
+ ///
+ /// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
+ ///
+ IEvent WorkerStopping { get; }
+
+ ///
+ /// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
+ ///
+ IEvent WorkerStopped { get; }
+
+ ///
+ /// Gets the subject for worker consumption completed events where observers can subscribe to receive notifications.
+ ///
+ IEvent WorkerProcessingEnded { get; }
+ }
+}
diff --git a/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs b/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs
index 8059be30c..f2021fed2 100644
--- a/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs
+++ b/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs
@@ -65,6 +65,35 @@ public static IDependencyConfigurator AddSingleton(
InstanceLifetime.Singleton);
}
+ ///
+ /// Registers a scoped type mapping where the returned instance will be given by the provided factory
+ ///
+ /// The object that this method was called on
+ /// Type that will be created
+ ///
+ public static IDependencyConfigurator AddScoped(this IDependencyConfigurator configurator)
+ where TService : class
+ {
+ return configurator.Add(InstanceLifetime.Scoped);
+ }
+
+ ///
+ /// Registers a scoped type mapping where the returned instance will be given by the provided factory
+ ///
+ /// The object that this method was called on
+ /// A factory to create new instances of the service implementation
+ /// Type that will be created
+ ///
+ public static IDependencyConfigurator AddScoped(
+ this IDependencyConfigurator configurator,
+ Func factory)
+ {
+ return configurator.Add(
+ typeof(TService),
+ factory,
+ InstanceLifetime.Scoped);
+ }
+
///
/// Registers a transient type mapping
///
diff --git a/src/KafkaFlow.Abstractions/IConsumerContext.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs
index 76733d147..f07b84042 100644
--- a/src/KafkaFlow.Abstractions/IConsumerContext.cs
+++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs
@@ -2,6 +2,7 @@ namespace KafkaFlow
{
using System;
using System.Threading;
+ using System.Threading.Tasks;
///
/// Represents the message consumer
@@ -38,6 +39,11 @@ public interface IConsumerContext
///
long Offset { get; }
+ ///
+ /// Gets the object associated with the message
+ ///
+ TopicPartitionOffset TopicPartitionOffset { get; }
+
///
/// Gets the consumer group id from kafka consumer that received the message
///
@@ -49,14 +55,43 @@ public interface IConsumerContext
DateTime MessageTimestamp { get; }
///
- /// Gets or sets a value indicating whether if the framework should store the current offset in the end when auto store offset is used
+ /// Gets or sets a value indicating whether if the framework should invoke the method after the message has been processed
+ ///
+ bool AutoMessageCompletion { get; set; }
+
+ ///
+ /// Gets or sets a value indicating whether if the message offset must be stored when the message is marked as completed
///
bool ShouldStoreOffset { get; set; }
///
- /// Store the message offset when manual store option is used
+ /// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
+ /// This instance is tied to the consumer scope, meaning it is capable of resolving dependencies
+ /// that are scoped to the lifecycle of a single consumer.
+ ///
+ IDependencyResolver ConsumerDependencyResolver { get; }
+
+ ///
+ /// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
+ /// This instance is tied to the worker scope, meaning it is capable of resolving dependencies
+ /// that are scoped to the lifecycle of a single worker.
+ ///
+ IDependencyResolver WorkerDependencyResolver { get; }
+
+ ///
+ /// Gets a Task that completes when the method is invoked,
+ /// indicating the end of message processing. This allows async operations
+ /// to wait for the message to be fully processed and its offset stored.
+ ///
+ Task Completion { get; }
+
+ ///
+ /// Signals the completion of message processing and stores the message offset to eventually be committed.
+ /// After this call, the framework marks the message processing as finished and releases resources associated with the message.
+ /// By default, this method is automatically invoked when message processing concludes, unless
+ /// the consumer is configured for manual message completion or the flag is set to false.
///
- void StoreOffset();
+ void Complete(IMessageContext context);
///
/// Get offset watermark data
diff --git a/src/KafkaFlow.Abstractions/IDateTimeProvider.cs b/src/KafkaFlow.Abstractions/IDateTimeProvider.cs
index 8439703e4..b235eeb4a 100644
--- a/src/KafkaFlow.Abstractions/IDateTimeProvider.cs
+++ b/src/KafkaFlow.Abstractions/IDateTimeProvider.cs
@@ -7,8 +7,8 @@ namespace KafkaFlow
///
public interface IDateTimeProvider
{
- ///
- DateTime Now { get; }
+ ///
+ DateTime UtcNow { get; }
///
DateTime MinValue { get; }
diff --git a/src/KafkaFlow.Abstractions/IEvent.cs b/src/KafkaFlow.Abstractions/IEvent.cs
new file mode 100644
index 000000000..f9a055b4f
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/IEvent.cs
@@ -0,0 +1,32 @@
+namespace KafkaFlow
+{
+ using System;
+ using System.Threading.Tasks;
+
+ ///
+ /// Represents an Event to be subscribed.
+ ///
+ public interface IEvent
+ {
+ ///
+ /// Subscribes to the event.
+ ///
+ /// The handler to be called when the event is fired.
+ /// Event subscription reference
+ IEventSubscription Subscribe(Func handler);
+ }
+
+ ///
+ /// Represents an Event to be subscribed.
+ ///
+ /// The argument expected by the event.
+ public interface IEvent
+ {
+ ///
+ /// Subscribes to the event.
+ ///
+ /// The handler to be called when the event is fired.
+ /// Event subscription reference
+ IEventSubscription Subscribe(Func handler);
+ }
+}
\ No newline at end of file
diff --git a/src/KafkaFlow.Abstractions/IEventSubscription.cs b/src/KafkaFlow.Abstractions/IEventSubscription.cs
new file mode 100644
index 000000000..a4e6fccd7
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/IEventSubscription.cs
@@ -0,0 +1,12 @@
+namespace KafkaFlow;
+
+///
+/// Represents an Event subscription.
+///
+public interface IEventSubscription
+{
+ ///
+ /// Cancels the subscription to the event.
+ ///
+ void Cancel();
+}
diff --git a/src/KafkaFlow.Abstractions/IMessageContext.cs b/src/KafkaFlow.Abstractions/IMessageContext.cs
index be95315a3..639b47dbf 100644
--- a/src/KafkaFlow.Abstractions/IMessageContext.cs
+++ b/src/KafkaFlow.Abstractions/IMessageContext.cs
@@ -1,7 +1,5 @@
namespace KafkaFlow
{
- using System;
-
///
/// A context that contains the message and metadata
///
@@ -27,6 +25,13 @@ public interface IMessageContext
///
IProducerContext ProducerContext { get; }
+ ///
+ /// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
+ /// This instance is tied to the message scope, meaning it is capable of resolving dependencies
+ /// that are scoped to the lifecycle of a single processed message.
+ ///
+ IDependencyResolver DependencyResolver { get; }
+
///
/// Creates a new with the new message
///
@@ -34,13 +39,5 @@ public interface IMessageContext
/// The new message value
/// A new message context containing the new values
IMessageContext SetMessage(object key, object value);
-
- ///
- /// Deprecated
- ///
- /// key
- ///
- [Obsolete("This method should no longer be used, use the " + nameof(SetMessage) + "() instead.", true)]
- IMessageContext TransformMessage(object message);
}
}
diff --git a/src/KafkaFlow.Abstractions/IProducerContext.cs b/src/KafkaFlow.Abstractions/IProducerContext.cs
index 886e856e4..1f63af400 100644
--- a/src/KafkaFlow.Abstractions/IProducerContext.cs
+++ b/src/KafkaFlow.Abstractions/IProducerContext.cs
@@ -1,23 +1,29 @@
-namespace KafkaFlow
+namespace KafkaFlow;
+
+///
+/// Some producer metadata
+///
+public interface IProducerContext
{
///
- /// Some producer metadata
+ /// Gets the topic associated with the message
+ ///
+ string Topic { get; }
+
+ ///
+ /// Gets the partition associated with the message
///
- public interface IProducerContext
- {
- ///
- /// Gets the topic associated with the message
- ///
- string Topic { get; }
+ int? Partition { get; }
- ///
- /// Gets the partition associated with the message
- ///
- int? Partition { get; }
+ ///
+ /// Gets the partition offset associated with the message
+ ///
+ long? Offset { get; }
- ///
- /// Gets the partition offset associated with the message
- ///
- long? Offset { get; }
- }
+ ///
+ /// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
+ /// This instance is tied to the producer scope, meaning it is capable of resolving dependencies
+ /// that are scoped to the lifecycle of a single producer.
+ ///
+ IDependencyResolver DependencyResolver { get; }
}
diff --git a/src/KafkaFlow.Abstractions/IWorker.cs b/src/KafkaFlow.Abstractions/IWorker.cs
deleted file mode 100644
index bc2e086d2..000000000
--- a/src/KafkaFlow.Abstractions/IWorker.cs
+++ /dev/null
@@ -1,21 +0,0 @@
-namespace KafkaFlow
-{
- using System;
-
- ///
- /// Represents the interface of a internal worker
- ///
- public interface IWorker
- {
- ///
- /// Gets worker's id
- ///
- int Id { get; }
-
- ///
- /// This handler is called immediately after a worker completes the consumption of a message
- ///
- /// to be executed
- void OnTaskCompleted(Action handler);
- }
-}
diff --git a/src/KafkaFlow.Abstractions/MessageEventContext.cs b/src/KafkaFlow.Abstractions/MessageEventContext.cs
new file mode 100644
index 000000000..5b85ef51c
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/MessageEventContext.cs
@@ -0,0 +1,41 @@
+namespace KafkaFlow
+{
+ ///
+ /// Represents a message context used in the events
+ ///
+ public class MessageEventContext
+ {
+ private readonly IMessageContext messageContext;
+ private readonly IDependencyResolver dependencyResolver;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The message context
+ /// The dependency resolver
+ public MessageEventContext(IMessageContext messageContext, IDependencyResolver dependencyResolver)
+ {
+ this.messageContext = messageContext;
+ this.dependencyResolver = dependencyResolver;
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The message context
+ public MessageEventContext(IMessageContext messageContext)
+ : this(messageContext, null)
+ {
+ }
+
+ ///
+ /// Gets the message context
+ ///
+ public IMessageContext MessageContext => this.messageContext;
+
+ ///
+ /// Gets the dependency resolver
+ ///
+ public IDependencyResolver DependencyResolver => this.dependencyResolver;
+ }
+}
diff --git a/src/KafkaFlow.Abstractions/MiddlewareLifetime.cs b/src/KafkaFlow.Abstractions/MiddlewareLifetime.cs
index 4a17301ff..0b3f28966 100644
--- a/src/KafkaFlow.Abstractions/MiddlewareLifetime.cs
+++ b/src/KafkaFlow.Abstractions/MiddlewareLifetime.cs
@@ -6,28 +6,32 @@ namespace KafkaFlow
public enum MiddlewareLifetime
{
///
- /// Specifies that a single instance of the class will be created for the entire application
+ /// Denotes a single instance of the middleware to be maintained throughout the application's lifecycle.
+ /// This instance will not be disposed until the application is shut down.
///
Singleton,
///
- /// Specifies that a new instance of the class will be created for each scope
+ /// Indicates a new middleware instance is instantiated for each individual message scope, ensuring isolated processing.
+ /// This instance will be disposed when the message scope ends.
///
- Scoped,
+ Message,
///
- /// Specifies that a new instance of the class will be created every time it is requested
+ /// Signifies a new middleware instance is created every time a request is made, providing the highest level of isolation.
+ /// This instance will be disposed as soon as it goes out of scope.
///
Transient,
///
- /// Specifies that a new instance of the class will be created for each Consumer/Producer
+ /// Specifies a separate middleware instance for each Consumer/Producer, useful for maintaining context within the Consumer/Producer scope.
+ /// This instance will be disposed when the Consumer/Producer it belongs to is disposed.
///
ConsumerOrProducer,
///
- /// Specifies that a new instance of the class will be created for each Consumer Worker
- /// For producers this setting will behave as
+ /// Specifies a unique middleware instance for each Consumer Worker. For Producers, this behaves as ConsumerOrProducer. This allows context preservation at a worker level.
+ /// This instance will be disposed when the Worker it belongs to is disposed.
///
Worker,
}
diff --git a/src/KafkaFlow.Abstractions/TopicPartitionOffset.cs b/src/KafkaFlow.Abstractions/TopicPartitionOffset.cs
new file mode 100644
index 000000000..5b17cb138
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/TopicPartitionOffset.cs
@@ -0,0 +1,36 @@
+namespace KafkaFlow
+{
+ ///
+ /// Represents a Kafka topic along with its partition and offset information.
+ ///
+ public readonly struct TopicPartitionOffset
+ {
+ ///
+ /// Initializes a new instance of the struct with the specified topic name, partition number, and offset value.
+ ///
+ /// The name of the topic.
+ /// The id of the partition.
+ /// The offset value.
+ public TopicPartitionOffset(string topic, int partition, long offset)
+ {
+ this.Topic = topic;
+ this.Partition = partition;
+ this.Offset = offset;
+ }
+
+ ///
+ /// Gets the name of the topic.
+ ///
+ public string Topic { get; }
+
+ ///
+ /// Gets the id of the partition.
+ ///
+ public int Partition { get; }
+
+ ///
+ /// Gets the offset value.
+ ///
+ public long Offset { get; }
+ }
+}
diff --git a/src/KafkaFlow.Admin.Dashboard/ApplicationBuilderExtensions.cs b/src/KafkaFlow.Admin.Dashboard/ApplicationBuilderExtensions.cs
index 0eff65f3f..f525b9a1a 100644
--- a/src/KafkaFlow.Admin.Dashboard/ApplicationBuilderExtensions.cs
+++ b/src/KafkaFlow.Admin.Dashboard/ApplicationBuilderExtensions.cs
@@ -16,7 +16,7 @@ namespace KafkaFlow.Admin.Dashboard
public static class ApplicationBuilderExtensions
{
///
- /// Enable the KafkaFlow dashboard. The path will be `/kafka-flow`
+ /// Enable the KafkaFlow dashboard. The path will be `/kafkaflow`
///
/// Instance of
///
diff --git a/src/KafkaFlow.Admin.Dashboard/ClientApp/angular.json b/src/KafkaFlow.Admin.Dashboard/ClientApp/angular.json
index 8546c530d..e0f737e14 100644
--- a/src/KafkaFlow.Admin.Dashboard/ClientApp/angular.json
+++ b/src/KafkaFlow.Admin.Dashboard/ClientApp/angular.json
@@ -29,7 +29,7 @@
"customWebpackConfig": {
"path": "./webpack.config.js"
},
- "baseHref": "/kafka-flow/",
+ "baseHref": "/kafkaflow/",
"outputPath": "dist",
"index": "src/index.html",
"main": "src/main.ts",
@@ -143,4 +143,4 @@
}
},
"defaultProject": "dashboard"
-}
\ No newline at end of file
+}
diff --git a/src/KafkaFlow.Admin.Dashboard/ClientApp/dist/index.html b/src/KafkaFlow.Admin.Dashboard/ClientApp/dist/index.html
index a83bac539..3d6e39aba 100644
--- a/src/KafkaFlow.Admin.Dashboard/ClientApp/dist/index.html
+++ b/src/KafkaFlow.Admin.Dashboard/ClientApp/dist/index.html
@@ -1,16 +1,15 @@
-
-
-
+
KafkaFlow - Dashboard
-
+
-
+
-
-
+
+
+