diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py
index fee55548..df8ef8f9 100644
--- a/cylc/uiserver/data_store_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -36,7 +36,7 @@
 from copy import deepcopy
 from pathlib import Path
 import time
-from typing import Dict, Optional, Set
+from typing import Dict, Set
 
 from cylc.flow.exceptions import WorkflowStopped
 from cylc.flow.id import Tokens
@@ -56,6 +56,27 @@
 from .utils import fmt_call
 from .workflows_mgr import workflow_request
 
+MIN_LEVEL = 'min'
+MAX_LEVEL = 'max'
+SUBSCRIPTION_LEVELS = {
+    MIN_LEVEL: {
+        'topics': {WORKFLOW.encode('utf-8'), b'shutdown'},
+        'criteria': {
+            'fragments': {
+                'AddedDelta',
+                'WorkflowData',
+                'UpdatedDelta'
+            },
+        },
+        'request': 'pb_workflow_only',
+    },
+    MAX_LEVEL: {
+        'topics': {ALL_DELTAS.encode('utf-8'), b'shutdown'},
+        'criteria': {'fragments': set()},
+        'request': 'pb_entire_workflow',
+    },
+}
+
 
 def log_call(fcn):
     """Decorator for data store methods we want to log."""
@@ -101,22 +122,13 @@ def __init__(self, workflows_mgr, log, max_threads=10):
         self.log = log
         self.data = {}
         self.w_subs: Dict[str, WorkflowSubscriber] = {}
-        self.topics = {
-            ALL_DELTAS.encode('utf-8'),
-            WORKFLOW.encode('utf-8'),
-            b'shutdown'
-        }
-        # If fragments in graphql sub for minimal sync
-        self.min_sync_fragments = {
-            'AddedDelta',
-            'WorkflowData',
-            'UpdatedDelta'
+        # graphql subscription level
+        self.sync_level_graphql_subs = {
+            MIN_LEVEL: set(),
+            MAX_LEVEL: set()
         }
-        # set of workflows to sync all data
-        self.full_sync_workflows = set()
-        self.full_sync_gql_subs = set()
-        # dict of workflow full sync subscriber IDs
-        self.full_sync_workflow_gql_subs = {}
+        # workflow graphql subscription by level
+        self.sync_level_workflow_graphql_subs = {}
         self.loop = None
         self.executor = ThreadPoolExecutor(max_threads)
         self.delta_queues = {}
@@ -141,6 +153,12 @@ async def register_workflow(self, w_id: str, is_active: bool) -> None:
             status_msg=self._get_status_msg(w_id, is_active),
         )
 
+        # setup sync subscriber set
+        self.sync_level_workflow_graphql_subs[w_id] = {
+            MIN_LEVEL: set(),
+            MAX_LEVEL: set()
+        }
+
     @log_call
     async def unregister_workflow(self, w_id):
         """Remove a workflow from the data store entirely.
@@ -176,8 +194,9 @@ async def connect_workflow(self, w_id, contact_data):
 
         self.delta_queues[w_id] = {}
 
-        # setup sync subscriber set
-        self.full_sync_workflow_gql_subs[w_id] = set()
+        level = MIN_LEVEL
+        if self.sync_level_workflow_graphql_subs[w_id][MAX_LEVEL]:
+            level = MAX_LEVEL
 
         # Might be options other than threads to achieve
        # non-blocking subscriptions, but this works.
@@ -186,10 +205,11 @@ async def connect_workflow(self, w_id, contact_data):
                 w_id,
                 contact_data['name'],
                 contact_data[CFF.HOST],
-                contact_data[CFF.PUBLISH_PORT]
+                contact_data[CFF.PUBLISH_PORT],
+                SUBSCRIPTION_LEVELS[level]['topics']
             )
 
-        result = await self.workflow_data_update(w_id, minimal=True)
+        result = await self.workflow_data_update(w_id, level)
 
         if result:
             # don't update the contact data until we have successfully updated
@@ -199,13 +219,11 @@ async def connect_workflow(self, w_id, contact_data):
     async def workflow_data_update(
         self,
         w_id: str,
-        minimal: Optional[bool] = None
+        level: str,
     ):
-        if minimal is None:
-            minimal = w_id in self.full_sync_workflows
-        successful_updates = await self._entire_workflow_update(
+        successful_updates = await self._workflow_update(
             ids=[w_id],
-            minimal=minimal
+            req_method=SUBSCRIPTION_LEVELS[level]['request']
         )
 
         if w_id not in successful_updates:
@@ -241,9 +259,6 @@ def disconnect_workflow(self, w_id, update_contact=True):
         if w_id in self.w_subs:
             self.w_subs[w_id].stop()
             del self.w_subs[w_id]
-        if w_id in self.full_sync_workflow_gql_subs:
-            del self.full_sync_workflow_gql_subs[w_id]
-        self.full_sync_workflows.discard(w_id)
 
     def get_workflows(self):
         """Return all workflows the data store is currently tracking.
@@ -273,8 +288,10 @@ def _purge_workflow(self, w_id):
             del self.data[w_id]
         if w_id in self.delta_queues:
             del self.delta_queues[w_id]
+        if w_id in self.sync_level_workflow_graphql_subs:
+            del self.sync_level_workflow_graphql_subs[w_id]
 
-    def _start_subscription(self, w_id, reg, host, port):
+    def _start_subscription(self, w_id, reg, host, port, topics):
         """Instantiate and run subscriber data-store sync.
 
         Args:
@@ -282,6 +299,7 @@ def _start_subscription(self, w_id, reg, host, port):
             reg (str): Registered workflow name.
             host (str): Hostname of target workflow.
             port (int): Port of target workflow.
+            topics set(str): set of topics to subscribe to.
 
         """
         self.w_subs[w_id] = WorkflowSubscriber(
@@ -289,7 +307,7 @@ def _start_subscription(self, w_id, reg, host, port):
             host=host,
             port=port,
             context=self.workflows_mgr.context,
-            topics=self.topics
+            topics=topics
         )
         self.w_subs[w_id].loop.run_until_complete(
             self.w_subs[w_id].subscribe(
@@ -321,7 +339,7 @@ def _update_workflow_data(self, topic, delta, w_id):
             self.disconnect_workflow(w_id)
             return
         elif topic == WORKFLOW:
-            if w_id in self.full_sync_workflows:
+            if self.sync_level_workflow_graphql_subs[w_id][MAX_LEVEL]:
                 return
             self._apply_delta(w_id, WORKFLOW, delta)
             # might seem clunky, but as with contact update, making it look
@@ -329,7 +347,7 @@ def _update_workflow_data(self, topic, delta, w_id):
             all_deltas = DELTAS_MAP[ALL_DELTAS]()
             all_deltas.workflow.CopyFrom(delta)
             self._delta_store_to_queues(w_id, ALL_DELTAS, all_deltas)
-        elif w_id in self.full_sync_workflows:
+        else:
             self._apply_all_delta(w_id, delta)
             self._delta_store_to_queues(w_id, topic, delta)
 
@@ -419,10 +437,8 @@ def _reconcile_update(self, topic, delta, w_id):
             except Exception as exc:
                 self.log.exception(exc)
 
-    async def _entire_workflow_update(
-        self,
-        ids: Optional[list] = None,
-        minimal: Optional[bool] = False
+    async def _workflow_update(
+        self, ids: list[str], req_method: str,
     ) -> Set[str]:
         """Update entire local data-store of workflow(s).
 
@@ -430,11 +446,6 @@ async def _entire_workflow_update(
            ids: List of workflow external IDs.
""" - if ids is None: - ids = [] - - # Request new data - req_method = 'pb_entire_workflow' requests = { w_id: workflow_request( @@ -467,8 +478,6 @@ async def _entire_workflow_update( for key in DATA_TEMPLATE } continue - elif minimal: - continue new_data[field.name] = {n.id: n for n in value} self.data[w_id] = new_data successes.add(w_id) @@ -558,32 +567,63 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str: # the workflow has not yet run return 'not yet run' + async def _update_subscription_level(self, w_id, level): + """Update level of data subscribed to.""" + sub = self.w_subs.get(w_id) + if sub: + stop_topics = sub.topics.difference( + SUBSCRIPTION_LEVELS[level]['topics'] + ) + start_topics = SUBSCRIPTION_LEVELS[level]['topics'].difference( + sub.topics + ) + for stop_topic in stop_topics: + sub.unsubscribe_topic(stop_topic) + # Doing this after unsubscribe and before subscribe + # to make sure old topics stop and new data is in place. + await self.workflow_data_update(w_id, level) + for start_topic in start_topics: + sub.subscribe_topic(start_topic) + def graphql_sub_interrogate(self, sub_id, info): """Scope data requirements.""" fragments = set(info.fragments.keys()) minimal = ( - fragments <= self.min_sync_fragments + ( + fragments + <= SUBSCRIPTION_LEVELS[MIN_LEVEL]['criteria']['fragments'] + ) and bool(fragments) ) - if not minimal: - self.full_sync_gql_subs.add(sub_id) - return minimal + if minimal: + self.sync_level_graphql_subs[MIN_LEVEL].add(sub_id) + return + self.sync_level_graphql_subs[MAX_LEVEL].add(sub_id) async def graphql_sub_data_match(self, w_id, sub_id): """Match store data level to requested graphql subscription.""" - if ( - sub_id in self.full_sync_gql_subs - and sub_id not in self.full_sync_workflow_gql_subs[w_id] - ): - self.full_sync_workflow_gql_subs[w_id].add(sub_id) - await self.workflow_data_update(w_id, minimal=False) - - self.full_sync_workflows.add(w_id) + sync_level_wsubs = self.sync_level_workflow_graphql_subs[w_id] + if sub_id in self.sync_level_graphql_subs[MAX_LEVEL]: + if not sync_level_wsubs[MAX_LEVEL]: + sync_level_wsubs[MAX_LEVEL].add(sub_id) + await self._update_subscription_level(w_id, MAX_LEVEL) + else: + sync_level_wsubs[MIN_LEVEL].add(sub_id) - def graphql_sub_discard(self, sub_id): + async def graphql_sub_discard(self, sub_id): """Discard graphql subscription references.""" - self.full_sync_gql_subs.discard(sub_id) - for w_id in self.full_sync_workflow_gql_subs: - self.full_sync_workflow_gql_subs[w_id].discard(w_id) - if not self.full_sync_workflow_gql_subs[w_id]: - self.full_sync_workflows.discard(w_id) + level = MIN_LEVEL + if sub_id in self.sync_level_graphql_subs[MAX_LEVEL]: + level = MAX_LEVEL + self.sync_level_graphql_subs[level].discard(sub_id) + for w_id in self.sync_level_workflow_graphql_subs: + self.sync_level_workflow_graphql_subs[w_id][level].discard( + sub_id + ) + # if there are no more max level subscriptions after removal + # of a max level sub, downgrade to min. + if ( + not self.sync_level_workflow_graphql_subs[w_id][level] + and level is MAX_LEVEL + ): + await self._update_subscription_level(w_id, MIN_LEVEL)