Skip to content

Commit

Permalink
Implement consume from multiple redis clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
dlunch committed Oct 30, 2023
1 parent 013e637 commit 94d2154
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 16 deletions.
33 changes: 17 additions & 16 deletions kombu/transport/redis_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,24 +423,25 @@ def _brpop_start(self, timeout):
return

for key in queues:
for _, conn, _ in self.connection.cycle._chan_to_sock:
if conn.key == key and conn.in_poll == False:
conn.in_poll = True
conn.timeout = timeout
if conn.key in self.ask_errors:
del self.ask_errors[conn.key]
for client in self.consumer_clients:
for _, conn, _ in self.connection.cycle._chan_to_sock:
if conn.key == key and conn.in_poll == False and conn.cluster_connection == client:
conn.in_poll = True
conn.timeout = timeout
if conn.key in self.ask_errors:
del self.ask_errors[conn.key]
try:
conn.redis_connection.execute_command('ASKING')
except:
logger.exception('Error while sending ASKING', extra={"key": conn.key})
continue

try:
conn.redis_connection.execute_command('ASKING')
conn.redis_connection.connection.send_command('BRPOP', key, timeout)
except:
logger.exception('Error while sending ASKING', extra={"key": conn.key})
continue

try:
conn.redis_connection.connection.send_command('BRPOP', key, timeout)
except:
logger.exception('Error while sending BRPOP', extra={"key": conn.key})
self.connection.cycle._unregister(self, conn, 'BRPOP')
break
logger.exception('Error while sending BRPOP', extra={"key": conn.key})
self.connection.cycle._unregister(self, conn, 'BRPOP')
break

def _brpop_read(self, **options):
conn = options.pop('conn')
Expand Down
36 changes: 36 additions & 0 deletions t/integration/test_redis_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,39 @@ def test_many_queue(connection):
assert message.content_encoding == 'utf-8'
assert message.headers == {'k1': 'v1'}
message.ack()


def test_multiple_consume():
consumer_conn = kombu.Connection('redis-cluster://localhost:7000?alt=redis-cluster://localhost:8000')
producer_conn1 = kombu.Connection('redis-cluster://localhost:7000')
producer_conn2 = kombu.Connection('redis-cluster://localhost:8000')

with producer_conn1 as producer:
queue = producer.SimpleQueue('test_multiple_consume')
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
queue.close()

with consumer_conn as consumer:
queue = consumer.SimpleQueue('test_multiple_consume')
message = queue.get(timeout=10)
assert message.payload == {'Hello': 'World'}
assert message.content_type == 'application/json'
assert message.content_encoding == 'utf-8'
assert message.headers == {'k1': 'v1'}
message.ack()
queue.close()

with producer_conn2 as producer:
queue = producer.SimpleQueue('test_multiple_consume1')
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
queue.close()

with consumer_conn as consumer:
queue = consumer.SimpleQueue('test_multiple_consume1')
message = queue.get(timeout=10)
assert message.payload == {'Hello': 'World'}
assert message.content_type == 'application/json'
assert message.content_encoding == 'utf-8'
assert message.headers == {'k1': 'v1'}
message.ack()
queue.close()

0 comments on commit 94d2154

Please sign in to comment.