From 4e57debede91dc6b8fe628d2c6649717f95e08c1 Mon Sep 17 00:00:00 2001 From: Joost Ellerbroek Date: Fri, 30 Aug 2024 14:49:41 +0200 Subject: [PATCH] Move to single subscriber Changed separate subscribers for regular and sharedstate messages to a single implementation that uses autodetection of message types on first receive. --- bluesky/core/remotestore.py | 139 ---------- bluesky/network/__init__.py | 3 +- bluesky/network/client.py | 71 ++--- bluesky/network/node.py | 59 ++-- bluesky/network/publisher.py | 141 ++++++++++ bluesky/network/sharedstate.py | 428 ++++++++++++------------------ bluesky/network/subscriber.py | 299 +++++++++++++++++++++ bluesky/network/subscription.py | 121 --------- bluesky/simulation/screenio.py | 15 +- bluesky/simulation/simulation.py | 5 +- bluesky/stack/cmdparser.py | 4 +- bluesky/tools/areafilter.py | 4 +- bluesky/ui/qtgl/arealist.py | 10 +- bluesky/ui/qtgl/console.py | 13 +- bluesky/ui/qtgl/glmap.py | 2 +- bluesky/ui/qtgl/glnavdata.py | 16 +- bluesky/ui/qtgl/glpoly.py | 14 +- bluesky/ui/qtgl/gltraffic.py | 29 +- bluesky/ui/qtgl/mainwindow.py | 13 +- bluesky/ui/qtgl/radarwidget.py | 22 +- bluesky/ui/qtgl/settingswindow.py | 8 +- bluesky/ui/qtgl/tiledtexture.py | 4 +- bluesky/ui/qtgl/trafficlist.py | 15 +- 23 files changed, 784 insertions(+), 651 deletions(-) delete mode 100644 bluesky/core/remotestore.py create mode 100644 bluesky/network/publisher.py create mode 100644 bluesky/network/subscriber.py delete mode 100644 bluesky/network/subscription.py diff --git a/bluesky/core/remotestore.py b/bluesky/core/remotestore.py deleted file mode 100644 index c806d956a0..0000000000 --- a/bluesky/core/remotestore.py +++ /dev/null @@ -1,139 +0,0 @@ -''' Remotestore provides automatic storage of data that needs to be individually - updated and accessed for each individual remote node, and easy access of data of the active node. -''' -from types import SimpleNamespace -from numbers import Number -from copy import deepcopy -from collections import defaultdict -from functools import partial - -import bluesky as bs - - -def _genstore(): - store = deepcopy(defaults) - for g in generators: - g(store) - return store - - -class Store(SimpleNamespace): - def valid(self): - ''' Return True if this store has initialised attributes.''' - return all([bool(v) for v in vars(self).values()] or [False]) - - -# Keep track of default attribute values of mutable type. -# These always need to be stored per remote node. -defaults = Store() -# In some cases (such as for non-copyable types) a generator is specified -# instead of a default value -generators = list() - -# Keep a dict of remote state storage namespaces -remotes = defaultdict(_genstore) - - -def reset(remote_id=None): - ''' Reset data for remote ''' - remotes[remote_id or bs.net.act_id] = _genstore() - - -def get(remote_id=None, group=None): - ''' Retrieve a remote store, or a group in a remote store. - Returns the store of the active remote if no remote id is provided. - ''' - return (remotes[remote_id or bs.net.act_id] if group is None else - getattr(remotes[remote_id or bs.net.act_id], group)) - - -def setvalue(name, value, remote_id=None, group=None): - ''' Set the value of attribute 'name' in group 'group' for remote store with id 'remote_id' - Sets value in store of the active remote if no remote_id is provided. - ''' - setattr(remotes[remote_id or bs.net.act_id] if group is None else - getattr(remotes[remote_id or bs.net.act_id], group), name, value) - - -def setdefault(name, default, group=None): - ''' Set the default value for variable 'name' in group 'group' ''' - target = getattr(defaults, group, None) if group else defaults - if not target: - return setattr(defaults, group, Store(**{name:default})) - setattr(target, name, default) - - -def addgroup(name): - ''' Add a storage group to each remote data store. ''' - if hasattr(defaults, name): - return - # Add store to the defaults - setattr(defaults, name, Store()) - - # Also add to existing stores if necessary - for remote in remotes.values(): - setattr(remote, name, Store()) - - -def _generator(store, name, objtype, args, kwargs, group=None): - setattr(getattr(store, group) if group else store, name, objtype(*args, **kwargs)) - - -class ActData: - ''' Access data from the active remote as if it is a member variable. ''' - __slots__ = ('default', 'name', 'group') - - def __init__(self, *args, name='', group=None, **kwargs): - self.default = (args, kwargs) - self.name = name - self.group = group - - def __set_name__(self, owner, name): - if not self.name: - # Get name from attribute name if not previously specified - self.name = name - args, kwargs = self.default - # Retrieve annotated object type if present - objtype = owner.__annotations__.get(name) - if objtype: - self.default = objtype(*args, **kwargs) - elif len(args) != 1: - raise AttributeError('A default value and/or a type annotation should be provided with ActData') - else: - self.default = args[0] - - # If underlying datatype is mutable, always immediately - # store per remote node - if not isinstance(self.default, (str, tuple, Number, frozenset, bytes)): - # If an annotated object type is specified create a generator for it - if objtype: - generators.append(partial(_generator, name=name, objtype=objtype, args=args, kwargs=kwargs, group=self.group)) - # Add group if it doesn't exist yet - if self.group is not None: - addgroup(self.group) - - # In case remote data already exists, update stores - for remote_id in remotes.keys(): - setvalue(name, objtype(*args, **kwargs), remote_id, self.group) - # Otherwise assume deepcopy can be used to generate initial values per remote - else: - setdefault(name, self.default, self.group) - - # In case remote data already exists, update stores - for remote_id in remotes.keys(): - setvalue(name, deepcopy(self.default), remote_id, self.group) - - def __get__(self, obj, objtype=None): - ''' Return the actual value for the currently active node. ''' - if not bs.net.act_id: - return self.default - return getattr( - remotes[bs.net.act_id] if self.group is None else getattr(remotes[bs.net.act_id], self.group), - self.name, self.default) - # TODO: What is the active (client) node on the sim-side? Is this always within a currently processed stack command? -> stack.sender_id - - def __set__(self, obj, value): - if not bs.net.act_id: - self.default = value - else: - setattr(remotes[bs.net.act_id] if self.group is None else getattr(remotes[bs.net.act_id], self.group), self.name, value) diff --git a/bluesky/network/__init__.py b/bluesky/network/__init__.py index ca89414887..3d524bcc92 100644 --- a/bluesky/network/__init__.py +++ b/bluesky/network/__init__.py @@ -1,2 +1,3 @@ from bluesky.network.common import get_ownip, seqidx2id, seqid2idx, GROUPID_CLIENT, GROUPID_DEFAULT, GROUPID_NOGROUP, GROUPID_SIM -from bluesky.network.subscription import subscriber \ No newline at end of file +from bluesky.network.subscriber import subscriber, subscribe +from bluesky.network.publisher import state_publisher, StatePublisher diff --git a/bluesky/network/client.py b/bluesky/network/client.py index 06891ce7f6..59c878789b 100644 --- a/bluesky/network/client.py +++ b/bluesky/network/client.py @@ -9,7 +9,7 @@ from bluesky.stack.clientstack import process from bluesky.network import context as ctx from bluesky.network.npcodec import encode_ndarray, decode_ndarray -from bluesky.network.subscription import Subscription +from bluesky.network.subscriber import Subscription from bluesky.network.common import genid, asbytestr, seqid2idx, MSG_SUBSCRIBE, MSG_UNSUBSCRIBE, GROUPID_NOGROUP, GROUPID_CLIENT, GROUPID_SIM, GROUPID_DEFAULT, IDLEN @@ -69,7 +69,7 @@ def connect(self, hostname=None, recv_port=None, send_port=None, protocol='tcp') self.poller.register(self.sock_recv, zmq.POLLIN) self.poller.register(self.sock_send, zmq.POLLIN) # Register this client by subscribing to targeted messages - self.subscribe(b'', to_group=self.client_id) + self.subscribe('', '', to_group=self.client_id) def update(self): ''' Client periodic update function. @@ -103,8 +103,10 @@ def receive(self, timeout=0): if sock == self.sock_recv: ctx.topic = ctx.msg[0][IDLEN:-IDLEN].decode() ctx.sender_id = ctx.msg[0][-IDLEN:] - pydata = msgpack.unpackb(ctx.msg[1], object_hook=decode_ndarray, raw=False) - sub = Subscription.subscriptions.get(ctx.topic) or Subscription(ctx.topic, directedonly=True) + pydata = msgpack.unpackb(ctx.msg[1], object_hook=decode_ndarray, raw=False) + sub = Subscription.subscriptions.get(ctx.topic, None)# or Subscription(ctx.topic, directedonly=True) + if sub is None: + continue # Unpack dict or list, skip empty string if pydata == '': sub.emit() @@ -148,7 +150,7 @@ def receive(self, timeout=0): def send(self, topic: str, data: Union[str, Collection]='', to_group: str=''): btopic = asbytestr(topic) - btarget = asbytestr(to_group or stack.sender() or self.act_id or GROUPID_SIM) + btarget = asbytestr(to_group or stack.sender() or self.act_id or '') self.sock_send.send_multipart( [ btarget.ljust(IDLEN, b'*') + btopic + self.client_id, @@ -156,7 +158,7 @@ def send(self, topic: str, data: Union[str, Collection]='', to_group: str=''): ] ) - def subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT, actonly=False): + def subscribe(self, topic, from_group=GROUPID_DEFAULT, to_group='', actonly=False): ''' Subscribe to a topic. Arguments: @@ -171,18 +173,18 @@ def subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT, actonly=False): sub = Subscription(topic) sub.actonly = (sub.actonly or actonly) actonly = sub.actonly - if (from_id, to_group) in sub.subs: + if (from_group, to_group) in sub.subs: # Subscription already active. Just return Subscription object return sub - sub.subs.add((from_id, to_group)) + sub.subs.add((from_group, to_group)) - self._subscribe(topic, from_id, to_group, actonly) + self._subscribe(topic, from_group, to_group, actonly) # Messages coming in that match this subscription will be emitted using a # subscription signal return sub - def unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT): + def unsubscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''): ''' Unsubscribe from a topic. Arguments: @@ -191,30 +193,37 @@ def unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT): - to_group: The group mask that this topic is sent to (optional) ''' if topic: - Subscription(topic).subs.discard((from_id, to_group)) - self._unsubscribe(topic, from_id, to_group) - - def _subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT, actonly=False): - topic = asbytestr(topic) - from_id = asbytestr(from_id) - to_group = asbytestr(to_group or GROUPID_CLIENT) - if actonly and not from_id: - self.acttopics[topic].add(to_group) - from_id = self.act_id - if not from_id: - return - self.sock_recv.setsockopt(zmq.SUBSCRIBE, to_group.ljust(IDLEN, b'*') + topic + from_id) + Subscription(topic).subs.discard((from_group, to_group)) + self._unsubscribe(topic, from_group, to_group) + + def _subscribe(self, topic, from_group=GROUPID_DEFAULT, to_group='', actonly=False): + if from_group == GROUPID_DEFAULT: + from_group = GROUPID_SIM + if actonly: + self.acttopics[topic].add(to_group) + if self.act_id is not None: + bfrom_group = self.act_id + else: + return + btopic = asbytestr(topic) + bfrom_group = asbytestr(from_group) + bto_group = asbytestr(to_group) - def _unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT): - topic = asbytestr(topic) - from_id = asbytestr(from_id) - to_group = asbytestr(to_group or GROUPID_CLIENT) - if not from_id and topic in self.acttopics: + self.sock_recv.setsockopt(zmq.SUBSCRIBE, bto_group.ljust(IDLEN, b'*') + btopic + bfrom_group) + + def _unsubscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''): + if from_group == GROUPID_DEFAULT: + from_group = GROUPID_SIM + btopic = asbytestr(topic) + bfrom_group = asbytestr(from_group) + bto_group = asbytestr(to_group) + if from_group == GROUPID_DEFAULT and topic in self.acttopics: self.acttopics[topic].discard(to_group) - from_id = self.act_id - if not from_id: + if self.act_id is not None: + bfrom_group = self.act_id + else: return - self.sock_recv.setsockopt(zmq.UNSUBSCRIBE, to_group.ljust(IDLEN, b'*') + topic + from_id) + self.sock_recv.setsockopt(zmq.UNSUBSCRIBE, bto_group.ljust(IDLEN, b'*') + btopic + bfrom_group) def actnode(self, newact=None): ''' Set the new active node, or return the current active node. ''' diff --git a/bluesky/network/node.py b/bluesky/network/node.py index ab698cd6c6..b41dd2f2af 100644 --- a/bluesky/network/node.py +++ b/bluesky/network/node.py @@ -1,11 +1,12 @@ ''' Node test ''' +import inspect import zmq import msgpack import bluesky as bs from bluesky import stack from bluesky.core import Entity, Signal from bluesky.network import context as ctx -from bluesky.network.subscription import Subscription +from bluesky.network.subscriber import Subscription from bluesky.network.npcodec import encode_ndarray, decode_ndarray from bluesky.network.common import genid, asbytestr, seqidx2id, seqid2idx, MSG_SUBSCRIBE, MSG_UNSUBSCRIBE, GROUPID_NOGROUP, GROUPID_CLIENT, GROUPID_SIM, GROUPID_DEFAULT, IDLEN @@ -58,7 +59,7 @@ def connect(self, hostname=None, recv_port=None, send_port=None, protocol='tcp') self.poller.register(self.sock_recv, zmq.POLLIN) self.poller.register(self.sock_send, zmq.POLLIN) # Register this node by subscribing to targeted messages - self.subscribe(b'', to_group=self.node_id) + self.subscribe('', '', to_group=self.node_id) def close(self): ''' Close all network connections. ''' @@ -99,7 +100,11 @@ def receive(self, timeout=0): ctx.topic = ctx.msg[0][IDLEN:-IDLEN].decode() ctx.sender_id = ctx.msg[0][-IDLEN:] pydata = msgpack.unpackb(ctx.msg[1], object_hook=decode_ndarray, raw=False) - sub = Subscription.subscriptions.get(ctx.topic) or Subscription(ctx.topic, directedonly=True) + sub = Subscription.subscriptions.get(ctx.topic, None) #or Subscription(ctx.topic, directedonly=True) + if sub is None: + print('No subscription known for', ctx.topic, 'on', self.node_id) + continue + # Unpack dict or list, skip empty string if pydata == '': sub.emit() @@ -141,16 +146,16 @@ def receive(self, timeout=0): return False def send(self, topic, data='', to_group=''): - topic = asbytestr(topic) - to_group = asbytestr(to_group or stack.sender() or GROUPID_CLIENT) + btopic = asbytestr(topic) + bto_group = asbytestr(to_group or stack.sender() or '') self.sock_send.send_multipart( [ - to_group.ljust(IDLEN, b'*') + topic + self.node_id, + bto_group.ljust(IDLEN, b'*') + btopic + self.node_id, msgpack.packb(data, default=encode_ndarray, use_bin_type=True) ] ) - def subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT): + def subscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''): ''' Subscribe to a topic. Arguments: @@ -161,18 +166,18 @@ def subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT): sub = None if topic: sub = Subscription(topic) - if (from_id, to_group) in sub.subs: + if (from_group, to_group) in sub.subs: # Subscription already active. Just return Subscription object return sub - sub.subs.add((from_id, to_group)) + sub.subs.add((from_group, to_group)) - self._subscribe(topic, from_id, to_group) + self._subscribe(topic, from_group, to_group) # Messages coming in that match this subscription will be emitted using a # subscription signal return sub - def unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT): + def unsubscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''): ''' Unsubscribe from a topic. Arguments: @@ -181,20 +186,24 @@ def unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT): - to_group: The group mask that this topic is sent to (optional) ''' if topic: - Subscription(topic).subs.discard((from_id, to_group)) - self._unsubscribe(topic, from_id, to_group) - - def _subscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT): - topic = asbytestr(topic) - from_id = asbytestr(from_id) - to_group = asbytestr(to_group or GROUPID_SIM) - self.sock_recv.setsockopt(zmq.SUBSCRIBE, to_group.ljust(IDLEN, b'*') + topic + from_id) - - def _unsubscribe(self, topic, from_id='', to_group=GROUPID_DEFAULT): - topic = asbytestr(topic) - from_id = asbytestr(from_id) - to_group = asbytestr(to_group or GROUPID_SIM) - self.sock_recv.setsockopt(zmq.UNSUBSCRIBE, to_group.ljust(IDLEN, b'*') + topic + from_id) + Subscription(topic).subs.discard((from_group, to_group)) + self._unsubscribe(topic, from_group, to_group) + + def _subscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''): + if from_group == GROUPID_DEFAULT: + from_group = GROUPID_CLIENT + btopic = asbytestr(topic) + bfrom_group = asbytestr(from_group) + bto_group = asbytestr(to_group) + self.sock_recv.setsockopt(zmq.SUBSCRIBE, bto_group.ljust(IDLEN, b'*') + btopic + bfrom_group) + + def _unsubscribe(self, topic, from_group=GROUPID_DEFAULT, to_group=''): + if from_group == GROUPID_DEFAULT: + from_group = GROUPID_CLIENT + btopic = asbytestr(topic) + bfrom_group = asbytestr(from_group) + bto_group = asbytestr(to_group) + self.sock_recv.setsockopt(zmq.UNSUBSCRIBE, bto_group.ljust(IDLEN, b'*') + btopic + bfrom_group) def addnodes(self, count=1, *node_ids): ''' Tell the server to add 'count' nodes. diff --git a/bluesky/network/publisher.py b/bluesky/network/publisher.py new file mode 100644 index 0000000000..1b81bbecf7 --- /dev/null +++ b/bluesky/network/publisher.py @@ -0,0 +1,141 @@ +import inspect +from collections import defaultdict +from typing import Callable, Optional + +import bluesky as bs +from bluesky.core.funcobject import FuncObject +from bluesky.core.timedfunction import timed_function +from bluesky.core.walltime import Timer +from bluesky.network.common import ActionType +from bluesky.network.subscriber import subscriber +import bluesky.network.context as ctx + + +class PublisherMeta(type): + __publishers__ = dict() + __timers__ = dict() + + def __call__(cls, topic: str, dt=None, collect=False): + pub = PublisherMeta.__publishers__.get(topic) + if pub is None: + pub = PublisherMeta.__publishers__[topic] = super().__call__(topic, dt, collect) + # If dt is specified, also create a timer + if dt is not None: + if dt not in PublisherMeta.__timers__: + timer = PublisherMeta.__timers__[dt] = Timer(dt) + else: + timer = PublisherMeta.__timers__[dt] + timer.timeout.connect(pub.send_replace) + return pub + + @subscriber + @staticmethod + def request(*topics): + ''' Clients can request a full state update, to which this function responds. ''' + for pub in (p for p in map(PublisherMeta.__publishers__.get, topics) if p is not None): + pub.send_replace(to_group=ctx.sender_id) + + +class StatePublisher(metaclass=PublisherMeta): + ''' BlueSky shared state publisher class. + + Use this class if you want more control over publishing shared state data. + Unlike the state_publisher decorator this class allows you to also send + update, delete, append, and extend actions. + ''' + __collect__ = defaultdict(list) + + @classmethod + def collect(cls, topic: str, payload: list, to_group: str=''): + if payload[0] == ActionType.Replace: + cls.__collect__[(topic, to_group)] = payload + return + store = cls.__collect__[(topic, to_group)] + if not store or store[-2] not in (payload[0], ActionType.Replace.value, ActionType.Extend.value): + if payload[0] == ActionType.Append.value: + payload[0] = ActionType.Extend.value + payload[1] = {k:[v] for k, v in payload[1].items()} + store.extend(payload) + elif payload[0] == ActionType.Update.value: + recursive_update(store[1], payload[1]) + elif payload[0] == ActionType.Append.value: + for key, item in payload[1].items(): + store[1][key].append(item) + elif payload[0] == ActionType.Extend.value: + for key, item in payload[1].items(): + store[1][key].extend(item) + + @staticmethod + @timed_function(hook=('update', 'hold')) + def send_collected(): + while StatePublisher.__collect__: + (topic, to_group), payload = StatePublisher.__collect__.popitem() + bs.net.send(topic, payload, to_group) + + def __init__(self, topic: str, dt=None, collect=False) -> None: + self.topic = topic + self.dt = dt + self.collects = collect + + @staticmethod + def get_payload(): + ''' Default payload function returns None ''' + return + + def send_update(self, to_group=b'', **data): + if data: + if self.collects: + self.collect(self.topic, [ActionType.Update.value, data], to_group) + else: + bs.net.send(self.topic, [ActionType.Update.value, data], to_group) + + + def send_delete(self, to_group=b'', **keys): + if keys: + bs.net.send(self.topic, [ActionType.Delete.value, keys], to_group) + + + def send_append(self, to_group=b'', **data): + if data: + if self.collects: + self.collect(self.topic, [ActionType.Append.value, data], to_group) + else: + bs.net.send(self.topic, [ActionType.Append.value, data], to_group) + + def send_extend(self, to_group=b'', **data): + if data: + if self.collects: + self.collect(self.topic, [ActionType.Extend.value, data], to_group) + else: + bs.net.send(self.topic, [ActionType.Extend.value, data], to_group) + + def send_replace(self, to_group=b'', **data): + data = data or self.get_payload() + if data: + bs.net.send(self.topic, [ActionType.Replace.value, data], to_group) + + def payload(self, func: Callable): + ''' Decorator method to specify payload getter for this Publisher. ''' + self.get_payload = FuncObject(func) + return func + + +def state_publisher(func: Optional[Callable] = None, *, topic='', dt=None): + ''' BlueSky shared state publisher decorator. + + Use this decorator instead of a StatePublisher object if you only want to send full updates. + Functions decorated with this decorator will be: + - periodically called at interval dt + - called when a subscriber requests a full state + + Decorated function should return a dictionary with all relevant state data. + ''' + def deco(func): + ifunc = inspect.unwrap(func, stop=lambda f:not isinstance(func, (staticmethod, classmethod))) + itopic = (topic or ifunc.__name__).upper() + + StatePublisher(itopic, dt).payload(func) + return func + + # Allow both @publisher and @publisher(args) + return deco if func is None else deco(func) \ No newline at end of file diff --git a/bluesky/network/sharedstate.py b/bluesky/network/sharedstate.py index 5a615cfb97..5f845a5f28 100644 --- a/bluesky/network/sharedstate.py +++ b/bluesky/network/sharedstate.py @@ -1,111 +1,125 @@ -''' BlueSky shared state class. +''' BlueSky shared state classes and functions. - This class is used to keep a shared state across client(s) and simulation node(s) + BlueSky's sharedstate is used to keep a shared state + across client(s) and simulation node(s) ''' import inspect -from collections import defaultdict -from enum import Enum import numpy as np -from typing import Dict, Callable, Optional +from numbers import Number +from functools import partial +from types import SimpleNamespace +from copy import deepcopy +from collections import defaultdict import bluesky as bs -from bluesky.core import signal, remotestore as rs -from bluesky.core.funcobject import FuncObject -from bluesky.core.timedfunction import timed_function -from bluesky.core.walltime import Timer from bluesky.network import context as ctx -from bluesky.network.subscription import Subscription, subscriber as nwsub -from bluesky.plugins.trafgenclasses import incircle -#TODO: trigger voor actnode changed? +def reset(remote_id=None): + ''' Reset shared state data to defaults for remote simulation. ''' + remotes[remote_id or bs.net.act_id] = _genstore() -# Keep track of the set of subscribed topics. Store signals to emit -# whenever a state update of each topic is received -changed: Dict[str, signal.Signal] = dict() +def get(remote_id=None, group=None): + ''' Retrieve a remote store, or a group in a remote store. + Returns the store of the active remote if no remote id is provided. + ''' + return (remotes[remote_id or bs.net.act_id] if group is None else + getattr(remotes[remote_id or bs.net.act_id], group)) -class ActionType(Enum): - ''' Shared state action types. - - An incoming shared state update can be of the following types: - Append: An item is appended to the state - Extend: Two or more items are appended to the state - Delete: One or more items are deleted from the state - Update: One or more items within the state are updated - Replace: The full state object is replaced - Reset: The entire object is reset to its (empty) default - ActChange: A new active remote is selected + +def setvalue(name, value, remote_id=None, group=None): + ''' Set the value of attribute 'name' in group 'group' for remote store with id 'remote_id' + Sets value in store of the active remote if no remote_id is provided. ''' - Append = b'A' - Extend = b'E' - Delete = b'D' - Update = b'U' - Replace = b'R' - Reset = b'X' - ActChange = b'C' - - -@nwsub -def reset(*args): - ''' Process incoming RESET events. ''' - rs.reset(ctx.sender_id) - if ctx.sender_id == bs.net.act_id: - ctx.action = ActionType.Reset - ctx.action_content = None - for topic, sig in changed.items(): - store = rs.get(group=topic.lower()) - sig.emit(store) - ctx.action = None - - -@signal.subscriber(topic='actnode-changed') -def on_actnode_changed(act_id): - ctx.action = ActionType.ActChange - ctx.action_content = None - for topic, sig in changed.items(): - store = rs.get(group=topic.lower()) - sig.emit(store) - ctx.action = None - - -@signal.subscriber(topic='node-added') -def on_node_added(node_id): - ''' When a new node is announced, request the initial/current state of all - subscribed shared states. + setattr(remotes[remote_id or bs.net.act_id] if group is None else + getattr(remotes[remote_id or bs.net.act_id], group), name, value) + + +def setdefault(name, default, group=None): + ''' Set the default value for variable 'name' in group 'group' ''' + target = getattr(defaults, group, None) if group else defaults + if not target: + return setattr(defaults, group, Store(**{name:default})) + setattr(target, name, default) + + +def addtopic(topic): + ''' Add a sharedstate topic if it doesn't yet exist. + + This creates a storage group for this topic, which is added to each + remote data store. + + Arguments: + - topic: The sharedstate topic to add ''' - bs.net.send('REQUEST', list(changed.keys()), to_group=node_id) + topic = topic.lower() + # No creation needed if topic is already known + if hasattr(defaults, topic): + return + + # Add store to the defaults + setattr(defaults, topic, Store()) + # Also add to existing stores if necessary + for remote in remotes.values(): + setattr(remote, topic, Store()) -def receive(action, data): - ''' Retrieve and process state data. ''' - store = rs.get(ctx.sender_id, ctx.topic.lower()) - # Store sharedstate context - ctx.action = ActionType(action) - ctx.action_content = data +class Store(SimpleNamespace): + ''' Simple storage object for nested storage of state data per simulation node. ''' + def valid(self): + ''' Return True if this store has initialised attributes.''' + return all([bool(v) for v in vars(self).values()] or [False]) - if ctx.action == ActionType.Update: + def update(self, data): + ''' Update a value in this store. ''' for key, item in data.items(): - container = getattr(store, key, None) + container = getattr(self, key, None) if container is None: - setattr(store, key, item) + setattr(self, key, item) elif isinstance(container, dict): - recursive_update(container, item) + _recursive_update(container, item) else: for idx, value in item.items(): container[idx] = value - elif ctx.action == ActionType.Delete: + + def append(self, data): + ''' Append data to (lists/arrays in) this store. ''' + for key, item in data.items(): + container = getattr(self, key, None) + if container is None: + setattr(self, key, [item]) + elif isinstance(container, np.ndarray): + setattr(self, key, np.append(container, item)) + + def extend(self, data): + ''' Extend data in (lists/arrays in) this store. ''' + for key, item in data.items(): + container = getattr(self, key, None) + if container is None: + setattr(self, key, item) + elif isinstance(container, list): + container.extend(item) + elif isinstance(container, np.ndarray): + setattr(self, key, np.concatenate([container, item])) + + def replace(self, data): + ''' Replace data containers in this store. ''' + vars(self).update(data) + + def delete(self, data): + ''' Delete data from this store. ''' # We are expecting either an index, or a key value from a reference variable for key, item in data.items(): idx = None - if key not in vars(store): - # Assume we are receiving an index to arrays/lists in our store + if key not in vars(self): + # Assume we are receiving an index to arrays/lists in this store if not isinstance(item, int): raise ValueError(f"Expected integer index for delete {key} in topic {ctx.topic}") idx = item else: - ref = getattr(store, key) + ref = getattr(self, key) # If ref is a dict, this delete action should only act on the dict. if isinstance(ref, dict): if isinstance(item, (list, tuple)): @@ -126,11 +140,11 @@ def receive(action, data): if idx is None: continue - for container in vars(store).values(): + for container in vars(self).values(): if isinstance(container, np.ndarray): mask = np.ones_like(container, dtype=bool) mask[idx] = False - setattr(store, key, container[mask]) + setattr(self, key, container[mask]) elif isinstance(container, list): if isinstance(idx, int): container.pop(idx) @@ -139,193 +153,97 @@ def receive(action, data): for iidx in reversed(sorted(idx)): container.pop(iidx) - elif ctx.action == ActionType.Append: - for key, item in data.items(): - container = getattr(store, key, None) - if container is None: - setattr(store, key, [item]) - elif isinstance(container, np.ndarray): - setattr(store, key, np.append(container, item)) - - elif ctx.action == ActionType.Extend: - for key, item in data.items(): - container = getattr(store, key, None) - if container is None: - setattr(store, key, item) - elif isinstance(container, list): - container.extend(item) - elif isinstance(container, np.ndarray): - setattr(store, key, np.concatenate([container, item])) - elif ctx.action == ActionType.Replace: - vars(store).update(data) - - # Inform subscribers of state update - # TODO: what to do with act vs all? - if ctx.sender_id == bs.net.act_id: - changed[ctx.topic].emit(store) - - # Reset context variables - ctx.action = None - ctx.action_content = None - - -def recursive_update(target, source): +class ActData: + ''' Access shared state data from the active remote as if it is a member variable. ''' + __slots__ = ('default', 'name', 'group') + + def __init__(self, *args, name='', group=None, **kwargs): + self.default = (args, kwargs) + self.name = name + self.group = group + + def __set_name__(self, owner, name): + if not self.name: + # Get name from attribute name if not previously specified + self.name = name + args, kwargs = self.default + # Retrieve annotated object type if present + objtype = owner.__annotations__.get(name) + if objtype: + self.default = objtype(*args, **kwargs) + elif len(args) != 1: + raise AttributeError('A default value and/or a type annotation should be provided with ActData') + else: + self.default = args[0] + + # If underlying datatype is mutable, always immediately + # store per remote node + if not isinstance(self.default, (str, tuple, Number, frozenset, bytes)): + # If an annotated object type is specified create a generator for it + if objtype: + generators.append(partial(_generator, name=name, objtype=objtype, args=args, kwargs=kwargs, group=self.group)) + # Add group if it doesn't exist yet + if self.group is not None: + addtopic(self.group) + + # In case remote data already exists, update stores + for remote_id in remotes.keys(): + setvalue(name, objtype(*args, **kwargs), remote_id, self.group) + # Otherwise assume deepcopy can be used to generate initial values per remote + else: + setdefault(name, self.default, self.group) + + # In case remote data already exists, update stores + for remote_id in remotes.keys(): + setvalue(name, deepcopy(self.default), remote_id, self.group) + + def __get__(self, obj, objtype=None): + ''' Return the actual value for the currently active node. ''' + if not bs.net.act_id: + return self.default + return getattr( + remotes[bs.net.act_id] if self.group is None else getattr(remotes[bs.net.act_id], self.group), + self.name, self.default) + # TODO: What is the active (client) node on the sim-side? Is this always within a currently processed stack command? -> stack.sender_id + + def __set__(self, obj, value): + if not bs.net.act_id: + self.default = value + else: + setattr(remotes[bs.net.act_id] if self.group is None else getattr(remotes[bs.net.act_id], self.group), self.name, value) + + +def _recursive_update(target, source): + ''' Recursively update nested dicts/lists/arrays in a Store. ''' for k, v in source.items(): if isinstance(v, dict): inner = target.get(k) if inner is not None: - recursive_update(inner, v) + _recursive_update(inner, v) continue target[k] = v -def subscribe(topic:str, *, actonly=False) -> signal.Signal: - ''' Subscribe to a SharedState topic. - - This function is called internally when a callback function is decorated - to subscribe to a SharedState topic, but can also be used to subscribe - to a SharedState topic when you don't wish to provide a callback function. - ''' - topic = topic.upper() - # Create a new network subscription only if it doesn't exist yet - if topic not in changed: - # Subscribe to this network topic - Subscription(topic, actonly=actonly).connect(receive) - - # Add data store default to actdata - rs.addgroup(topic.lower()) - - # Create the signal to emit whenever data of the active remote changes - sig = signal.Signal(f'state-changed.{topic.lower()}') - changed[topic] = sig - return sig - return changed[topic] - - -def subscriber(func=None, *, topic='', actonly=False): - ''' Decorator to subscribe to a state topic. ''' - def deco(func): - ifunc = inspect.unwrap(func, stop=lambda f:not isinstance(func, (staticmethod, classmethod))) - - # Subscribe to topic, and connect callback function to data change signal - subscribe((topic or ifunc.__name__).upper(), actonly=actonly).connect(ifunc) - return func - - # Allow both @subscriber and @subscriber(args) - return deco if func is None else deco(func) - - -class PublisherMeta(type): - __publishers__ = dict() - __timers__ = dict() - - def __call__(cls, topic: str, dt=None, collect=False): - pub = PublisherMeta.__publishers__.get(topic) - if pub is None: - pub = PublisherMeta.__publishers__[topic] = super().__call__(topic, dt, collect) - # If dt is specified, also create a timer - if dt is not None: - if dt not in PublisherMeta.__timers__: - timer = PublisherMeta.__timers__[dt] = Timer(dt) - else: - timer = PublisherMeta.__timers__[dt] - timer.timeout.connect(pub.send_replace) - return pub - - @nwsub - @staticmethod - def request(*topics): - for pub in (p for p in map(PublisherMeta.__publishers__.get, topics) if p is not None): - pub.send_replace(to_group=ctx.sender_id) - - -class Publisher(metaclass=PublisherMeta): - __collect__ = defaultdict(list) - - @classmethod - def collect(cls, topic: str, payload: list, to_group: str=''): - if payload[0] == ActionType.Replace: - cls.__collect__[(topic, to_group)] = payload - return - store = cls.__collect__[(topic, to_group)] - if not store or store[-2] not in (payload[0], ActionType.Replace.value, ActionType.Extend.value): - if payload[0] == ActionType.Append.value: - payload[0] = ActionType.Extend.value - payload[1] = {k:[v] for k, v in payload[1].items()} - store.extend(payload) - elif payload[0] == ActionType.Update.value: - recursive_update(store[1], payload[1]) - elif payload[0] == ActionType.Append.value: - for key, item in payload[1].items(): - store[1][key].append(item) - elif payload[0] == ActionType.Extend.value: - for key, item in payload[1].items(): - store[1][key].extend(item) - - @staticmethod - @timed_function(hook=('update', 'hold')) - def send_collected(): - while Publisher.__collect__: - (topic, to_group), payload = Publisher.__collect__.popitem() - bs.net.send(topic, payload, to_group) - - def __init__(self, topic: str, dt=None, collect=False) -> None: - self.topic = topic - self.dt = dt - self.collects = collect - - @staticmethod - def get_payload(): - ''' Default payload function returns None ''' - return - - def send_update(self, to_group=b'', **data): - if data: - if self.collects: - self.collect(self.topic, [ActionType.Update.value, data], to_group) - else: - bs.net.send(self.topic, [ActionType.Update.value, data], to_group) - - - def send_delete(self, to_group=b'', **keys): - if keys: - bs.net.send(self.topic, [ActionType.Delete.value, keys], to_group) - - - def send_append(self, to_group=b'', **data): - if data: - if self.collects: - self.collect(self.topic, [ActionType.Append.value, data], to_group) - else: - bs.net.send(self.topic, [ActionType.Append.value, data], to_group) - - def send_extend(self, to_group=b'', **data): - if data: - if self.collects: - self.collect(self.topic, [ActionType.Extend.value, data], to_group) - else: - bs.net.send(self.topic, [ActionType.Extend.value, data], to_group) - - def send_replace(self, to_group=b'', **data): - data = data or self.get_payload() - if data: - bs.net.send(self.topic, [ActionType.Replace.value, data], to_group) +def _genstore(): + ''' Generate a store object for a remote simulation from defaults. ''' + store = deepcopy(defaults) + for g in generators: + g(store) + return store - def payload(self, func: Callable): - ''' Decorator method to specify payload getter for this Publisher. ''' - self.get_payload = FuncObject(func) - return func +def _generator(store, name, objtype, args, kwargs, group=None): + ''' Custom generator for non-base types. ''' + setattr(getattr(store, group) if group else store, name, objtype(*args, **kwargs)) -# Publisher decorator? -def publisher(func: Optional[Callable] = None, *, topic='', dt=None): - def deco(func): - ifunc = inspect.unwrap(func, stop=lambda f:not isinstance(func, (staticmethod, classmethod))) - itopic = (topic or ifunc.__name__).upper() - Publisher(itopic, dt).payload(func) - return func +# Keep track of default attribute values of mutable type. +# These always need to be stored per remote node. +defaults = Store() +# In some cases (such as for non-copyable types) a generator is specified +# instead of a default value +generators = list() - # Allow both @publisher and @publisher(args) - return deco if func is None else deco(func) \ No newline at end of file +# Keep a dict of remote state storage namespaces +remotes = defaultdict(_genstore) diff --git a/bluesky/network/subscriber.py b/bluesky/network/subscriber.py new file mode 100644 index 0000000000..f4ce594525 --- /dev/null +++ b/bluesky/network/subscriber.py @@ -0,0 +1,299 @@ +import inspect +from typing import Dict + +import bluesky as bs +from bluesky.core.funcobject import FuncObject +from bluesky.core import signal +import bluesky.network.sharedstate as ss +import bluesky.network.context as ctx +from bluesky.network.common import GROUPID_DEFAULT, ActionType, MessageType + + +#TODO: +# trigger voor actnode changed? + +# Keep track of the set of subscribed sharedstate topics. Store signals to emit +# whenever a state update of each topic is received +changed: Dict[str, signal.Signal] = dict() + + +def subscriber(func=None, *, topic='', broadcast=True, actonly=False, raw=False, from_group=GROUPID_DEFAULT, to_group=''): + ''' BlueSky network subscription decorator. + + Functions decorated with this decorator will be called whenever data + with the specified topic is received. + + Arguments: + - topic: The topic to subscribe to for this function. Function name + is used when no topic is specified + - broadcast: Whether to subscribe to broadcast-to-(all/group) topic. + When set to False, to_group and from_group are ignored, + and only messages sent directly to this client/node are + received. + - raw: Set to true for SharedState messages if you want to + receive the original unprocessed message + - from_group: Subscribe to data from a specific sender(-group) (optional) + By default, broadcast subscriptions filter so that only + sim->client and client->sim messages are received (to avoid + receiving our own broadcasts). If you don't want this, + set from_group='*'. + - to_group: Subscribe to data sent to a specific group (optional) + - actonly: Only receive this data for the active node (client only) + ''' + def deco(func): + # Unwrap wrapped functions, static and class methods + ifunc = inspect.unwrap(func, stop=lambda f:not isinstance(func, (staticmethod, classmethod))) + + # Create the subscription object. Network subscriptions will be made as + # soon as the network connection is available + Subscription(topic.upper() or ifunc.__name__.upper(), broadcast, actonly, from_group, to_group).connect(ifunc, raw) + + # Construct the subscription object, but return the original function + return func + # Allow both @subscriber and @subscriber(args) + return deco if func is None else deco(func) + + +def subscribe(topic, *, broadcast=True, actonly=False, from_group=GROUPID_DEFAULT, to_group='') -> 'Subscription': + ''' Subscribe to a network topic without passing a function callback. + + Arguments: + - topic: The topic to subscribe to for this function. + - broadcast: Whether to subscribe to broadcast-to-(all/group) topic. + When set to False, to_group and from_group are ignored, + and only messages sent directly to this client/node are + received. + - from_group: Subscribe to data from a specific sender(-group) (optional) + By default, broadcast subscriptions filter so that only + sim->client and client->sim messages are received (to avoid + receiving our own broadcasts). If you don't want this, + set from_group='*'. + - to_group: Subscribe to data sent to a specific group (optional) + - actonly: Only receive this data for the active node (client only) + + Returns: + - The subscription object for this topic + ''' + # Create the subscription object. Network subscriptions will be made as + # soon as the network connection is available + return Subscription(topic.upper(), broadcast, actonly, from_group, to_group) + + +class SubscriptionFactory(signal.SignalFactory): + # Individually keep track of all network subscriptions + subscriptions: dict[str, 'Subscription'] = dict() + + def __call__(cls, topic, broadcast=True, actonly=None, from_group=GROUPID_DEFAULT, to_group=''): + ''' Factory function for Subscription construction. + + Arguments: + - topic: The topic to subscribe to for this function. Function name + is used when no topic is specified + - broadcast: Whether to subscribe to broadcast-to-(all/group) topic. + When set to False, to_group and from_group are ignored, + and only messages sent directly to this client/node are + received. + - from_group: Subscribe to data from a specific sender(-group) (optional) + By default, broadcast subscriptions filter so that only + sim->client and client->sim messages are received (to avoid + receiving our own broadcasts). If you don't want this, + set from_group='*'. + - to_group: Subscribe to data sent to a specific group (optional) + - actonly: Only receive this data for the active node (client only) + ''' + # # Convert name to string if necessary + if isinstance(topic, bytes): + topic = topic.decode() + # Get subscription object if it already exists + sub = SubscriptionFactory.subscriptions.get(topic) + if sub is None: + sub = super().__call__(topic) + + # Check if type is correct. It could be that Subscription was previously + # initialised as plain Signal + if not isinstance(sub, Subscription): + if issubclass(Subscription, type(sub)): + # Signal object instance needs to stay intact, so instead change type and reinitialise + sub.__class__ = Subscription + sub.subs = set() + sub.requested = set() + else: + raise TypeError(f'Trying to connect network subscription to signal with incompatible type {type(sub).__name__}') + + # Store subscription + SubscriptionFactory.subscriptions[topic] = sub + + # Keep track of all broadcast topic orig/dest combinations + if broadcast: + sub.subscribe(from_group, to_group) + if actonly is not None: + sub.actonly = actonly + return sub + + +class Subscription(signal.Signal, metaclass=SubscriptionFactory): + ''' Network subscription class. + + Objects of this type are used to store subscription details, and to + keep track of local topic subscribers + ''' + def __init__(self, topic, broadcast=True, actonly=False, from_group=GROUPID_DEFAULT, to_group=''): + super().__init__(topic) + self.subs = set() + self.requested = set() + self.actonly = actonly + # Start out uninitialised so we can detect whether incoming data is + # a shared state or a regular message + self.msg_type = MessageType.Unknown + self.deferred_subs = [] + super().connect(self._detect_type) + + def _detect_type(self, *args, **kwargs): + # if self.topic.lower() == 'panzoom': + # First disconnect this function, it is only needed once + super().disconnect(self._detect_type) + # This function responds to the first incoming message for topic + # Detect whether it is a sharedstate message + if args and ActionType.isaction(args[0]): + # This is a sharedstate message + self.msg_type = MessageType.SharedState + # In this case, all (non-raw) subscribers will be configured + # as sharedstate subscribers + ss.addtopic(self.topic) + sig = signal.Signal(f'state-changed.{self.topic}') + changed[self.topic] = sig + while self.deferred_subs: + sig.connect(self.deferred_subs.pop()) + + # Finally send the sharedstate message on to the subscribers, + # and subscribe the sharedstate processing function to this topic + super().connect(on_sharedstate_received) + on_sharedstate_received(*args, **kwargs) + + else: + self.msg_type = MessageType.Regular + while self.deferred_subs: + cb = self.deferred_subs.pop() + cb(*args, **kwargs) + super().connect(cb) + + + @property + def active(self): + ''' Returns True if this Subscription has activated network subscriptions. ''' + return bool(self.subs) + + def connect(self, func, raw=False): + ''' Connect a callback function to incoming data on this + subscription topic. + + Arguments: + - func: The callback function + - raw: Set this to True for SharedState messages if you want to + receive the original unprocessed message + ''' + self.subscribe_all() + if raw or self.msg_type == MessageType.Regular: + super().connect(func) + elif self.msg_type == MessageType.Unknown: + self.deferred_subs.append(FuncObject(func)) + else: + signal.Signal(f'state-changed.{self.topic.lower()}').connect(func) + + def subscribe(self, from_group=GROUPID_DEFAULT, to_group=''): + if (from_group, to_group) not in self.subs: + if bs.net is not None: + self.subs.add((from_group, to_group)) + if self.actonly: + bs.net._subscribe(self.topic, from_group, to_group, self.actonly) + else: + bs.net._subscribe(self.topic, from_group, to_group) + else: + self.requested.add((from_group, to_group)) + + def subscribe_all(self): + if bs.net is not None: + while self.requested: + self.subscribe(*self.requested.pop()) + + def unsubscribe(self, from_group=GROUPID_DEFAULT, to_group=''): + if (from_group or to_group is not None): + if (from_group, to_group) in self.subs: + self.subs.discard((from_group, to_group)) + if bs.net is not None: + bs.net._unsubscribe(self.topic, from_group, to_group) + else: + # Unsubscribe all + self.requested.clear() + if bs.net is not None: + while self.subs: + bs.net._unsubscribe(self.topic, *self.subs.pop()) + + +@subscriber +def reset(*args): + ''' Process incoming RESET events. ''' + # Clear state data to defaults for this simulation node + ss.reset(ctx.sender_id) + + # If this is the active node, also emit a signal about this change + if ctx.sender_id == bs.net.act_id: + ctx.action = ActionType.Reset + ctx.action_content = None + for topic, sig in changed.items(): + store = get(group=topic.lower()) + sig.emit(store) + ctx.action = None + + +@signal.subscriber(topic='actnode-changed') +def on_actnode_changed(act_id): + ctx.action = ActionType.ActChange + ctx.action_content = None + for topic, sig in changed.items(): + store = ss.get(group=topic.lower()) + sig.emit(store) + ctx.action = None + + +@signal.subscriber(topic='node-added') +def on_node_added(node_id): + ''' When a new node is announced, request the initial/current state of all + subscribed shared states. + ''' + topics = [topic for topic, sub in SubscriptionFactory.subscriptions.items() + if sub.msg_type in (MessageType.Unknown, MessageType.SharedState)] + bs.net.send('REQUEST', topics, to_group=node_id) + + +def on_sharedstate_received(action, data): + ''' Retrieve and process state data. ''' + store = ss.get(ctx.sender_id, ctx.topic.lower()) + + # Store sharedstate context + ctx.action = ActionType(action) + ctx.action_content = data + + if ctx.action == ActionType.Update: + store.update(data) + + elif ctx.action == ActionType.Append: + store.append(data) + + elif ctx.action == ActionType.Extend: + store.extend(data) + + elif ctx.action == ActionType.Replace: + store.replace(data) + + elif ctx.action == ActionType.Delete: + store.delete(data) + + # Inform subscribers of state update + # TODO: what to do with act vs all? + if ctx.sender_id == bs.net.act_id: + changed[ctx.topic].emit(store) + + # Reset context variables + ctx.action = None + ctx.action_content = None diff --git a/bluesky/network/subscription.py b/bluesky/network/subscription.py deleted file mode 100644 index f2232feb78..0000000000 --- a/bluesky/network/subscription.py +++ /dev/null @@ -1,121 +0,0 @@ -import inspect -from typing import Dict -from bluesky.core.signal import Signal, SignalFactory -from bluesky.network.common import GROUPID_DEFAULT -import bluesky as bs - - -class SubscriptionFactory(SignalFactory): - # Individually keep track of all network subscriptions - subscriptions: Dict[str, 'Subscription'] = dict() - - def __call__(cls, topic, from_id='', to_group=GROUPID_DEFAULT, actonly=None, directedonly=False): - ''' Factory function for Signal construction. - - Arguments: - - topic: The topic to subscribe to - - from_id: When specified, only subscribe to the specified topic transmissions from a single origin. - - to_group: Subscribe to messages targeted at a specific group. By default, a subscription is - made to the top-level group (Simulation/Client group) - - actonly: When set to true, only messages from the currently active remote are subscribed to. - When the active remote is changed, all actonly subscriptions are automatically adjusted to this - new active node. - - directedonly: Only subscribe to messages specifically directed at this network node. - ''' - # # Convert name to string if necessary - if isinstance(topic, bytes): - topic = topic.decode() - sub = SubscriptionFactory.subscriptions.get(topic) - if sub is None: - sub = super().__call__(topic) - - # Check if type is correct. It could be that Subscription was previously - # initialised as plain Signal - if not isinstance(sub, Subscription): - if issubclass(Subscription, type(sub)): - # Signal object instance needs to stay intact, so instead change type and reinitialise - sub.__class__ = Subscription - sub.subs = set() - sub.requested = set() - else: - raise TypeError(f'Trying to connect network subscription to signal with incompatible type {type(sub).__name__}') - - # Store subscription - SubscriptionFactory.subscriptions[topic] = sub - - if not directedonly and (from_id, to_group) not in sub.subs: - sub.requested.add((from_id, to_group)) - if actonly is not None: - sub.actonly = actonly - return sub - - -class Subscription(Signal, metaclass=SubscriptionFactory): - def __init__(self, topic): - super().__init__(topic) - self.subs = set() - self.requested = set() - self.actonly = False - - @property - def active(self): - return bool(self.subs) - - def connect(self, func): - self.subscribe_all() - return super().connect(func) - - def subscribe(self, from_id='', to_group=GROUPID_DEFAULT): - if (from_id, to_group) not in self.subs: - if bs.net is not None: - self.subs.add((from_id, to_group)) - if self.actonly: - bs.net._subscribe(self.topic, from_id, to_group, self.actonly) - else: - bs.net._subscribe(self.topic, from_id, to_group) - else: - self.requested.add((from_id, to_group)) - - def subscribe_all(self): - if bs.net is not None: - while self.requested: - self.subscribe(*self.requested.pop()) - - def unsubscribe(self, from_id='', to_group=None): - if (from_id or to_group is not None): - to_group = to_group or GROUPID_DEFAULT - if (from_id, to_group) in self.subs: - self.subs.discard((from_id, to_group)) - if bs.net is not None: - bs.net._unsubscribe(self.topic, from_id, to_group) - else: - # Unsubscribe all - self.requested.clear() - if bs.net is not None: - while self.subs: - bs.net._unsubscribe(self.topic, *self.subs.pop()) - - -def subscriber(func=None, *, topic='', directedonly=False, **kwargs): - ''' BlueSky network subscription decorator. - - Functions decorated with this decorator will be called whenever data - with the specified topic is received. - - Arguments: - - topic: The topic to subscribe to for this function - - from_id: Subscribe to data from a specific sender (optional) - - to_group: Subscribe to data sent to a specific group (optional) - - actonly: Only receive this data for the active node (client only) - ''' - def deco(func): - ifunc = inspect.unwrap(func, stop=lambda f:not isinstance(func, (staticmethod, classmethod))) - - # Create the subscription object. Network subscriptions will be made as - # soon as the network connection is available - Subscription(topic or ifunc.__name__.upper(), directedonly=directedonly, **kwargs).connect(ifunc) - - # Construct the subscription object, but return the original function - return func - # Allow both @subscriber and @subscriber(args) - return deco if func is None else deco(func) diff --git a/bluesky/simulation/screenio.py b/bluesky/simulation/screenio.py index ef5bd6b76e..c602023cb0 100644 --- a/bluesky/simulation/screenio.py +++ b/bluesky/simulation/screenio.py @@ -8,9 +8,10 @@ from bluesky.core import Entity from bluesky.tools import aero from bluesky.core.walltime import Timer -from bluesky.network import subscriber, context as ctx +from bluesky.network.subscriber import subscriber +from bluesky.network.publisher import state_publisher, StatePublisher +import bluesky.network.context as ctx -import bluesky.network.sharedstate as ss # ========================================================================= # Settings @@ -25,9 +26,9 @@ class ScreenIO(Entity): """Class within sim task which sends/receives data to/from GUI task""" - pub_panzoom = ss.Publisher('PANZOOM') - pub_defwpt = ss.Publisher('DEFWPT', collect=True) - pub_route = ss.Publisher('ROUTEDATA') + pub_panzoom = StatePublisher('PANZOOM') + pub_defwpt = StatePublisher('DEFWPT', collect=True) + pub_route = StatePublisher('ROUTEDATA') # ========================================================================= # Functions @@ -207,7 +208,7 @@ def send_siminfo(self): str(bs.sim.utc.replace(microsecond=0)), bs.traf.ntraf, bs.sim.state, stack.get_scenname())) - @ss.publisher(topic='TRAILS', dt=1000 // SIMINFO_RATE) + @state_publisher(topic='TRAILS', dt=1000 // SIMINFO_RATE) def send_trails(self): # Trails, send only new line segments to be added if bs.traf.trails.active and len(bs.traf.trails.newlat0) > 0: @@ -218,7 +219,7 @@ def send_trails(self): bs.traf.trails.clearnew() return data - @ss.publisher(topic='ACDATA', dt=1000 // ACUPDATE_RATE) + @state_publisher(topic='ACDATA', dt=1000 // ACUPDATE_RATE) def send_aircraft_data(self): data = dict() data['simt'] = bs.sim.simt diff --git a/bluesky/simulation/simulation.py b/bluesky/simulation/simulation.py index 77f7639d56..d1a286c7c9 100644 --- a/bluesky/simulation/simulation.py +++ b/bluesky/simulation/simulation.py @@ -11,11 +11,11 @@ import bluesky.core as core from bluesky.core import plugin, simtime from bluesky.core.signal import subscriber +from bluesky.network.publisher import state_publisher from bluesky.core.walltime import Timer from bluesky.core.timedfunction import hooks from bluesky.stack import simstack, recorder from bluesky.tools import datalog, areafilter, plotter -import bluesky.network.sharedstate as ss # Minimum sleep interval @@ -115,6 +115,7 @@ def step(self, dt_increment=0): # Update traffic and other update functions for the next timestep bs.traf.update() hooks.update.trigger() + else: hooks.hold.trigger() @@ -253,7 +254,7 @@ def start_batch_scenario(self, data): bs.stack.set_scendata(data['scentime'], data['scencmd']) self.op() - @ss.publisher(topic='SIMSETTINGS') + @state_publisher(topic='SIMSETTINGS') def pub_simsettings(self): ''' Publish simulation settings on request. ''' return dict(settings=bs.settings._settings_hierarchy, diff --git a/bluesky/stack/cmdparser.py b/bluesky/stack/cmdparser.py index 1516e68367..22e6abdb09 100644 --- a/bluesky/stack/cmdparser.py +++ b/bluesky/stack/cmdparser.py @@ -3,7 +3,7 @@ from typing import Dict from bluesky.core.funcobject import FuncObject -from bluesky.network.sharedstate import publisher +from bluesky.network.publisher import state_publisher from bluesky.stack.argparser import Parameter, getnextarg, ArgumentError @@ -13,7 +13,7 @@ class Command: cmddict: Dict[str, 'Command'] = dict() @staticmethod - @publisher(topic='STACKCMDS') + @state_publisher(topic='STACKCMDS') def pubcmdlist(): ''' Send a dictionary with available stack commands when requested. ''' return {'cmddict': {cmd : val.brief[len(cmd) + 1:] for cmd, val in Command.cmddict.items()}} diff --git a/bluesky/tools/areafilter.py b/bluesky/tools/areafilter.py index f62449dd3f..f8b4bd188c 100644 --- a/bluesky/tools/areafilter.py +++ b/bluesky/tools/areafilter.py @@ -30,13 +30,13 @@ def delete(*args, **kwargs): import bluesky as bs from bluesky.stack import commandgroup from bluesky.tools.geo import kwikdist -from bluesky.network.sharedstate import Publisher +from bluesky.network.publisher import StatePublisher # Dictionary of all basic shapes (The shape classes defined in this file) by name basic_shapes = dict() # Publisher object to manage publishing of states to clients -polypub = Publisher('POLY', collect=True) +polypub = StatePublisher('POLY', collect=True) @polypub.payload diff --git a/bluesky/ui/qtgl/arealist.py b/bluesky/ui/qtgl/arealist.py index f550ff2a55..957d6877b2 100644 --- a/bluesky/ui/qtgl/arealist.py +++ b/bluesky/ui/qtgl/arealist.py @@ -8,14 +8,16 @@ from PyQt6.QtGui import QColor, QPalette from PyQt6.QtCore import QSize, Qt, QRect, QAbstractListModel, QModelIndex, Qt, QVariant -from bluesky.network import sharedstate, context as ctx -from bluesky.core import Base, remotestore as rs +from bluesky.network import context as ctx +from bluesky.network.subscriber import subscriber +from bluesky.network.sharedstate import ActData +from bluesky.core import Base from bluesky.ui import palette import bluesky as bs class AreaModel(QAbstractListModel, Base): - polys: dict = rs.ActData(group='poly') + polys: dict = ActData(group='poly') def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -37,7 +39,7 @@ def itemsRemoved(self, items): # remove items from the list pass - @sharedstate.subscriber(topic='POLY') + @subscriber(topic='POLY') def on_poly_update(self, data): if ctx.action in (ctx.action.ActChange, ctx.action.Reset): # Notify the gui that the entire list needs to be updated diff --git a/bluesky/ui/qtgl/console.py b/bluesky/ui/qtgl/console.py index 5ec0e063e3..909efb105d 100644 --- a/bluesky/ui/qtgl/console.py +++ b/bluesky/ui/qtgl/console.py @@ -11,11 +11,12 @@ from PyQt6.QtWidgets import QWidget, QTextEdit import bluesky as bs +from bluesky.core.signal import Signal from bluesky.stack.cmdparser import Command from bluesky.tools import cachefile from bluesky.tools.misc import cmdsplit -from bluesky.core import Signal, remotestore as rs -from bluesky.network import sharedstate as ss +from bluesky.network import subscribe +from bluesky.network.sharedstate import ActData from . import autocomplete @@ -59,9 +60,9 @@ class Console(QWidget): _instance = None # Per-remote data - echotext: list = rs.ActData() - echoflags: list = rs.ActData() - cmddict: dict = rs.ActData(group='stackcmds') + echotext: list = ActData() + echoflags: list = ActData() + cmddict: dict = ActData(group='stackcmds') def __init__(self, parent=None): super().__init__(parent) @@ -88,7 +89,7 @@ def __init__(self, parent=None): QApplication.instance().aboutToQuit.connect(self.close) # Connect to stack command list SharedState - ss.subscribe('STACKCMDS') + subscribe('STACKCMDS') def close(self): ''' Save command history when BlueSky closes. ''' diff --git a/bluesky/ui/qtgl/glmap.py b/bluesky/ui/qtgl/glmap.py index aa6ce54d4b..e64e8a48b4 100644 --- a/bluesky/ui/qtgl/glmap.py +++ b/bluesky/ui/qtgl/glmap.py @@ -1,7 +1,7 @@ ''' BlueSky OpenGL map object. ''' import numpy as np -from bluesky.core.remotestore import ActData +from bluesky.network.sharedstate import ActData from bluesky.stack import command from bluesky.ui import palette from bluesky.ui.qtgl import glhelpers as glh diff --git a/bluesky/ui/qtgl/glnavdata.py b/bluesky/ui/qtgl/glnavdata.py index 7eee8dd503..d5e3e8c8cb 100644 --- a/bluesky/ui/qtgl/glnavdata.py +++ b/bluesky/ui/qtgl/glnavdata.py @@ -1,8 +1,10 @@ ''' BlueSky navdata OpenGL visualisation object. ''' import numpy as np import bluesky as bs -from bluesky.core import Signal, remotestore as rs -from bluesky.network import sharedstate, context as ctx +from bluesky.core import Signal +from bluesky.network import context as ctx +from bluesky.network.subscriber import subscriber +from bluesky.network.sharedstate import ActData from bluesky.stack import command from bluesky.ui.qtgl import glhelpers as glh from bluesky import settings @@ -35,10 +37,10 @@ class Navdata(glh.RenderObject, layer=-10): ''' Navdata OpenGL object. ''' # Per remote node attributes - show_wpt = rs.ActData(1) - show_apt = rs.ActData(1) - pan = rs.ActData([0.0, 0.0], group='panzoom') - zoom = rs.ActData(1.0, group='panzoom') + show_wpt = ActData(1) + show_apt = ActData(1) + pan = ActData([0.0, 0.0], group='panzoom') + zoom = ActData(1.0, group='panzoom') @command def showwpt(self, flag:int=None): @@ -115,7 +117,7 @@ def panzoom(self, data, finished=True): else: self.apt_inrange = np.array([]) - @sharedstate.subscriber + @subscriber def defwpt(self, data): ''' Receive custom waypoint data and add to visualisation. ''' if not data.valid(): diff --git a/bluesky/ui/qtgl/glpoly.py b/bluesky/ui/qtgl/glpoly.py index 9babd7deff..c3df1f90de 100644 --- a/bluesky/ui/qtgl/glpoly.py +++ b/bluesky/ui/qtgl/glpoly.py @@ -1,12 +1,14 @@ ''' BlueSky OpenGL line and polygon (areafilter) drawing. ''' import numpy as np -from bluesky.core import Signal, remotestore as rs -from bluesky.network import sharedstate, context as ctx +from bluesky.core import Signal +from bluesky.network import context as ctx +from bluesky.network.subscriber import subscriber from bluesky.stack import command from bluesky.ui import palette from bluesky.ui.polytools import PolygonSet from bluesky.ui.qtgl import console from bluesky.ui.qtgl import glhelpers as glh +from bluesky.network.sharedstate import ActData palette.set_default_colours( @@ -23,9 +25,9 @@ class Poly(glh.RenderObject, layer=-20): ''' Poly OpenGL object. ''' # Per remote node attributes - show_poly = rs.ActData(1) - polys: dict = rs.ActData(group='poly') - bufdata: dict = rs.ActData() + show_poly = ActData(1) + polys: dict = ActData(group='poly') + bufdata: dict = ActData() @command def showpoly(self, flag:int=None): @@ -125,7 +127,7 @@ def previewpoly(self, mouseevent): except ValueError: pass - @sharedstate.subscriber(topic='POLY') + @subscriber(topic='POLY') def update_poly_data(self, data): if ctx.action == ctx.action.Reset or ctx.action == ctx.action.ActChange:# TODO hack # Simulation reset: Clear all entries diff --git a/bluesky/ui/qtgl/gltraffic.py b/bluesky/ui/qtgl/gltraffic.py index 19eb8bee16..ffe10076d2 100644 --- a/bluesky/ui/qtgl/gltraffic.py +++ b/bluesky/ui/qtgl/gltraffic.py @@ -2,13 +2,14 @@ import numpy as np from bluesky.ui.qtgl import glhelpers as glh -from bluesky.core import remotestore as rs from bluesky.stack import command from bluesky.tools import geo from bluesky import settings from bluesky.ui import palette from bluesky.tools.aero import ft, nm, kts -from bluesky.network import sharedstate, context as ctx +from bluesky.network import context as ctx +from bluesky.network.subscriber import subscriber +from bluesky.network.sharedstate import ActData # Register settings defaults @@ -33,15 +34,15 @@ class Traffic(glh.RenderObject, layer=100): ''' Traffic OpenGL object. ''' # Per remote node attributes - show_pz = rs.ActData(False) - show_traf = rs.ActData(True) - show_lbl = rs.ActData(2) - ssd_all = rs.ActData(False) - ssd_conflicts = rs.ActData(False) - ssd_ownship = rs.ActData(set()) - altrange = rs.ActData(tuple()) - naircraft = rs.ActData(0) - zoom = rs.ActData(1, group='panzoom') + show_pz = ActData(False) + show_traf = ActData(True) + show_lbl = ActData(2) + ssd_all = ActData(False) + ssd_conflicts = ActData(False) + ssd_ownship = ActData(set()) + altrange = ActData(tuple()) + naircraft = ActData(0) + zoom = ActData(1, group='panzoom') @command def showpz(self, flag:bool=None): @@ -247,7 +248,7 @@ def draw(self): self.ssd.draw(vertex_count=self.naircraft, n_instances=self.naircraft) - @sharedstate.subscriber(topic='TRAILS') + @subscriber(topic='TRAILS') def update_trails_data(self, data): ''' Update GPU buffers with route data from simulation. ''' if not self.initialized: @@ -266,7 +267,7 @@ def update_trails_data(self, data): else: self.traillines.set_vertex_count(0) - @sharedstate.subscriber(topic='ROUTEDATA', actonly=True) + @subscriber(topic='ROUTEDATA', actonly=True) def update_route_data(self, data): ''' Update GPU buffers with route data from simulation. ''' if not self.initialized: @@ -325,7 +326,7 @@ def update_route_data(self, data): self.route.set_vertex_count(0) self.routelbl.n_instances = 0 - @sharedstate.subscriber(topic='ACDATA', actonly=True) + @subscriber(topic='ACDATA', actonly=True) def update_aircraft_data(self, data): ''' Update GPU buffers with new aircraft simulation data. ''' if not self.initialized: diff --git a/bluesky/ui/qtgl/mainwindow.py b/bluesky/ui/qtgl/mainwindow.py index c0add62d64..aacefb069b 100644 --- a/bluesky/ui/qtgl/mainwindow.py +++ b/bluesky/ui/qtgl/mainwindow.py @@ -29,8 +29,9 @@ from bluesky.tools.misc import tim2txt from bluesky.network import subscriber, context as ctx from bluesky.network.common import get_ownip, seqidx2id, seqid2idx +import bluesky.network.sharedstate as ss + from bluesky.ui import palette -from bluesky.core import remotestore as rs # Child windows from bluesky.ui.qtgl.docwindow import DocWindow @@ -119,10 +120,10 @@ class MainWindow(QMainWindow, Base): modes = ['Init', 'Hold', 'Operate', 'End'] # Per remote node attributes - nconf_cur = rs.ActData(0, group='acdata') - nconf_tot = rs.ActData(0, group='acdata') - nlos_cur = rs.ActData(0, group='acdata') - nlos_tot = rs.ActData(0, group='acdata') + nconf_cur = ss.ActData(0, group='acdata') + nconf_tot = ss.ActData(0, group='acdata') + nlos_cur = ss.ActData(0, group='acdata') + nlos_tot = ss.ActData(0, group='acdata') @pyqtProperty(str) def style(self): @@ -258,7 +259,7 @@ def closeEvent(self, event=None): def echo(self, text, flags=None, sender_id=None): refnode = sender_id or ctx.sender_id or bs.net.act_id # Always update the store - store = rs.get(refnode) + store = ss.get(refnode) store.echotext.append(text) store.echoflags.append(flags) # Directly echo if message corresponds to active node diff --git a/bluesky/ui/qtgl/radarwidget.py b/bluesky/ui/qtgl/radarwidget.py index e561d9c167..e15f514fd0 100644 --- a/bluesky/ui/qtgl/radarwidget.py +++ b/bluesky/ui/qtgl/radarwidget.py @@ -8,8 +8,10 @@ from PyQt6.QtCore import Qt, QEvent, QT_VERSION import bluesky as bs -from bluesky.core import Signal, remotestore as rs -from bluesky.network import sharedstate as ss +from bluesky.core import Signal +from bluesky.network import subscribe +import bluesky.network.context as ctx +import bluesky.network.sharedstate as ss from bluesky.ui.qtgl import glhelpers as glh from bluesky.ui.radarclick import radarclick from bluesky.ui.qtgl import console @@ -90,8 +92,8 @@ class RadarWidget(glh.RenderWidget): ''' The BlueSky radar view. ''' # Per-remote attributes - pan = rs.ActData([0.0, 0.0], group='panzoom') - zoom = rs.ActData(1.0, group='panzoom') + pan = ss.ActData([0.0, 0.0], group='panzoom') + zoom = ss.ActData(1.0, group='panzoom') def __init__(self, parent=None): super().__init__(parent) @@ -126,7 +128,7 @@ def __init__(self, parent=None): # Signals and slots self.mouse_event = Signal('radarmouse') self.panzoom_event = Signal('state-changed.panzoom') - ss.subscriber(self.on_panzoom, topic='PANZOOM') + subscribe('PANZOOM').connect(self.on_panzoom) def initializeGL(self): """Initialize OpenGL, VBOs, upload data on the GPU, etc.""" @@ -252,8 +254,10 @@ def setpanzoom(self, pan=None, zoom=None, origin=None, absolute=True, finished=T (1.0 / self.zoom - 1.0 / prevzoom) / self.flat_earth self.pan[0] = self.pan[0] - gly * \ (1.0 / self.zoom - 1.0 / prevzoom) / self.ar - rs.get().panzoom.zoom = self.zoom # temp TODO - self.panzoom_event.emit(rs.get().panzoom, finished) + ss.get().panzoom.zoom = self.zoom # temp TODO + ctx.topic = 'PANZOOM' + self.panzoom_event.emit(ss.get().panzoom, finished) + ctx.topic = None return True def event(self, event): @@ -314,7 +318,7 @@ def event(self, event): elif event.type() == QEvent.Type.MouseButtonRelease and \ event.button() & Qt.MouseButton.LeftButton and not self.mousedragged: lat, lon = self.pixelCoordsToLatLon(event.pos().x(), event.pos().y()) - actdata = rs.get() + actdata = ss.get() tostack, tocmdline = radarclick(console.get_cmdline(), lat, lon, actdata.acdata, actdata.routedata) @@ -344,7 +348,7 @@ def event(self, event): self.panzoomchanged = False bs.net.send(b'PANZOOM', dict(pan=(self.pan[0], self.pan[1]), zoom=self.zoom, ar=self.ar, absolute=True)) - self.panzoom_event.emit(rs.get().panzoom, True) + self.panzoom_event.emit(ss.get().panzoom, True) elif int(event.type()) == 216: # 216 is screen change event, but doesn't exist (yet) in pyqt as enum self.pxratio = self.devicePixelRatio() diff --git a/bluesky/ui/qtgl/settingswindow.py b/bluesky/ui/qtgl/settingswindow.py index 78617183e0..c004eebd73 100644 --- a/bluesky/ui/qtgl/settingswindow.py +++ b/bluesky/ui/qtgl/settingswindow.py @@ -15,9 +15,9 @@ import bluesky as bs from bluesky.network.common import seqidx2id -from bluesky.core import Base +from bluesky.network.subscriber import subscribe import bluesky.network.sharedstate as ss -import bluesky.core.remotestore as rs +from bluesky.core import Base def sel_palette(value, changed_fun): @@ -114,7 +114,7 @@ def __init__(self): bs.net.node_added.connect(self.nodesChanged) # Subscribe to simulation settings SharedState - ss.subscribe('SIMSETTINGS') + subscribe('SIMSETTINGS') def show(self): if not self.populated: @@ -168,7 +168,7 @@ def nodetreeClicked(self, item, column): plugins = list() for node in item.nodes: - store = rs.get(node, 'simsettings') + store = ss.get(node, 'simsettings') simsettings.update(store.settings) plugins = store.plugins diff --git a/bluesky/ui/qtgl/tiledtexture.py b/bluesky/ui/qtgl/tiledtexture.py index b7a36f55ab..9746c96c76 100644 --- a/bluesky/ui/qtgl/tiledtexture.py +++ b/bluesky/ui/qtgl/tiledtexture.py @@ -18,7 +18,7 @@ from PyQt6.QtGui import QImage, qRgba import bluesky as bs -from bluesky.network import sharedstate as ss +from bluesky.network.subscriber import subscribe from bluesky.ui.qtgl import glhelpers as glh @@ -167,7 +167,7 @@ def __init__(self, glsurface, tilesource='opentopomap'): self.indextexture = glh.Texture(target=glh.Texture.Target.Target2D) self.indexsampler_loc = 0 self.arraysampler_loc = 0 - ss.subscriber(self.panzoom, topic='PANZOOM') + subscribe('PANZOOM').connect(self.panzoom) def add_bounding_box(self, lat0, lon0, lat1, lon1): ''' Add the bounding box of a textured shape. diff --git a/bluesky/ui/qtgl/trafficlist.py b/bluesky/ui/qtgl/trafficlist.py index 2146056e19..6a52ec3e70 100644 --- a/bluesky/ui/qtgl/trafficlist.py +++ b/bluesky/ui/qtgl/trafficlist.py @@ -4,16 +4,17 @@ from PyQt6.QtGui import QColor, QPalette from PyQt6.QtCore import QSize, Qt, QRect, QAbstractListModel, QModelIndex, Qt, QVariant, QTimer -from bluesky.core import Base, remotestore as rs -from bluesky.network import sharedstate as ss +from bluesky.core import Base +from bluesky.network.subscriber import subscriber +from bluesky.network.sharedstate import ActData from bluesky.tools import aero class TrafficModel(QAbstractListModel, Base): - id: list = rs.ActData(group='acdata') - alt: np.ndarray = rs.ActData(0, group='acdata') - trk: np.ndarray = rs.ActData(0, group='acdata') - cas: np.ndarray = rs.ActData(0, group='acdata') + id: list = ActData(group='acdata') + alt: np.ndarray = ActData(0, group='acdata') + trk: np.ndarray = ActData(0, group='acdata') + cas: np.ndarray = ActData(0, group='acdata') def __init__(self) -> None: super().__init__() @@ -44,7 +45,7 @@ def data(self, index, role=Qt.ItemDataRole.DisplayRole): f'{self.cas[idx] / aero.kts:1.0f}' )) - @ss.subscriber(topic='ACDATA', actonly=True) + @subscriber(topic='ACDATA', actonly=True) def on_data_update(self, data): if len(data.id) == 0: self.beginResetModel()