Commit

sync data-level of subscription
dwsutherland committed May 29, 2024
1 parent dfd80ca commit f98f2df
133 changes: 109 additions & 24 deletions cylc/uiserver/data_store_mgr.py
@@ -101,7 +101,22 @@ def __init__(self, workflows_mgr, log, max_threads=10):
self.log = log
self.data = {}
self.w_subs: Dict[str, WorkflowSubscriber] = {}
self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
self.topics = {
ALL_DELTAS.encode('utf-8'),
WORKFLOW.encode('utf-8'),
b'shutdown'
}
# GraphQL fragments that can be served by minimal (workflow-level) sync
self.min_sync_fragments = {
'AddedDelta',
'WorkflowData',
'UpdatedDelta'
}
# set of workflows requiring full data-store sync
self.full_sync_workflows = set()
# set of graphql subscription IDs requiring full sync
self.full_sync_gql_subs = set()
# per-workflow sets of full-sync graphql subscription IDs
self.full_sync_workflow_gql_subs = {}
self.loop = None
self.executor = ThreadPoolExecutor(max_threads)
self.delta_queues = {}
@@ -161,6 +176,9 @@ async def connect_workflow(self, w_id, contact_data):

self.delta_queues[w_id] = {}

# set up the full-sync subscriber set for this workflow
self.full_sync_workflow_gql_subs[w_id] = set()

# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
self.executor.submit(
@@ -170,17 +188,33 @@ async def connect_workflow(self, w_id, contact_data):
contact_data[CFF.HOST],
contact_data[CFF.PUBLISH_PORT]
)
successful_updates = await self._entire_workflow_update(ids=[w_id])

result = await self.workflow_data_update(w_id, minimal=True)

if result:
# don't update the contact data until we have successfully updated
self._update_contact(w_id, contact_data)

@log_call
async def workflow_data_update(
self,
w_id: str,
minimal: Optional[bool] = None
):
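"""Update the local data-store for a workflow, either minimally
(workflow-level data only) or fully; when minimal is None, sync at
the level currently required for the workflow."""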
if minimal is None:
minimal = w_id not in self.full_sync_workflows
successful_updates = await self._entire_workflow_update(
ids=[w_id],
minimal=minimal
)

if w_id not in successful_updates:
# something went wrong, undo any changes to allow for subsequent
# connection attempts
self.log.info(f'failed to connect to {w_id}')
self.disconnect_workflow(w_id)
return False
else:
# don't update the contact data until we have successfully updated
self._update_contact(w_id, contact_data)
return True

@log_call
def disconnect_workflow(self, w_id, update_contact=True):
@@ -207,6 +241,9 @@ def disconnect_workflow(self, w_id, update_contact=True):
if w_id in self.w_subs:
self.w_subs[w_id].stop()
del self.w_subs[w_id]
if w_id in self.full_sync_workflow_gql_subs:
del self.full_sync_workflow_gql_subs[w_id]
self.full_sync_workflows.discard(w_id)

def get_workflows(self):
"""Return all workflows the data store is currently tracking.
@@ -283,8 +320,18 @@ def _update_workflow_data(self, topic, delta, w_id):
# close connections
self.disconnect_workflow(w_id)
return
self._apply_all_delta(w_id, delta)
self._delta_store_to_queues(w_id, topic, delta)
elif topic == WORKFLOW:
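# full-sync workflows receive workflow data via ALL_DELTAS below,
# so skip here to avoid applying the same delta twice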
if w_id in self.full_sync_workflows:
return
self._apply_delta(w_id, WORKFLOW, delta)
# might seem clunky, but as with contact update, making it look
# like an ALL_DELTA avoids changing the resolver in cylc-flow
all_deltas = DELTAS_MAP[ALL_DELTAS]()
all_deltas.workflow.CopyFrom(delta)
self._delta_store_to_queues(w_id, ALL_DELTAS, all_deltas)
elif w_id in self.full_sync_workflows:
self._apply_all_delta(w_id, delta)
self._delta_store_to_queues(w_id, topic, delta)

def _clear_data_field(self, w_id, field_name):
if field_name == WORKFLOW:
@@ -295,22 +342,26 @@ def _apply_all_delta(self, w_id, delta):
def _apply_all_delta(self, w_id, delta):
"""Apply the AllDeltas delta."""
for field, sub_delta in delta.ListFields():
delta_time = getattr(sub_delta, 'time', 0.0)
# If the workflow has reloaded clear the data before
# delta application.
if sub_delta.reloaded:
self._clear_data_field(w_id, field.name)
self.data[w_id]['delta_times'][field.name] = 0.0
# hard to catch errors in a threaded async app, so use try-except.
try:
# Apply the delta if newer than the previously applied.
if delta_time >= self.data[w_id]['delta_times'][field.name]:
apply_delta(field.name, sub_delta, self.data[w_id])
self.data[w_id]['delta_times'][field.name] = delta_time
if not sub_delta.reloaded:
self._reconcile_update(field.name, sub_delta, w_id)
except Exception as exc:
self.log.exception(exc)
self._apply_delta(w_id, field.name, sub_delta)

def _apply_delta(self, w_id, name, delta):
"""Apply delta."""
delta_time = getattr(delta, 'time', 0.0)
# If the workflow has reloaded clear the data before
# delta application.
if delta.reloaded:
self._clear_data_field(w_id, name)
self.data[w_id]['delta_times'][name] = 0.0
# hard to catch errors in a threaded async app, so use try-except.
try:
# Apply the delta if newer than the previously applied.
if delta_time >= self.data[w_id]['delta_times'][name]:
apply_delta(name, delta, self.data[w_id])
self.data[w_id]['delta_times'][name] = delta_time
if not delta.reloaded:
self._reconcile_update(name, delta, w_id)
except Exception as exc:
self.log.exception(exc)

def _delta_store_to_queues(self, w_id, topic, delta):
# Queue delta for graphql subscription resolving
@@ -369,7 +420,9 @@ def _reconcile_update(self, topic, delta, w_id):
self.log.exception(exc)

async def _entire_workflow_update(
self, ids: Optional[list] = None
self,
ids: Optional[list] = None,
minimal: bool = False
) -> Set[str]:
"""Update entire local data-store of workflow(s).
@@ -414,6 +467,8 @@ async def _entire_workflow_update(
for key in DATA_TEMPLATE
}
continue
elif minimal:
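# minimal sync: skip all remaining (non-workflow) fields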
continue
new_data[field.name] = {n.id: n for n in value}
self.data[w_id] = new_data
successes.add(w_id)
@@ -502,3 +557,33 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str:
else:
# the workflow has not yet run
return 'not yet run'

def graphql_sub_interrogate(self, sub_id, info):
"""Scope data requirements."""
fragments = set(info.fragments.keys())
minimal = (
fragments <= self.min_sync_fragments
and bool(fragments)
)
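# track non-minimal subs so matched workflows get upgraded to full sync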
if not minimal:
self.full_sync_gql_subs.add(sub_id)
return minimal

async def graphql_sub_data_match(self, w_id, sub_id):
"""Match store data level to requested graphql subscription."""
if (
sub_id in self.full_sync_gql_subs
and sub_id not in self.full_sync_workflow_gql_subs[w_id]
):
self.full_sync_workflow_gql_subs[w_id].add(sub_id)
await self.workflow_data_update(w_id, minimal=False)

self.full_sync_workflows.add(w_id)

def graphql_sub_discard(self, sub_id):
"""Discard graphql subscription references."""
self.full_sync_gql_subs.discard(sub_id)
for w_id in self.full_sync_workflow_gql_subs:
self.full_sync_workflow_gql_subs[w_id].discard(sub_id)
if not self.full_sync_workflow_gql_subs[w_id]:
self.full_sync_workflows.discard(w_id)
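
For context, here is a rough sketch of how a graphql resolver might drive the three hooks above: interrogate each new subscription once, match it against every workflow it resolves, and discard it on disconnect. The resolver function subscribe_workflows and its arguments are illustrative assumptions, not part of this commit; only the three graphql_sub_* methods are real.

async def subscribe_workflows(data_store_mgr, sub_id, info, w_ids):
    # Hypothetical resolver-side wiring (illustration only).
    # minimal is True if the requested fragments can be served by
    # workflow-level data alone.
    minimal = data_store_mgr.graphql_sub_interrogate(sub_id, info)
    try:
        for w_id in w_ids:
            # No-op for minimal subs; otherwise upgrades the
            # workflow's local store to full sync.
            await data_store_mgr.graphql_sub_data_match(w_id, sub_id)
        # ... resolve deltas from data_store_mgr.delta_queues[w_id] ...
    finally:
        # Drop this sub's sync requirements on disconnect; workflows
        # left with no full-sync subs revert to minimal sync.
        data_store_mgr.graphql_sub_discard(sub_id)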
