Skip to content

Commit

Permalink
Move to single subscriber
Browse files Browse the repository at this point in the history
Changed separate subscribers for regular and sharedstate messages to a single implementation that uses autodetection of message types on first receive.
  • Loading branch information
jooste committed Aug 30, 2024
1 parent f1f0c08 commit 4e57deb
Show file tree
Hide file tree
Showing 23 changed files with 784 additions and 651 deletions.
139 changes: 0 additions & 139 deletions bluesky/core/remotestore.py

This file was deleted.

3 changes: 2 additions & 1 deletion bluesky/network/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from bluesky.network.common import get_ownip, seqidx2id, seqid2idx, GROUPID_CLIENT, GROUPID_DEFAULT, GROUPID_NOGROUP, GROUPID_SIM
from bluesky.network.subscription import subscriber
from bluesky.network.subscriber import subscriber, subscribe
from bluesky.network.publisher import state_publisher, StatePublisher
71 changes: 40 additions & 31 deletions bluesky/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from bluesky.stack.clientstack import process
from bluesky.network import context as ctx
from bluesky.network.npcodec import encode_ndarray, decode_ndarray
from bluesky.network.subscription import Subscription
from bluesky.network.subscriber import Subscription
from bluesky.network.common import genid, asbytestr, seqid2idx, MSG_SUBSCRIBE, MSG_UNSUBSCRIBE, GROUPID_NOGROUP, GROUPID_CLIENT, GROUPID_SIM, GROUPID_DEFAULT, IDLEN


Expand Down Expand Up @@ -69,7 +69,7 @@ def connect(self, hostname=None, recv_port=None, send_port=None, protocol='tcp')
self.poller.register(self.sock_recv, zmq.POLLIN)
self.poller.register(self.sock_send, zmq.POLLIN)
# Register this client by subscribing to targeted messages
self.subscribe(b'', to_group=self.client_id)
self.subscribe('', '', to_group=self.client_id)

def update(self):
''' Client periodic update function.
Expand Down Expand Up @@ -103,8 +103,10 @@ def receive(self, timeout=0):
if sock == self.sock_recv:
ctx.topic = ctx.msg[0][IDLEN:-IDLEN].decode()
ctx.sender_id = ctx.msg[0][-IDLEN:]
pydata = msgpack.unpackb(ctx.msg[1], object_hook=decode_ndarray, raw=False)
sub = Subscription.subscriptions.get(ctx.topic) or Subscription(ctx.topic, directedonly=True)
pydata = msgpack.unpackb(ctx.msg[1], object_hook=decode_ndarray, raw=False)
sub = Subscription.subscriptions.get(ctx.topic, None)# or Subscription(ctx.topic, directedonly=True)
if sub is None:
continue
# Unpack dict or list, skip empty string
if pydata == '':
sub.emit()
Expand Down Expand Up @@ -148,15 +150,15 @@ def receive(self, timeout=0):

def send(self, topic: str, data: Union[str, Collection]='', to_group: str=''):
btopic = asbytestr(topic)
btarget = asbytestr(to_group or stack.sender() or self.act_id or GROUPID_SIM)
btarget = asbytestr(to_group or stack.sender() or self.act_id or '')
self.sock_send.send_multipart(
[
btarget.ljust(IDLEN, b'*') + btopic + self.client_id,
msgpack.packb(data, default=encode_ndarray, use_bin_type=True)
]
)

def subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT, actonly=False):
def subscribe(self, topic, from_group=GROUPID_DEFAULT, to_group='', actonly=False):
''' Subscribe to a topic.
Arguments:
Expand All @@ -171,18 +173,18 @@ def subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT, actonly=False):
sub = Subscription(topic)
sub.actonly = (sub.actonly or actonly)
actonly = sub.actonly
if (from_id, to_group) in sub.subs:
if (from_group, to_group) in sub.subs:
# Subscription already active. Just return Subscription object
return sub
sub.subs.add((from_id, to_group))
sub.subs.add((from_group, to_group))

self._subscribe(topic, from_id, to_group, actonly)
self._subscribe(topic, from_group, to_group, actonly)

# Messages coming in that match this subscription will be emitted using a
# subscription signal
return sub

def unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT):
def unsubscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''):
''' Unsubscribe from a topic.
Arguments:
Expand All @@ -191,30 +193,37 @@ def unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT):
- to_group: The group mask that this topic is sent to (optional)
'''
if topic:
Subscription(topic).subs.discard((from_id, to_group))
self._unsubscribe(topic, from_id, to_group)

def _subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT, actonly=False):
topic = asbytestr(topic)
from_id = asbytestr(from_id)
to_group = asbytestr(to_group or GROUPID_CLIENT)
if actonly and not from_id:
self.acttopics[topic].add(to_group)
from_id = self.act_id
if not from_id:
return
self.sock_recv.setsockopt(zmq.SUBSCRIBE, to_group.ljust(IDLEN, b'*') + topic + from_id)
Subscription(topic).subs.discard((from_group, to_group))
self._unsubscribe(topic, from_group, to_group)

def _subscribe(self, topic, from_group=GROUPID_DEFAULT, to_group='', actonly=False):
if from_group == GROUPID_DEFAULT:
from_group = GROUPID_SIM
if actonly:
self.acttopics[topic].add(to_group)
if self.act_id is not None:
bfrom_group = self.act_id
else:
return
btopic = asbytestr(topic)
bfrom_group = asbytestr(from_group)
bto_group = asbytestr(to_group)

def _unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT):
topic = asbytestr(topic)
from_id = asbytestr(from_id)
to_group = asbytestr(to_group or GROUPID_CLIENT)
if not from_id and topic in self.acttopics:
self.sock_recv.setsockopt(zmq.SUBSCRIBE, bto_group.ljust(IDLEN, b'*') + btopic + bfrom_group)

def _unsubscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''):
if from_group == GROUPID_DEFAULT:
from_group = GROUPID_SIM
btopic = asbytestr(topic)
bfrom_group = asbytestr(from_group)
bto_group = asbytestr(to_group)
if from_group == GROUPID_DEFAULT and topic in self.acttopics:
self.acttopics[topic].discard(to_group)
from_id = self.act_id
if not from_id:
if self.act_id is not None:
bfrom_group = self.act_id
else:
return
self.sock_recv.setsockopt(zmq.UNSUBSCRIBE, to_group.ljust(IDLEN, b'*') + topic + from_id)
self.sock_recv.setsockopt(zmq.UNSUBSCRIBE, bto_group.ljust(IDLEN, b'*') + btopic + bfrom_group)

def actnode(self, newact=None):
''' Set the new active node, or return the current active node. '''
Expand Down
59 changes: 34 additions & 25 deletions bluesky/network/node.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
''' Node test '''
import inspect
import zmq
import msgpack
import bluesky as bs
from bluesky import stack
from bluesky.core import Entity, Signal
from bluesky.network import context as ctx
from bluesky.network.subscription import Subscription
from bluesky.network.subscriber import Subscription
from bluesky.network.npcodec import encode_ndarray, decode_ndarray
from bluesky.network.common import genid, asbytestr, seqidx2id, seqid2idx, MSG_SUBSCRIBE, MSG_UNSUBSCRIBE, GROUPID_NOGROUP, GROUPID_CLIENT, GROUPID_SIM, GROUPID_DEFAULT, IDLEN

Expand Down Expand Up @@ -58,7 +59,7 @@ def connect(self, hostname=None, recv_port=None, send_port=None, protocol='tcp')
self.poller.register(self.sock_recv, zmq.POLLIN)
self.poller.register(self.sock_send, zmq.POLLIN)
# Register this node by subscribing to targeted messages
self.subscribe(b'', to_group=self.node_id)
self.subscribe('', '', to_group=self.node_id)

def close(self):
''' Close all network connections. '''
Expand Down Expand Up @@ -99,7 +100,11 @@ def receive(self, timeout=0):
ctx.topic = ctx.msg[0][IDLEN:-IDLEN].decode()
ctx.sender_id = ctx.msg[0][-IDLEN:]
pydata = msgpack.unpackb(ctx.msg[1], object_hook=decode_ndarray, raw=False)
sub = Subscription.subscriptions.get(ctx.topic) or Subscription(ctx.topic, directedonly=True)
sub = Subscription.subscriptions.get(ctx.topic, None) #or Subscription(ctx.topic, directedonly=True)
if sub is None:
print('No subscription known for', ctx.topic, 'on', self.node_id)
continue

# Unpack dict or list, skip empty string
if pydata == '':
sub.emit()
Expand Down Expand Up @@ -141,16 +146,16 @@ def receive(self, timeout=0):
return False

def send(self, topic, data='', to_group=''):
topic = asbytestr(topic)
to_group = asbytestr(to_group or stack.sender() or GROUPID_CLIENT)
btopic = asbytestr(topic)
bto_group = asbytestr(to_group or stack.sender() or '')
self.sock_send.send_multipart(
[
to_group.ljust(IDLEN, b'*') + topic + self.node_id,
bto_group.ljust(IDLEN, b'*') + btopic + self.node_id,
msgpack.packb(data, default=encode_ndarray, use_bin_type=True)
]
)

def subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT):
def subscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''):
''' Subscribe to a topic.
Arguments:
Expand All @@ -161,18 +166,18 @@ def subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT):
sub = None
if topic:
sub = Subscription(topic)
if (from_id, to_group) in sub.subs:
if (from_group, to_group) in sub.subs:
# Subscription already active. Just return Subscription object
return sub
sub.subs.add((from_id, to_group))
sub.subs.add((from_group, to_group))

self._subscribe(topic, from_id, to_group)
self._subscribe(topic, from_group, to_group)

# Messages coming in that match this subscription will be emitted using a
# subscription signal
return sub

def unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT):
def unsubscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''):
''' Unsubscribe from a topic.
Arguments:
Expand All @@ -181,20 +186,24 @@ def unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT):
- to_group: The group mask that this topic is sent to (optional)
'''
if topic:
Subscription(topic).subs.discard((from_id, to_group))
self._unsubscribe(topic, from_id, to_group)

def _subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT):
topic = asbytestr(topic)
from_id = asbytestr(from_id)
to_group = asbytestr(to_group or GROUPID_SIM)
self.sock_recv.setsockopt(zmq.SUBSCRIBE, to_group.ljust(IDLEN, b'*') + topic + from_id)

def _unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT):
topic = asbytestr(topic)
from_id = asbytestr(from_id)
to_group = asbytestr(to_group or GROUPID_SIM)
self.sock_recv.setsockopt(zmq.UNSUBSCRIBE, to_group.ljust(IDLEN, b'*') + topic + from_id)
Subscription(topic).subs.discard((from_group, to_group))
self._unsubscribe(topic, from_group, to_group)

def _subscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''):
if from_group == GROUPID_DEFAULT:
from_group = GROUPID_CLIENT
btopic = asbytestr(topic)
bfrom_group = asbytestr(from_group)
bto_group = asbytestr(to_group)
self.sock_recv.setsockopt(zmq.SUBSCRIBE, bto_group.ljust(IDLEN, b'*') + btopic + bfrom_group)

def _unsubscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''):
if from_group == GROUPID_DEFAULT:
from_group = GROUPID_CLIENT
btopic = asbytestr(topic)
bfrom_group = asbytestr(from_group)
bto_group = asbytestr(to_group)
self.sock_recv.setsockopt(zmq.UNSUBSCRIBE, bto_group.ljust(IDLEN, b'*') + btopic + bfrom_group)

def addnodes(self, count=1, *node_ids):
''' Tell the server to add 'count' nodes.
Expand Down
Loading

0 comments on commit 4e57deb

Please sign in to comment.