diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index b7e526a0..fee55548 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -101,7 +101,22 @@ def __init__(self, workflows_mgr, log, max_threads=10): self.log = log self.data = {} self.w_subs: Dict[str, WorkflowSubscriber] = {} - self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'} + self.topics = { + ALL_DELTAS.encode('utf-8'), + WORKFLOW.encode('utf-8'), + b'shutdown' + } + # If fragments in graphql sub for minimal sync + self.min_sync_fragments = { + 'AddedDelta', + 'WorkflowData', + 'UpdatedDelta' + } + # set of workflows to sync all data + self.full_sync_workflows = set() + self.full_sync_gql_subs = set() + # dict of workflow full sync subscriber IDs + self.full_sync_workflow_gql_subs = {} self.loop = None self.executor = ThreadPoolExecutor(max_threads) self.delta_queues = {} @@ -161,6 +176,9 @@ async def connect_workflow(self, w_id, contact_data): self.delta_queues[w_id] = {} + # setup sync subscriber set + self.full_sync_workflow_gql_subs[w_id] = set() + # Might be options other than threads to achieve # non-blocking subscriptions, but this works. self.executor.submit( @@ -170,7 +188,25 @@ async def connect_workflow(self, w_id, contact_data): contact_data[CFF.HOST], contact_data[CFF.PUBLISH_PORT] ) - successful_updates = await self._entire_workflow_update(ids=[w_id]) + + result = await self.workflow_data_update(w_id, minimal=True) + + if result: + # don't update the contact data until we have successfully updated + self._update_contact(w_id, contact_data) + + @log_call + async def workflow_data_update( + self, + w_id: str, + minimal: Optional[bool] = None + ): + if minimal is None: + minimal = w_id in self.full_sync_workflows + successful_updates = await self._entire_workflow_update( + ids=[w_id], + minimal=minimal + ) if w_id not in successful_updates: # something went wrong, undo any changes to allow for subsequent @@ -178,9 +214,7 @@ async def connect_workflow(self, w_id, contact_data): self.log.info(f'failed to connect to {w_id}') self.disconnect_workflow(w_id) return False - else: - # don't update the contact data until we have successfully updated - self._update_contact(w_id, contact_data) + return True @log_call def disconnect_workflow(self, w_id, update_contact=True): @@ -207,6 +241,9 @@ def disconnect_workflow(self, w_id, update_contact=True): if w_id in self.w_subs: self.w_subs[w_id].stop() del self.w_subs[w_id] + if w_id in self.full_sync_workflow_gql_subs: + del self.full_sync_workflow_gql_subs[w_id] + self.full_sync_workflows.discard(w_id) def get_workflows(self): """Return all workflows the data store is currently tracking. @@ -283,8 +320,18 @@ def _update_workflow_data(self, topic, delta, w_id): # close connections self.disconnect_workflow(w_id) return - self._apply_all_delta(w_id, delta) - self._delta_store_to_queues(w_id, topic, delta) + elif topic == WORKFLOW: + if w_id in self.full_sync_workflows: + return + self._apply_delta(w_id, WORKFLOW, delta) + # might seem clunky, but as with contact update, making it look + # like an ALL_DELTA avoids changing the resolver in cylc-flow + all_deltas = DELTAS_MAP[ALL_DELTAS]() + all_deltas.workflow.CopyFrom(delta) + self._delta_store_to_queues(w_id, ALL_DELTAS, all_deltas) + elif w_id in self.full_sync_workflows: + self._apply_all_delta(w_id, delta) + self._delta_store_to_queues(w_id, topic, delta) def _clear_data_field(self, w_id, field_name): if field_name == WORKFLOW: @@ -295,22 +342,26 @@ def _clear_data_field(self, w_id, field_name): def _apply_all_delta(self, w_id, delta): """Apply the AllDeltas delta.""" for field, sub_delta in delta.ListFields(): - delta_time = getattr(sub_delta, 'time', 0.0) - # If the workflow has reloaded clear the data before - # delta application. - if sub_delta.reloaded: - self._clear_data_field(w_id, field.name) - self.data[w_id]['delta_times'][field.name] = 0.0 - # hard to catch errors in a threaded async app, so use try-except. - try: - # Apply the delta if newer than the previously applied. - if delta_time >= self.data[w_id]['delta_times'][field.name]: - apply_delta(field.name, sub_delta, self.data[w_id]) - self.data[w_id]['delta_times'][field.name] = delta_time - if not sub_delta.reloaded: - self._reconcile_update(field.name, sub_delta, w_id) - except Exception as exc: - self.log.exception(exc) + self._apply_delta(w_id, field.name, sub_delta) + + def _apply_delta(self, w_id, name, delta): + """Apply delta.""" + delta_time = getattr(delta, 'time', 0.0) + # If the workflow has reloaded clear the data before + # delta application. + if delta.reloaded: + self._clear_data_field(w_id, name) + self.data[w_id]['delta_times'][name] = 0.0 + # hard to catch errors in a threaded async app, so use try-except. + try: + # Apply the delta if newer than the previously applied. + if delta_time >= self.data[w_id]['delta_times'][name]: + apply_delta(name, delta, self.data[w_id]) + self.data[w_id]['delta_times'][name] = delta_time + if not delta.reloaded: + self._reconcile_update(name, delta, w_id) + except Exception as exc: + self.log.exception(exc) def _delta_store_to_queues(self, w_id, topic, delta): # Queue delta for graphql subscription resolving @@ -369,7 +420,9 @@ def _reconcile_update(self, topic, delta, w_id): self.log.exception(exc) async def _entire_workflow_update( - self, ids: Optional[list] = None + self, + ids: Optional[list] = None, + minimal: Optional[bool] = False ) -> Set[str]: """Update entire local data-store of workflow(s). @@ -414,6 +467,8 @@ async def _entire_workflow_update( for key in DATA_TEMPLATE } continue + elif minimal: + continue new_data[field.name] = {n.id: n for n in value} self.data[w_id] = new_data successes.add(w_id) @@ -502,3 +557,33 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str: else: # the workflow has not yet run return 'not yet run' + + def graphql_sub_interrogate(self, sub_id, info): + """Scope data requirements.""" + fragments = set(info.fragments.keys()) + minimal = ( + fragments <= self.min_sync_fragments + and bool(fragments) + ) + if not minimal: + self.full_sync_gql_subs.add(sub_id) + return minimal + + async def graphql_sub_data_match(self, w_id, sub_id): + """Match store data level to requested graphql subscription.""" + if ( + sub_id in self.full_sync_gql_subs + and sub_id not in self.full_sync_workflow_gql_subs[w_id] + ): + self.full_sync_workflow_gql_subs[w_id].add(sub_id) + await self.workflow_data_update(w_id, minimal=False) + + self.full_sync_workflows.add(w_id) + + def graphql_sub_discard(self, sub_id): + """Discard graphql subscription references.""" + self.full_sync_gql_subs.discard(sub_id) + for w_id in self.full_sync_workflow_gql_subs: + self.full_sync_workflow_gql_subs[w_id].discard(w_id) + if not self.full_sync_workflow_gql_subs[w_id]: + self.full_sync_workflows.discard(w_id)