-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Include EventHub and option to SubscribeGlobalEvents #445
Conversation
fix: change dependency injection lifetime management for worker and consumer/producer fix: create AdminClient without authentication perf: memory optimizations on MessageContext
{ | ||
public IMessageContext MessageContext { get; set; } | ||
|
||
public IDependencyResolver DependencyResolver { get; set; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no setters and initialization from ctor. Or just use a record, now we can :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to use a record but my VS kept saying that I couldn't use C#9 features in .netstandard2.0 projects 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, please review
var middlewareContext = this.workerDependencyResolverScope.Resolver.Resolve<ConsumerMiddlewareContext>(); | ||
this.eventHub = this.workerDependencyResolverScope.Resolver.Resolve<IEventHub>() as EventHub; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.workerDependencyResolverScope.Resolver.Resolve<EventHub>()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, please review
await this.eventHub.FireMessageConsumeStartedAsync( | ||
new MessageEventContext | ||
{ | ||
DependencyResolver = this.workerDependencyResolverScope.Resolver, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we don't need the Resolver here, since we have it in the IMessageContext
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remove the IDependencyResolver
from the MessageEventContext
class given that, currently, it only contains the IMessageContext
and IDependencyResolver
objects? 🤔 And only include it later if needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's it 😃
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up keeping as it was and included a constructor with only the IMessageContext
src/KafkaFlow/Event.cs
Outdated
|
||
public Event(ILogHandler logHandler) | ||
{ | ||
this.logHandler = logHandler; | ||
} | ||
|
||
public IEventSubscription Subscribe(Func<TArg, Task> handler) | ||
public void Subscribe(Func<TArg, Task> handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the IEventSubscription
was removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error in the rebase process. Solved now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should some tests be added to cover the new SubscribeGlobalEvents
configuration?
@@ -20,7 +20,7 @@ public interface IEvent | |||
/// Represents an Event to be subscribed. | |||
/// </summary> | |||
/// <typeparam name="TArg">The argument expected by the event.</typeparam> | |||
public interface IEvent<TArg> | |||
public interface IEvent<out TArg> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a covariant generic here?
public void AddClusters(IEnumerable<ClusterConfiguration> configurations) => | ||
this.clusters.AddRange(configurations); | ||
} | ||
internal void AddClusters(IEnumerable<ClusterConfiguration> configurations) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is a reason to move this to internal?
There is an issue in KafkaFlow where the AddClusters method was suggested as a workaround. This change could break those use cases.
return this; | ||
} | ||
|
||
public IKafkaConfigurationBuilder SubscribeGlobalEvents(Action<IGlobalEvents> builder) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename the parameter to observers
to be consistent with the interface.
@@ -91,7 +91,7 @@ public interface IConsumerContext | |||
/// By default, this method is automatically invoked when message processing concludes, unless | |||
/// the consumer is configured for manual message completion or the <see cref="AutoMessageCompletion"/> flag is set to false. | |||
/// </summary> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider adding the following suggestion:
/// </summary> | |
/// </summary> | |
/// <param name="context">The message context</param> |
@@ -4,14 +4,15 @@ namespace KafkaFlow.Consumers | |||
using System.Threading; | |||
using System.Threading.Channels; | |||
using System.Threading.Tasks; | |||
using KafkaFlow.Configuration; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider removing the unnecessary using
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
public Task FireMessageConsumeStartedAsync(MessageEventContext context) | ||
=> this.messageConsumeStarted.FireAsync(context); | ||
|
||
public Task FireMessageConsumeCompletedAsync(MessageEventContext context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ailton asked:
"Why not to expose the FireAsync in the IEvent interface and reduce the complexity for calling the event?"
#451 (comment)
87cd205
to
d5a9c21
Compare
Description
Includes EventHub and option to SubscribeGlobalEvents in KafkaConfigurationBuilder
Checklist