Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flush interval #338

Merged
merged 13 commits into from
Jul 24, 2024
Merged

Conversation

Darthmineboy
Copy link
Contributor

Fixes auto commit at flush interval not being triggered due to constant stream of messages which resets the flush interval.

I was having a bit of an issue with CI that it says There were failures detected in the following suites: stream ./pkg/stream but I was unable to see the failing test. I upgraded ginkgo because I thought it may be related to onsi/ginkgo#973 where test summary is not shown when just 1 test fails.

Turns out that this failure was caused by a data race that was logged as warning but it doesn't mark a specific test as failed. The data race is now fixed. If you want I can revert the ginkgo upgrade.

fixes #337

@Gsantomaggio
Copy link
Member

Thank you. I put @hiimjako in the cc to check the PR.

@hiimjako
Copy link
Collaborator

Looks great to me!

I was also thinking:

  1. If the lock in cacheStoreOffset is needed, because it doesn't look like there are concurrent calls to the function.
  2. If it makes sense to move the reset of messageCountBeforeStorage into cacheStoreOffset, along with lastAutoCommitStored, so that every time you store the offset, both values are reset. Because right now is not when is triggered in the case <-time.After(consumer.options.autoCommitStrategy.flushInterval).

@Gsantomaggio
Copy link
Member

Thanks both.
@hiimjako feel free to merge when it is ok with you

@Darthmineboy
Copy link
Contributor Author

I'll make some improvements after work

@Darthmineboy
Copy link
Contributor Author

Looks great to me!

I was also thinking:

  1. If the lock in cacheStoreOffset is needed, because it doesn't look like there are concurrent calls to the function.
  2. If it makes sense to move the reset of messageCountBeforeStorage into cacheStoreOffset, along with lastAutoCommitStored, so that every time you store the offset, both values are reset. Because right now is not when is triggered in the case <-time.After(consumer.options.autoCommitStrategy.flushInterval).
  1. I thought so as well, but the tests were failing due to a data race. What happens concurrently though is closing the consumer, which would cause the data race. I've made an additional change to resolve another data race issue.
  2. I've moved resetting messageCountBeforeStorage so that it applies to both cases.

…eforeStorage so that we don't need getMessageCountBeforeStorage() and request the mutex twice
@hiimjako
Copy link
Collaborator

What happens concurrently though is closing the consumer, which would cause the data race.

Yeah, right, it also makes sense having both values in a lock.
To avoid it, maybe we could make a channel to stop the consumer routine and fill it at the beginning of the close function, so you don't have concurrent calls, but I would say leave it for future PR.

That's fine for me, thanks for the contribution 😄

@Gsantomaggio Gsantomaggio merged commit 34bcc2d into rabbitmq:main Jul 24, 2024
2 checks passed
@Gsantomaggio
Copy link
Member

Thanks @Darthmineboy @hiimjako

@hiimjako hiimjako added the bug Something isn't working label Jul 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

consumer auto commit flush interval reset by processed messages
3 participants