Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include EventHub and option to SubscribeGlobalEvents #445

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.17" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion samples/KafkaFlow.Sample.BatchOperations/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
.AddConsumer(
consumerBuilder => consumerBuilder
.Topic(batchTestTopic)
.WithGroupId("kafka-flow-sample")
.WithGroupId("kafkaflow-sample")
.WithBufferSize(10000)
.WithWorkersCount(1)
.AddMiddlewares(
Expand Down
2 changes: 1 addition & 1 deletion samples/KafkaFlow.Sample.Dashboard/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ Using your terminal of choice, start the sample for the sample folder.
dotnet run
```

The dashboard UI will be available at `/kafka-flow`.
The dashboard UI will be available at `/kafkaflow`.
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample.Dashboard/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public void ConfigureServices(IServiceCollection services)
const string topicName = "topic-dashboard";
cluster
.WithBrokers(new[] { "localhost:9092" })
.EnableAdminMessages("kafka-flow.admin", "kafka-flow.admin.group.id")
.EnableTelemetry("kafka-flow.admin", "kafka-flow.telemetry.group.id")
.EnableAdminMessages("kafkaflow.admin", "kafkaflow.admin.group.id")
.EnableTelemetry("kafkaflow.admin", "kafkaflow.telemetry.group.id")
.CreateTopicIfNotExists(topicName, 3, 1)
.AddConsumer(
consumer =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<GenerateDocumentationFile>false</GenerateDocumentationFile>
<LangVersion>9</LangVersion>
<LangVersion>10</LangVersion>
<InvariantGlobalization>true</InvariantGlobalization>
</PropertyGroup>

Expand All @@ -18,7 +18,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.17" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.17" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
catch (Exception exception)
{
context.ConsumerContext.ShouldStoreOffset = false;
context.ConsumerContext.AutoMessageCompletion = false;
this.logHandler.Error("Error handling message", exception,
new
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.17" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
</ItemGroup>

</Project>
8 changes: 4 additions & 4 deletions samples/KafkaFlow.Sample.WebApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
.AddCluster(
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.EnableAdminMessages("kafka-flow.admin")
.EnableAdminMessages("kafkaflow.admin")
)
);

Expand All @@ -19,11 +19,11 @@
c =>
{
c.SwaggerDoc(
"kafka-flow",
"kafkaflow",
new OpenApiInfo
{
Title = "KafkaFlow Admin",
Version = "kafka-flow",
Version = "kafkaflow",
});
})
.AddControllers();
Expand All @@ -37,7 +37,7 @@
app.UseSwagger();
app.UseSwaggerUI(c =>
{
c.SwaggerEndpoint("/swagger/kafka-flow/swagger.json", "KafkaFlow Admin");
c.SwaggerEndpoint("/swagger/kafkaflow/swagger.json", "KafkaFlow Admin");
});

var kafkaBus = app.Services.CreateKafkaBus();
Expand Down
2 changes: 1 addition & 1 deletion samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.17" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<LangVersion>9.0</LangVersion>
<LangVersion>10.0</LangVersion>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace KafkaFlow.Configuration
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Used to build the consumer configuration
Expand Down Expand Up @@ -91,6 +92,24 @@ public interface IConsumerConfigurationBuilder
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkersCount(int workersCount);

/// <summary>
/// Configures a custom function to dynamically calculate the number of workers.
/// </summary>
/// <param name="calculator">A function that takes a WorkersCountContext object and returns a Task yielding the new workers count</param>
/// <param name="evaluationInterval">The interval that the calculator will be called</param>
/// <returns>The IConsumerConfigurationBuilder instance for method chaining</returns>
IConsumerConfigurationBuilder WithWorkersCount(
Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator,
TimeSpan evaluationInterval);

/// <summary>
/// Configures a custom function to dynamically calculate the number of workers.
/// By default, this function is called every 5 minutes.
/// </summary>
/// <param name="calculator">A function that takes a WorkersCountContext object and returns a Task yielding the new workers count</param>
/// <returns>The IConsumerConfigurationBuilder instance for method chaining</returns>
IConsumerConfigurationBuilder WithWorkersCount(Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator);

/// <summary>
/// Sets how many messages will be buffered for each worker
/// </summary>
Expand Down Expand Up @@ -132,16 +151,11 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
where T : class, IDistributionStrategy;

/// <summary>
/// Offsets will be stored after the execution of the handler and middlewares automatically, this is the default behaviour
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithAutoStoreOffsets();

/// <summary>
/// The client should call the <see cref="IConsumerContext.StoreOffset()"/>
/// Configures the consumer for manual message completion.
/// The client should call the <see cref="IConsumerContext.Complete"/> to mark the message processing as finished
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithManualStoreOffsets();
IConsumerConfigurationBuilder WithManualMessageCompletion();

/// <summary>
/// No offsets will be stored on Kafka
Expand Down
23 changes: 23 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/IGlobalEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace KafkaFlow.Configuration
{
/// <summary>
/// Provides access to events fired by the internals of the library
/// </summary>
public interface IGlobalEvents
{
/// <summary>
/// Gets the message consume started event
/// </summary>
IEvent<MessageEventContext> MessageConsumeStarted { get; }

/// <summary>
/// Gets the message consume ended event
/// </summary>
IEvent<MessageEventContext> MessageConsumeEnded { get; }

/// <summary>
/// Gets the message produce started event
/// </summary>
IEvent<MessageEventContext> MessageProduceStarted { get; }
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
namespace KafkaFlow.Configuration
namespace KafkaFlow.Configuration;

using System;

/// <summary>
/// A builder to configure KafkaFlow
/// </summary>
public interface IKafkaConfigurationBuilder
{
using System;
/// <summary>
/// Adds a new Cluster
/// </summary>
/// <param name="cluster">A handle to configure the cluster</param>
/// <returns></returns>
IKafkaConfigurationBuilder AddCluster(Action<IClusterConfigurationBuilder> cluster);

/// <summary>
/// A builder to configure KafkaFlow
/// Set the log handler to be used by the Framework, if none is provided the <see cref="NullLogHandler"/> will be used
/// </summary>
public interface IKafkaConfigurationBuilder
{
/// <summary>
/// Adds a new Cluster
/// </summary>
/// <param name="cluster">A handle to configure the cluster</param>
/// <returns></returns>
IKafkaConfigurationBuilder AddCluster(Action<IClusterConfigurationBuilder> cluster);
/// <typeparam name="TLogHandler">A class that implements the <see cref="ILogHandler"/> interface</typeparam>
/// <returns></returns>
IKafkaConfigurationBuilder UseLogHandler<TLogHandler>()
where TLogHandler : ILogHandler;

/// <summary>
/// Set the log handler to be used by the Framework, if none is provided the <see cref="NullLogHandler"/> will be used
/// </summary>
/// <typeparam name="TLogHandler">A class that implements the <see cref="ILogHandler"/> interface</typeparam>
/// <returns></returns>
IKafkaConfigurationBuilder UseLogHandler<TLogHandler>()
where TLogHandler : ILogHandler;
}
/// <summary>
/// Subscribe the global events defined in <see cref="IGlobalEvents"/>
/// </summary>
/// <param name="observers">A handle to subscribe the events</param>
/// <returns></returns>
IKafkaConfigurationBuilder SubscribeGlobalEvents(Action<IGlobalEvents> observers);
}
41 changes: 41 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace KafkaFlow.Configuration
{
using System.Collections.Generic;

/// <summary>
/// A metadata class with some context information help to calculate the number of workers
/// </summary>
public class WorkersCountContext
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkersCountContext"/> class.
/// </summary>
/// <param name="consumerName">The consumer's name</param>
/// <param name="consumerGroupId">The consumer's group id</param>
/// <param name="assignedTopicsPartitions">The consumer's assigned partition</param>
public WorkersCountContext(
string consumerName,
string consumerGroupId,
IReadOnlyCollection<TopicPartitions> assignedTopicsPartitions)
{
this.ConsumerName = consumerName;
this.ConsumerGroupId = consumerGroupId;
this.AssignedTopicsPartitions = assignedTopicsPartitions;
}

/// <summary>
/// Gets the consumer's name
/// </summary>
public string ConsumerName { get; }

/// <summary>
/// Gets the consumer's group id
/// </summary>
public string ConsumerGroupId { get; }

/// <summary>
/// Gets the assigned partitions to the consumer
/// </summary>
public IReadOnlyCollection<TopicPartitions> AssignedTopicsPartitions { get; }
}
}
28 changes: 28 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/IWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace KafkaFlow
{
/// <summary>
/// Represents the interface of a internal worker
/// </summary>
public interface IWorker
{
/// <summary>
/// Gets worker's id
/// </summary>
int Id { get; }

/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
IEvent WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
IEvent WorkerStopped { get; }

/// <summary>
/// Gets the subject for worker consumption completed events where observers can subscribe to receive notifications.
/// </summary>
IEvent<IMessageContext> WorkerProcessingEnded { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ public static IDependencyConfigurator AddSingleton<TService>(
InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(this IDependencyConfigurator configurator)
where TService : class
{
return configurator.Add<TService>(InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="factory">A factory to create new instances of the service implementation</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(
this IDependencyConfigurator configurator,
Func<IDependencyResolver, TService> factory)
{
return configurator.Add(
typeof(TService),
factory,
InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a transient type mapping
/// </summary>
Expand Down
Loading
Loading