-
-
Notifications
You must be signed in to change notification settings - Fork 933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: redis requeue concurrency bug #1800 #1805
Conversation
@@ -382,7 +382,9 @@ def ack(self, delivery_tag): | |||
def reject(self, delivery_tag, requeue=False): | |||
if requeue: | |||
self.restore_by_tag(delivery_tag, leftmost=True) | |||
self.ack(delivery_tag) | |||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be great if you could add unit test for this change and if possible, integration test which will give us a lot of confidence
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -382,7 +382,9 @@ def ack(self, delivery_tag): | |||
def reject(self, delivery_tag, requeue=False): | |||
if requeue: | |||
self.restore_by_tag(delivery_tag, leftmost=True) | |||
self.ack(delivery_tag) | |||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be great if you could add unit test for this change and if possible, integration test which will give us a lot of confidence
tests seems to be passing |
I feel like we're getting impacted by the similar issue, but not exactly sure. @auvipy is there a plan to move this forward? What we're exactly observing is a case where there is a message that's being worked on ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please pull from main branch to test with python 3.12? we got new python on the ci
this bug is ack concurrency caused by del unack key.
for debug i print full stack :
delete a58b8b31-c8c8-4c1e-88d9-f857bbf375a2 unacked_index
File "/home/jiangxf/py3/lib/python3.10/site-packages/kombu/transport/virtual/base.py", line 639, in _callback
return callback(message)
File "/home/jiangxf/py3/lib/python3.10/site-packages/kombu/messaging.py", line 656, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "/home/jiangxf/py3/lib/python3.10/site-packages/kombu/messaging.py", line 622, in receive
[callback(body, message) for callback in callbacks]
File "/home/jiangxf/py3/lib/python3.10/site-packages/kombu/messaging.py", line 622, in
[callback(body, message) for callback in callbacks]
File "/home/jiangxf/test_kombu.py", line 22, in message_process_requeue
message.requeue()
File "/home/jiangxf/py3/lib/python3.10/site-packages/kombu/message.py", line 187, in requeue
self.channel.basic_reject(self.delivery_tag, requeue=True)
File "/home/jiangxf/py3/lib/python3.10/site-packages/kombu/transport/virtual/base.py", line 680, in basic_reject
self.qos.reject(delivery_tag, requeue=requeue)
File "/home/jiangxf/py3/lib/python3.10/site-packages/kombu/transport/redis.py", line 385, in reject
self.ack(delivery_tag)
File "/home/jiangxf/py3/lib/python3.10/site-packages/kombu/transport/redis.py", line 379, in ack
self._remove_from_indices(delivery_tag).execute()
File "/home/jiangxf/py3/lib/python3.10/site-packages/kombu/transport/redis.py", line 397, in _remove_from_indices
traceback.print_stack(limit=10)