diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index 3df1b9033..b459c22c6 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -139,7 +139,7 @@ await this.middlewareExecutor } catch (Exception ex) { - await this.globalEvents.FireMessageConsumeErrorAsync(new MessageEventExceptionContext(context, ex)); + await this.globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, ex)); this.logHandler.Error( "Error processing message", diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index 926aa6179..08727fb55 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -42,7 +42,7 @@ public async Task> ProduceAsync( var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers); - await this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext, this.dependencyResolver)); + await this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext)); await this.middlewareExecutor .Execute( @@ -56,7 +56,7 @@ await this.middlewareExecutor }) .ConfigureAwait(false); - await this.globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext, this.dependencyResolver)); + await this.globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext)); return report; } @@ -93,7 +93,7 @@ public void Produce( var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers); - this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext, this.dependencyResolver)); + this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext)); this.middlewareExecutor .Execute( @@ -139,7 +139,7 @@ public void Produce( scope.Dispose(); }); - this.globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext, this.dependencyResolver)); + this.globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext)); } public void Produce( @@ -283,7 +283,7 @@ private async Task> InternalProduceAsync(IMessage } catch (ProduceException e) { - await this.globalEvents.FireMessageProduceErrorAsync(new MessageEventExceptionContext(context, e)); + await this.globalEvents.FireMessageProduceErrorAsync(new MessageErrorEventContext(context, e)); if (e.Error.IsFatal) {