Skip to content

choose a node randomly when subscribing #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 4.4
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1646,7 +1646,7 @@ class ClusterPubSub(PubSub):
https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
"""

def __init__(self, redis_cluster, node=None, host=None, port=None, replica=False, **kwargs):
def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs):
"""
When a pubsub instance is created without specifying a node, a single
node will be transparently chosen for the pubsub connection on the
Expand All @@ -1661,7 +1661,6 @@ def __init__(self, redis_cluster, node=None, host=None, port=None, replica=False
:type port: int
"""
self.node = None
self.replica = replica
self.set_pubsub_node(redis_cluster, node, host, port)
connection_pool = (
None
Expand Down Expand Up @@ -1795,8 +1794,16 @@ def get_sharded_message(
if message["channel"] in self.pending_unsubscribe_shard_channels:
self.pending_unsubscribe_shard_channels.remove(message["channel"])
self.shard_channels.pop(message["channel"], None)
node = self.cluster.get_node_from_key(message["channel"], self.replica)
if self.node_pubsub_mapping[node.name].subscribed is False:
slot = self.cluster.keyslot(message["channel"])
slot_cache = self.cluster.nodes_manager.slots_cache.get(slot)
if slot_cache is None or len(slot_cache) == 0:
raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
for node in slot_cache:
p = self.node_pubsub_mapping.get(node.name)
if p is None:
continue
if p.subscribed is not False:
continue
self.node_pubsub_mapping.pop(node.name)
if not self.channels and not self.patterns and not self.shard_channels:
# There are no subscriptions anymore, set subscribed_event flag
Expand All @@ -1812,7 +1819,10 @@ def ssubscribe(self, *args, **kwargs):
s_channels = dict.fromkeys(args)
s_channels.update(kwargs)
for s_channel, handler in s_channels.items():
node = self.cluster.get_node_from_key(s_channel, self.replica)
slot = self.cluster.keyslot(s_channel)
node = self.cluster.nodes_manager.get_node_from_slot(
slot, self.cluster.read_from_replicas
)
pubsub = self._get_node_pubsub(node)
if handler:
pubsub.ssubscribe(**{s_channel: handler})
Expand All @@ -1833,12 +1843,20 @@ def sunsubscribe(self, *args):
args = self.shard_channels

for s_channel in args:
node = self.cluster.get_node_from_key(s_channel, self.replica)
p = self._get_node_pubsub(node)
p.sunsubscribe(s_channel)
self.pending_unsubscribe_shard_channels.update(
p.pending_unsubscribe_shard_channels
)
slot = self.cluster.keyslot(s_channel)
slot_cache = self.cluster.nodes_manager.slots_cache.get(slot)
if slot_cache is None or len(slot_cache) == 0:
raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
for node in slot_cache:
p = self.node_pubsub_mapping.get(node.name)
if p is None:
continue
if s_channel not in p.shard_channels:
continue
p.sunsubscribe(s_channel)
self.pending_unsubscribe_shard_channels.update(
p.pending_unsubscribe_shard_channels
)

def get_redis_connection(self):
"""
Expand Down