Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Oct 12, 2023
1 parent b0c1733 commit 1c0e18b
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ await this.middlewareExecutor
}
catch (Exception ex)
{
await this.globalEvents.FireMessageConsumeErrorAsync(new MessageEventExceptionContext(context, ex));
await this.globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, ex));

this.logHandler.Error(
"Error processing message",
Expand Down
10 changes: 5 additions & 5 deletions src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task<DeliveryResult<byte[], byte[]>> ProduceAsync(

var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers);

await this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext, this.dependencyResolver));
await this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext));

await this.middlewareExecutor
.Execute(
Expand All @@ -56,7 +56,7 @@ await this.middlewareExecutor
})
.ConfigureAwait(false);

await this.globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext, this.dependencyResolver));
await this.globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext));

return report;
}
Expand Down Expand Up @@ -93,7 +93,7 @@ public void Produce(

var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers);

this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext, this.dependencyResolver));
this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext));

this.middlewareExecutor
.Execute(
Expand Down Expand Up @@ -139,7 +139,7 @@ public void Produce(
scope.Dispose();
});

this.globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext, this.dependencyResolver));
this.globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext));
}

public void Produce(
Expand Down Expand Up @@ -283,7 +283,7 @@ private async Task<DeliveryResult<byte[], byte[]>> InternalProduceAsync(IMessage
}
catch (ProduceException<byte[], byte[]> e)
{
await this.globalEvents.FireMessageProduceErrorAsync(new MessageEventExceptionContext(context, e));
await this.globalEvents.FireMessageProduceErrorAsync(new MessageErrorEventContext(context, e));

if (e.Error.IsFatal)
{
Expand Down

0 comments on commit 1c0e18b

Please sign in to comment.