Skip to content

Commit

Permalink
add workflow only endpoint, sub/unsub subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed May 30, 2024
1 parent 3c40408 commit 50e7c94
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 1 deletion.
17 changes: 17 additions & 0 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2705,6 +2705,23 @@ def get_entire_workflow(self):

return workflow_msg

def get_workflow_only(self):
"""Gather workflow summary data into a Protobuf message.
No tasks / cycles, etc, just workflow stuff.
Returns:
cylc.flow.data_messages_pb2.PbEntireWorkflow
"""

workflow_msg = PbEntireWorkflow()
workflow_msg.workflow.CopyFrom(

Check warning on line 2719 in cylc/flow/data_store_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/data_store_mgr.py#L2718-L2719

Added lines #L2718 - L2719 were not covered by tests
self.data[self.workflow_id][WORKFLOW]
)

return workflow_msg

Check warning on line 2723 in cylc/flow/data_store_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/data_store_mgr.py#L2723

Added line #L2723 was not covered by tests

def get_publish_deltas(self):
"""Return deltas for publishing."""
all_deltas = DELTAS_MAP[ALL_DELTAS]()
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ async def subscribe_delta(
import traceback
logger.warning(traceback.format_exc())
finally:
self.data_store_mgr.graphql_sub_discard(sub_id)
await self.data_store_mgr.graphql_sub_discard(sub_id)

Check warning on line 666 in cylc/flow/network/resolvers.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/resolvers.py#L666

Added line #L666 was not covered by tests
for w_id in w_ids:
if delta_queues.get(w_id, {}).get(sub_id):
del delta_queues[w_id][sub_id]
Expand Down
11 changes: 11 additions & 0 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
# maps server methods to the protobuf message (for client/UIS import)
PB_METHOD_MAP: Dict[str, Any] = {
'pb_entire_workflow': PbEntireWorkflow,
'pb_workflow_only': PbEntireWorkflow,
'pb_data_elements': DELTAS_MAP
}

Expand Down Expand Up @@ -416,6 +417,16 @@ def pb_entire_workflow(self, **_kwargs) -> bytes:
pb_msg = self.schd.data_store_mgr.get_entire_workflow()
return pb_msg.SerializeToString()

@authorise()
@expose
def pb_workflow_only(self, **_kwargs) -> bytes:
"""Send only the workflow data, not tasks etc.
Returns serialised Protobuf message
"""
pb_msg = self.schd.data_store_mgr.get_workflow_only()
return pb_msg.SerializeToString()

Check warning on line 428 in cylc/flow/network/server.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/server.py#L427-L428

Added lines #L427 - L428 were not covered by tests

@authorise()
@expose
def pb_data_elements(self, element_type: str, **_kwargs) -> bytes:
Expand Down
10 changes: 10 additions & 0 deletions cylc/flow/network/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ def _socket_options(self) -> None:
for topic in self.topics:
self.socket.setsockopt(zmq.SUBSCRIBE, topic)

def unsubscribe_topic(self, topic):
if topic in self.topics:
self.socket.setsockopt(zmq.UNSUBSCRIBE, topic)
self.topics.discard(topic)

Check warning on line 99 in cylc/flow/network/subscriber.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/subscriber.py#L98-L99

Added lines #L98 - L99 were not covered by tests

def subscribe_topic(self, topic):
if topic not in self.topics:
self.socket.setsockopt(zmq.SUBSCRIBE, topic)
self.topics.add(topic)

Check warning on line 104 in cylc/flow/network/subscriber.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/subscriber.py#L103-L104

Added lines #L103 - L104 were not covered by tests

async def subscribe(self, msg_handler, *args, **kwargs):
"""Subscribe to updates from the provided socket."""
while True:
Expand Down

0 comments on commit 50e7c94

Please sign in to comment.