diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index 9b3b39509a2..152b2581474 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -62,8 +62,9 @@ from typing import (
     Any,
     Dict,
-    Optional,
+    Iterable,
     List,
+    Optional,
     Set,
     TYPE_CHECKING,
     Tuple,
@@ -2705,6 +2706,23 @@ def get_entire_workflow(self):
 
         return workflow_msg
 
+    def get_workflow_only(self):
+        """Gather workflow summary data into a Protobuf message.
+
+        No tasks / cycles, etc., just workflow stuff.
+
+        Returns:
+            cylc.flow.data_messages_pb2.PbEntireWorkflow
+
+        """
+
+        workflow_msg = PbEntireWorkflow()
+        workflow_msg.workflow.CopyFrom(
+            self.data[self.workflow_id][WORKFLOW]
+        )
+
+        return workflow_msg
+
     def get_publish_deltas(self):
         """Return deltas for publishing."""
         all_deltas = DELTAS_MAP[ALL_DELTAS]()
@@ -2752,3 +2770,25 @@ def edge_id(self, left_tokens: Tokens, right_tokens: Tokens) -> str:
                 f'$edge|{left_tokens.relative_id}|{right_tokens.relative_id}'
             )
         ).id
+
+    # subscription stubs
+    def graphql_sub_interrogate(self, sub_id, info):
+        """Scope data requirements."""
+        pass
+
+    async def graphql_sub_data_match(self, w_id, sub_id):
+        """Match store data level to requested graphql subscription."""
+        pass
+
+    async def graphql_sub_discard(self, sub_id):
+        """Discard graphql subscription references."""
+        pass
+
+    async def set_query_sync_levels(
+        self,
+        w_ids: Iterable[str],
+        level: Optional[str] = None,
+        expire_delay: Optional[float] = None,
+    ):
+        """Set a workflow sync level."""
+        pass
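Note: `get_workflow_only` deliberately reuses the `PbEntireWorkflow` container, so
consumers can parse both messages identically; only the populated fields differ. A
minimal consumer sketch under that assumption (the `summarise` helper is illustrative,
not part of this change):

    from cylc.flow.data_messages_pb2 import PbEntireWorkflow

    def summarise(serialised: bytes) -> dict:
        """Parse either message; workflow-only leaves task fields empty."""
        msg = PbEntireWorkflow()
        msg.ParseFromString(serialised)
        return {
            'id': msg.workflow.id,                  # populated in both cases
            'task_proxies': len(msg.task_proxies),  # 0 for workflow-only
        }
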
diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py
index 8bd9c2347ac..abf0e9e1c6f 100644
--- a/cylc/flow/network/resolvers.py
+++ b/cylc/flow/network/resolvers.py
@@ -346,9 +346,14 @@ def __init__(self, data_store_mgr: 'DataStoreMgr'):
     async def get_workflow_by_id(self, args):
         """Return a workflow store by ID."""
         try:
-            if 'sub_id' in args and args['delta_store']:
-                return self.delta_store[args['sub_id']][args['id']][
-                    args['delta_type']][WORKFLOW]
+            if 'sub_id' in args:
+                if args['delta_store']:
+                    return self.delta_store[args['sub_id']][args['id']][
+                        args['delta_type']][WORKFLOW]
+                else:
+                    await self.data_store_mgr.set_query_sync_levels(
+                        [self.data_store_mgr.data[args['id']][WORKFLOW].id]
+                    )
             return self.data_store_mgr.data[args['id']][WORKFLOW]
         except KeyError:
             return None
@@ -356,12 +361,21 @@ async def get_workflow_by_id(self, args):
     async def get_workflows_data(self, args: Dict[str, Any]):
         """Return list of data from workflows."""
         # Both cases just as common so 'if' not 'try'
-        if 'sub_id' in args and args['delta_store']:
-            return [
-                delta[args['delta_type']]
-                for key, delta in self.delta_store[args['sub_id']].items()
-                if workflow_filter(self.data_store_mgr.data[key], args)
-            ]
+        if 'sub_id' in args:
+            if args['delta_store']:
+                return [
+                    delta[args['delta_type']]
+                    for key, delta in self.delta_store[args['sub_id']].items()
+                    if workflow_filter(self.data_store_mgr.data[key], args)
+                ]
+            else:
+                await self.data_store_mgr.set_query_sync_levels(
+                    [
+                        workflow[WORKFLOW].id
+                        for workflow in self.data_store_mgr.data.values()
+                        if workflow_filter(workflow, args)
+                    ]
+                )
         return [
             workflow
             for workflow in self.data_store_mgr.data.values()
@@ -375,6 +389,22 @@ async def get_workflows(self, args):
                 for flow in await self.get_workflows_data(args)],
            args)
 
+    async def get_flow_data_from_ids(self, data_store, native_ids):
+        """Return workflow data by id."""
+        w_ids = []
+        for native_id in native_ids:
+            w_ids.append(
+                Tokens(native_id).workflow_id
+            )
+        await self.data_store_mgr.set_query_sync_levels(
+            set(w_ids)
+        )
+        return [
+            data_store[w_id]
+            for w_id in iter_uniq(w_ids)
+            if w_id in data_store
+        ]
+
     # nodes
     def get_node_state(self, node, node_type):
         """Return state, from node or data-store."""
@@ -414,14 +444,18 @@ async def get_nodes_by_ids(self, node_type, args):
         """Return protobuf node objects for given id."""
         nat_ids = uniq(args.get('native_ids', []))
         # Both cases just as common so 'if' not 'try'
-        if 'sub_id' in args and args['delta_store']:
-            flow_data = [
-                delta[args['delta_type']]
-                for delta in get_flow_data_from_ids(
-                    self.delta_store[args['sub_id']], nat_ids)
-            ]
+        if 'sub_id' in args:
+            if args['delta_store']:
+                flow_data = [
+                    delta[args['delta_type']]
+                    for delta in get_flow_data_from_ids(
+                        self.delta_store[args['sub_id']], nat_ids)
+                ]
+            else:
+                flow_data = get_flow_data_from_ids(
+                    self.data_store_mgr.data, nat_ids)
         else:
-            flow_data = get_flow_data_from_ids(
+            flow_data = await self.get_flow_data_from_ids(
                 self.data_store_mgr.data, nat_ids)
 
         if node_type == PROXY_NODES:
@@ -450,10 +484,14 @@ async def get_node_by_id(self, node_type, args):
         w_id = Tokens(n_id).workflow_id
         # Both cases just as common so 'if' not 'try'
         try:
-            if 'sub_id' in args and args.get('delta_store'):
-                flow = self.delta_store[
-                    args['sub_id']][w_id][args['delta_type']]
+            if 'sub_id' in args:
+                if args.get('delta_store'):
+                    flow = self.delta_store[
+                        args['sub_id']][w_id][args['delta_type']]
+                else:
+                    flow = self.data_store_mgr.data[w_id]
             else:
+                await self.data_store_mgr.set_query_sync_levels([w_id])
                 flow = self.data_store_mgr.data[w_id]
         except KeyError:
             return None
@@ -475,14 +513,18 @@ async def get_edges_all(self, args):
     async def get_edges_by_ids(self, args):
         """Return protobuf edge objects for given id."""
         nat_ids = uniq(args.get('native_ids', []))
-        if 'sub_id' in args and args['delta_store']:
-            flow_data = [
-                delta[args['delta_type']]
-                for delta in get_flow_data_from_ids(
-                    self.delta_store[args['sub_id']], nat_ids)
-            ]
+        if 'sub_id' in args:
+            if args['delta_store']:
+                flow_data = [
+                    delta[args['delta_type']]
+                    for delta in get_flow_data_from_ids(
+                        self.delta_store[args['sub_id']], nat_ids)
+                ]
+            else:
+                flow_data = get_flow_data_from_ids(
+                    self.data_store_mgr.data, nat_ids)
         else:
-            flow_data = get_flow_data_from_ids(
+            flow_data = await self.get_flow_data_from_ids(
                 self.data_store_mgr.data, nat_ids)
 
         return sort_elements(
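The resolvers above all repeat the same three-way branch: subscription delta reads,
subscription live reads (no sync-level change), and plain queries that first escalate
the workflow's sync level. An illustrative consolidation of that shape (the
`resolve_flow_data` name is hypothetical, not introduced by this diff):

    async def resolve_flow_data(self, args, nat_ids):
        """Illustrative only: the branch shape repeated by the resolvers."""
        if 'sub_id' in args:
            if args['delta_store']:
                # Subscription deltas come from the per-subscription store.
                return [
                    delta[args['delta_type']]
                    for delta in get_flow_data_from_ids(
                        self.delta_store[args['sub_id']], nat_ids)
                ]
            # Subscription live reads: no sync-level escalation.
            return get_flow_data_from_ids(self.data_store_mgr.data, nat_ids)
        # Plain queries escalate sync levels before reading.
        return await self.get_flow_data_from_ids(
            self.data_store_mgr.data, nat_ids)
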
@@ -502,6 +544,7 @@ async def get_nodes_edges(self, root_nodes, args):
         edge_ids = set()
         # Setup for edgewise search.
         new_nodes = root_nodes
+        is_sub = 'sub_id' in args
         for _ in range(args['distance']):
             # Gather edges.
             # Edges should be unique (graph not circular),
@@ -512,10 +555,19 @@ async def get_nodes_edges(self, root_nodes, args):
                 for e_id in n.edges
             }.difference(edge_ids)
             edge_ids.update(new_edge_ids)
+            if is_sub:
+                flow_data = get_flow_data_from_ids(
+                    self.data_store_mgr.data,
+                    new_edge_ids
+                )
+            else:
+                flow_data = await self.get_flow_data_from_ids(
+                    self.data_store_mgr.data,
+                    new_edge_ids
+                )
             new_edges = [
                 edge
-                for flow in get_flow_data_from_ids(
-                    self.data_store_mgr.data, new_edge_ids)
+                for flow in flow_data
                 for edge in get_data_elements(flow, new_edge_ids, EDGES)
             ]
             edges += new_edges
@@ -530,10 +582,19 @@ async def get_nodes_edges(self, root_nodes, args):
             if not new_node_ids:
                 break
             node_ids.update(new_node_ids)
+            if is_sub:
+                flow_data = get_flow_data_from_ids(
+                    self.data_store_mgr.data,
+                    new_node_ids
+                )
+            else:
+                flow_data = await self.get_flow_data_from_ids(
+                    self.data_store_mgr.data,
+                    new_node_ids
+                )
             new_nodes = [
                 node
-                for flow in get_flow_data_from_ids(
-                    self.data_store_mgr.data, new_node_ids)
+                for flow in flow_data
                 for node in get_data_elements(flow, new_node_ids, TASK_PROXIES)
             ]
             nodes += new_nodes
@@ -569,6 +630,8 @@ async def subscribe_delta(
         delta_queues = self.data_store_mgr.delta_queues
         deltas_queue: DeltaQueue = queue.Queue()
 
+        self.data_store_mgr.graphql_sub_interrogate(sub_id, info)
+
         counters: Dict[str, int] = {}
         delta_yield_queue: DeltaQueue = queue.Queue()
         flow_delta_queues: Dict[str, queue.Queue[Tuple[str, dict]]] = {}
@@ -591,6 +654,9 @@ async def subscribe_delta(
                 if w_id in self.data_store_mgr.data:
                     if sub_id not in delta_queues[w_id]:
                         delta_queues[w_id][sub_id] = deltas_queue
+                        await self.data_store_mgr.graphql_sub_data_match(
+                            w_id, sub_id
+                        )
                         # On new yield workflow data-store as added delta
                         if args.get('initial_burst'):
                             delta_store = create_delta_store(
@@ -658,6 +724,7 @@ async def subscribe_delta(
             import traceback
             logger.warning(traceback.format_exc())
         finally:
+            await self.data_store_mgr.graphql_sub_discard(sub_id)
             for w_id in w_ids:
                 if delta_queues.get(w_id, {}).get(sub_id):
                     del delta_queues[w_id][sub_id]
@@ -702,8 +769,11 @@ async def mutator(
         meta: Dict[str, Any]
     ) -> List[Dict[str, Any]]:
         """Mutate workflow."""
-        w_ids = [flow[WORKFLOW].id
-                 for flow in await self.get_workflows_data(w_args)]
+        w_ids = [
+            workflow[WORKFLOW].id
+            for workflow in self.data_store_mgr.data.values()
+            if workflow_filter(workflow, w_args)
+        ]
         if not w_ids:
             workflows = list(self.data_store_mgr.data.keys())
             return [{
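Taken together, the `subscribe_delta` changes bracket each GraphQL subscription with
the three `DataStoreMgr` hooks stubbed in `data_store_mgr.py`. Simplified call order
(illustrative; queueing and error handling omitted):

    # Scope the subscription's data requirements up front.
    self.data_store_mgr.graphql_sub_interrogate(sub_id, info)
    try:
        for w_id in w_ids:
            # Match store data level to what the subscription requested.
            await self.data_store_mgr.graphql_sub_data_match(w_id, sub_id)
        ...  # stream deltas to the client
    finally:
        # Always drop the subscription's references on exit.
        await self.data_store_mgr.graphql_sub_discard(sub_id)
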
diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py
index 2c170e61198..a4bd9dcd9bc 100644
--- a/cylc/flow/network/server.py
+++ b/cylc/flow/network/server.py
@@ -46,6 +46,7 @@
 # maps server methods to the protobuf message (for client/UIS import)
 PB_METHOD_MAP: Dict[str, Any] = {
     'pb_entire_workflow': PbEntireWorkflow,
+    'pb_workflow_only': PbEntireWorkflow,
     'pb_data_elements': DELTAS_MAP
 }
 
@@ -416,6 +417,16 @@ def pb_entire_workflow(self, **_kwargs) -> bytes:
         pb_msg = self.schd.data_store_mgr.get_entire_workflow()
         return pb_msg.SerializeToString()
 
+    @authorise()
+    @expose
+    def pb_workflow_only(self, **_kwargs) -> bytes:
+        """Send only the workflow data, not tasks etc.
+
+        Returns serialised Protobuf message
+        """
+        pb_msg = self.schd.data_store_mgr.get_workflow_only()
+        return pb_msg.SerializeToString()
+
     @authorise()
     @expose
     def pb_data_elements(self, element_type: str, **_kwargs) -> bytes:
diff --git a/cylc/flow/network/subscriber.py b/cylc/flow/network/subscriber.py
index 66bd16f81f8..88b45d9c286 100644
--- a/cylc/flow/network/subscriber.py
+++ b/cylc/flow/network/subscriber.py
@@ -93,6 +93,18 @@ def _socket_options(self) -> None:
         for topic in self.topics:
             self.socket.setsockopt(zmq.SUBSCRIBE, topic)
 
+    def unsubscribe_topic(self, topic):
+        """Unsubscribe the socket from a topic, if subscribed."""
+        if topic in self.topics:
+            self.socket.setsockopt(zmq.UNSUBSCRIBE, topic)
+            self.topics.discard(topic)
+
+    def subscribe_topic(self, topic):
+        """Subscribe the socket to a topic, if not already subscribed."""
+        if topic not in self.topics:
+            self.socket.setsockopt(zmq.SUBSCRIBE, topic)
+            self.topics.add(topic)
+
     async def subscribe(self, msg_handler, *args, **kwargs):
         """Subscribe to updates from the provided socket."""
         while True:
diff --git a/tests/integration/test_publisher.py b/tests/integration/test_publisher.py
index c1fe7e1bcd2..13a42b724ab 100644
--- a/tests/integration/test_publisher.py
+++ b/tests/integration/test_publisher.py
@@ -32,9 +32,13 @@ async def test_publisher(flow, scheduler, run, one_conf, port_range):
         schd.workflow,
         host=schd.host,
         port=schd.server.pub_port,
-        topics=[b'workflow']
+        topics=[b'shutdown']
     )
 
+    subscriber.unsubscribe_topic(b'shutdown')
+    subscriber.subscribe_topic(b'workflow')
+    assert subscriber.topics == {b'workflow'}
+
     async with timeout(2):
         # wait for the first delta from the workflow
         btopic, msg = await subscriber.socket.recv_multipart()
diff --git a/tests/integration/test_server.py b/tests/integration/test_server.py
index bc7103b8365..ede0f8718b6 100644
--- a/tests/integration/test_server.py
+++ b/tests/integration/test_server.py
@@ -79,6 +79,17 @@ def test_pb_entire_workflow(myflow):
     assert data.workflow.id == myflow.id
 
 
+def test_pb_workflow_only(myflow):
+    """Test Protobuf workflow only endpoint method."""
+    data = PB_METHOD_MAP['pb_workflow_only']()
+    data.ParseFromString(
+        call_server_method(
+            myflow.server.pb_workflow_only
+        )
+    )
+    assert data.workflow.id == myflow.id
+
+
 async def test_stop(one: Scheduler, start):
     """Test stop."""
     async with start(one):
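For reference, the new endpoint is callable like any other `PB_METHOD_MAP` entry. A
hedged client-side sketch (assuming the existing `WorkflowRuntimeClient.async_request`
interface, which this diff does not touch):

    from cylc.flow.network.client import WorkflowRuntimeClient
    from cylc.flow.network.server import PB_METHOD_MAP

    async def fetch_workflow_only(workflow_id: str):
        """Fetch just the workflow-level fields from a scheduler."""
        client = WorkflowRuntimeClient(workflow_id)
        serialised = await client.async_request('pb_workflow_only')
        data = PB_METHOD_MAP['pb_workflow_only']()  # a PbEntireWorkflow
        data.ParseFromString(serialised)
        return data.workflow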