From 961f0b2d21e9209df00ba4ad7aaa035b9dd01b44 Mon Sep 17 00:00:00 2001 From: alaydshah Date: Fri, 14 Jun 2024 07:18:51 +0000 Subject: [PATCH] Proposing removal of nested protocol managers --- .../scheduler_core/message_center.py | 9 ++----- .../scheduler_base_protocol_manager.py | 26 ------------------- .../scheduler/scheduler_core/status_center.py | 9 ++----- .../slave/base_slave_protocol_manager.py | 26 ------------------- 4 files changed, 4 insertions(+), 66 deletions(-) diff --git a/python/fedml/computing/scheduler/scheduler_core/message_center.py b/python/fedml/computing/scheduler/scheduler_core/message_center.py index 2bfa3b514..77bfc00b9 100755 --- a/python/fedml/computing/scheduler/scheduler_core/message_center.py +++ b/python/fedml/computing/scheduler/scheduler_core/message_center.py @@ -297,9 +297,6 @@ def remove_message_listener(self, topic): self.listener_topics.remove(topic) self.listener_handler_funcs.pop(topic) - def get_message_runner(self): - return None - def get_listener_message_queue(self): return self.listener_message_queue @@ -321,11 +318,9 @@ def start_listener( self.listener_message_event = multiprocessing.Event() self.listener_message_event.clear() self.listener_agent_config = agent_config - message_runner = self.get_message_runner() - message_runner.listener_agent_config = agent_config if platform.system() == "Windows": self.listener_message_center_process = multiprocessing.Process( - target=message_runner.run_listener_dispatcher, args=( + target=self.run_listener_dispatcher, args=( self.listener_message_event, self.listener_message_queue, self.listener_handler_funcs, sender_message_queue, message_center_name, extra_queues @@ -333,7 +328,7 @@ def start_listener( ) else: self.listener_message_center_process = fedml.get_process( - target=message_runner.run_listener_dispatcher, args=( + target=self.run_listener_dispatcher, args=( self.listener_message_event, self.listener_message_queue, self.listener_handler_funcs, sender_message_queue, message_center_name, extra_queues diff --git a/python/fedml/computing/scheduler/scheduler_core/scheduler_base_protocol_manager.py b/python/fedml/computing/scheduler/scheduler_core/scheduler_base_protocol_manager.py index f80508a50..5c23abe19 100755 --- a/python/fedml/computing/scheduler/scheduler_core/scheduler_base_protocol_manager.py +++ b/python/fedml/computing/scheduler/scheduler_core/scheduler_base_protocol_manager.py @@ -256,32 +256,6 @@ def generate_status_report(self, run_id, edge_id, server_agent_id=None): status_reporter.server_agent_id = server_agent_id return status_reporter - @abstractmethod - def generate_protocol_manager(self): - # Generate the protocol manager instance and set the attribute values. - return None - - def get_message_runner(self): - if self.message_status_runner is not None: - return self.message_status_runner - - self.message_status_runner = self.generate_protocol_manager() - self.message_status_runner.status_queue = self.get_status_queue() - - return self.message_status_runner - - def get_status_runner(self): - if self.message_status_runner is None: - self.get_message_runner() - if self.message_status_runner is not None: - self.message_status_runner.sender_message_queue = self.message_center.get_sender_message_queue() - - if self.message_status_runner is not None: - self.message_status_runner.sender_message_queue = self.message_center.get_sender_message_queue() - return self.message_status_runner - - return None - def get_get_protocol_communication_manager(self): return self.communication_mgr diff --git a/python/fedml/computing/scheduler/scheduler_core/status_center.py b/python/fedml/computing/scheduler/scheduler_core/status_center.py index 7e0cf1f98..58f48895f 100755 --- a/python/fedml/computing/scheduler/scheduler_core/status_center.py +++ b/python/fedml/computing/scheduler/scheduler_core/status_center.py @@ -93,7 +93,6 @@ def __init__(self, message_queue=None): self.status_listener_message_center_queue = None self.status_message_center = None self.status_manager_instance = None - self.status_runner = None self.is_deployment_status_center = False def __repr__(self): @@ -103,9 +102,6 @@ def __repr__(self): attrs=" ".join("{}={!r}".format(k, v) for k, v in self.__dict__.items()), ) - def get_status_runner(self): - return None - def start_status_center(self, sender_message_center_queue=None, listener_message_center_queue=None, is_slave_agent=False): self.status_queue = multiprocessing.Manager().Queue(-1) @@ -113,9 +109,8 @@ def start_status_center(self, sender_message_center_queue=None, self.status_event.clear() self.status_sender_message_center_queue = sender_message_center_queue self.status_listener_message_center_queue = listener_message_center_queue - self.status_runner = self.get_status_runner() - target_func = self.status_runner.run_status_dispatcher if not is_slave_agent else \ - self.status_runner.run_status_dispatcher_in_slave + target_func = self.run_status_dispatcher if not is_slave_agent else \ + self.run_status_dispatcher_in_slave if platform.system() == "Windows": self.status_center_process = multiprocessing.Process( target=target_func, args=( diff --git a/python/fedml/computing/scheduler/slave/base_slave_protocol_manager.py b/python/fedml/computing/scheduler/slave/base_slave_protocol_manager.py index 447bd05cd..d275852f2 100755 --- a/python/fedml/computing/scheduler/slave/base_slave_protocol_manager.py +++ b/python/fedml/computing/scheduler/slave/base_slave_protocol_manager.py @@ -435,32 +435,6 @@ def callback_broadcasted_job_status(self, topic, payload): logging.info("process status in the broadcast job status callback.") self.process_status(run_id, job_status, self.edge_id) - def generate_protocol_manager(self): - message_status_runner = self._generate_protocol_manager_instance( - self.args, agent_config=self.agent_config - ) - message_status_runner.request_json = self.request_json - message_status_runner.disable_client_login = self.disable_client_login - message_status_runner.message_center_name = self.message_center_name - message_status_runner.run_id = self.run_id - message_status_runner.edge_id = self.edge_id - message_status_runner.edge_user_name = self.edge_user_name - message_status_runner.edge_extra_url = self.edge_extra_url - message_status_runner.server_agent_id = self.server_agent_id - message_status_runner.current_device_id = self.current_device_id - message_status_runner.unique_device_id = self.unique_device_id - message_status_runner.subscribed_topics = self.subscribed_topics - message_status_runner.running_request_json = self.running_request_json - message_status_runner.request_json = self.start_request_json - message_status_runner.user_name = self.user_name - message_status_runner.general_edge_id = self.general_edge_id - message_status_runner.server_id = self.server_id - message_status_runner.model_device_server_id = self.model_device_server_id - message_status_runner.model_device_client_edge_id_list = self.model_device_client_edge_id_list - message_status_runner.status_queue = self.get_status_queue() - - return message_status_runner - def process_status(self, run_id, status, edge_id, master_id=None): run_id_str = str(run_id)