KafkaConsumer#resetToLastCommittedPositions doesn't work with the default throttled ack strategy 🤷🏻‍♂️ #2301

Open
Dougniel opened this issue Sep 28, 2023 · 2 comments

Dougniel commented Sep 28, 2023

Hi 🙋,

Context

We process records in batches, and in case of failure we want to send only the poisoned records to a dead-letter queue, not every record in the batch 🙅.

To do that, we developed a custom failure strategy 🫰 that retries the failed batch record by record¹, applying the dead-letter-queue's base implementation strategy to each record. To perform the retry, we chose KafkaConsumer#resetToLastCommittedPositions (accessed through KafkaClientService), which fits our needs perfectly 👌
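The record-by-record fallback described above can be sketched as follows. This is a minimal, simplified in-memory model, not the reporter's failure strategy and not a SmallRye Reactive Messaging API: `PerRecordRetry`, `retryRecordByRecord` and `Result` are hypothetical names, the dead-letter queue is just a list, and the consumer reset and re-poll that the reporter actually relies on are left out. The nested `record` requires Java 16+.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model only: none of these names exist in SmallRye Reactive Messaging.
public class PerRecordRetry {

    /** Outcome of retrying a failed batch one record at a time. */
    public record Result(List<String> processed, List<String> deadLettered) {}

    /**
     * Retries each record of a failed batch on its own. Records that now succeed
     * are kept; records that still fail are routed to the (simulated) dead-letter
     * queue, instead of dead-lettering the whole batch.
     */
    public static Result retryRecordByRecord(List<String> batch, Consumer<String> processor) {
        List<String> processed = new ArrayList<>();
        List<String> deadLettered = new ArrayList<>();
        for (String record : batch) {
            try {
                processor.accept(record);
                processed.add(record);
            } catch (RuntimeException e) {
                // Only the poisoned record goes to the dead-letter queue.
                deadLettered.add(record);
            }
        }
        return new Result(processed, deadLettered);
    }

    public static void main(String[] args) {
        Result r = retryRecordByRecord(List.of("a", "poison", "b"), rec -> {
            if (rec.equals("poison")) {
                throw new IllegalStateException("cannot process " + rec);
            }
        });
        System.out.println(r.processed() + " " + r.deadLettered()); // prints [a, b] [poison]
    }
}
```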

But sometimes retrying a batch of records also replayed the previous batch. That is how we noticed that the throttled acknowledgement strategy is not aware of resetToLastCommittedPositions: its pending acks are not flushed to Kafka before the reset, so the consumer can seek back before records that were already acknowledged.

In our case we switched to the latest acknowledgement strategy and it's OK for now 🚢, but IMO the behaviour is inconsistent and could lead to some very bad experiences 😞

πŸ™πŸΌ What I would like

Given that:

  • KafkaConsumer#resetToLastCommittedPositions is public and accessible through KafkaClientService
  • the throttled acknowledgement strategy is the default one and pretty useful (thanks guys 👍)

Would it be possible to adapt the implementation of KafkaConsumer#resetToLastCommittedPositions so that it updates the throttled mechanism accordingly?

It seems the existing partition-revocation mechanism would fit well:

  1. flushing the pending acks
  2. clearing the tracking of received records that are not yet acknowledged
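The two proposed steps can be illustrated with a small, hypothetical single-partition model of a throttled ack strategy. `ThrottledTracker` and its methods are illustrative names, not the SmallRye Reactive Messaging implementation; the model only assumes that a commit advances past the longest run of acknowledged offsets, as a throttled strategy would on its periodic commit.

```java
import java.util.TreeMap;

// Hypothetical single-partition model of a throttled ack strategy.
// Illustrative names only; this is not the SmallRye implementation.
public class ThrottledTracker {
    // Received offsets, in order, mapped to whether they have been acked.
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();
    // Next offset to read after a restart or reset (the "committed" position).
    private long committed;

    public ThrottledTracker(long committed) { this.committed = committed; }

    public void received(long offset) { inFlight.put(offset, false); }

    public void ack(long offset) { inFlight.replace(offset, true); }

    /** Periodic commit: advance past the longest prefix of acked offsets. */
    public void flush() {
        while (!inFlight.isEmpty() && inFlight.firstEntry().getValue()) {
            committed = inFlight.pollFirstEntry().getKey() + 1;
        }
    }

    public long committed() { return committed; }

    /** Proposed reset: flush pending acks, forget unacked records, return the seek position. */
    public long resetToLastCommitted() {
        flush();          // 1. flush the pending acks
        inFlight.clear(); // 2. clear the tracking of received-but-unacked records
        return committed;
    }

    public static void main(String[] args) {
        ThrottledTracker t = new ThrottledTracker(0);
        // Batch 1 (offsets 0-1) is acked, but the periodic commit has not run yet.
        t.received(0); t.received(1); t.ack(0); t.ack(1);
        // Batch 2 (offsets 2-3) fails and must be retried.
        t.received(2); t.received(3);
        System.out.println(t.committed());            // prints 0
        System.out.println(t.resetToLastCommitted()); // prints 2
    }
}
```

In this scenario, seeking to the last committed offset without flushing would go back to offset 0 and replay the already-acknowledged first batch, which matches the symptom reported above; flushing first moves the seek position to 2.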

Thx

Footnotes

  1. Splitting a batch into individual records is done with a PublisherDecorator that applies transformToIterable to the Multi.

ozangunalp (Collaborator) commented

Thanks for the detailed issue report!

It does seem like an interesting use case.

Have you tried the smallrye-fault-tolerance @Retry on a method other than the @Incoming one?

Whether you are using the Kafka batching or your own batching policy, you can filter out poisoned records and nack them right away and you might be able to refactor your processing logic to another method that is annotated with @Retry.

Otherwise, if you really must implement a custom failure strategy, what you need is to be able to call the commit strategy from the failure one. By the way, you already have the reference to KafkaConsumer through the KafkaFailureStrategy.Factory create method, you don't need to fetch it through KafkaClientService.

Dougniel (Author) commented Oct 6, 2023

> Have you tried the smallrye-fault-tolerance @Retry on a method other than the @Incoming one?

Honestly, I didn't invest much time in this option: even though it was my first thought, I felt lost at first, and unfortunately I didn't find a way to split a batch message with such a mechanism before the retry.

> Whether you are using the Kafka batching or your own batching policy, you can filter out poisoned records and nack them right away and you might be able to refactor your processing logic to another method that is annotated with @Retry.

I didn't find any documentation about extension points in smallrye-fault-tolerance/@Retry comparable to those in smallrye-reactive-messaging. Do you have any ideas?

> Otherwise, if you really must implement a custom failure strategy, what you need is to be able to call the commit strategy from the failure one. By the way, you already have the reference to KafkaConsumer through the KafkaFailureStrategy.Factory create method, you don't need to fetch it through KafkaClientService.

Right 👍
