From 2252a587a3adf1b75afb6ec2d16c0cb42465cc6b Mon Sep 17 00:00:00 2001 From: xieyxclack <31954383+xieyxclack@users.noreply.github.com> Date: Thu, 5 May 2022 13:25:05 +0800 Subject: [PATCH] update doc of core (#57) --- federatedscope/core/fed_runner.py | 15 +++-- federatedscope/core/worker/base_worker.py | 2 +- federatedscope/core/worker/client.py | 55 +++++++++++++++--- federatedscope/core/worker/server.py | 70 +++++++++++++++++++---- 4 files changed, 120 insertions(+), 22 deletions(-) diff --git a/federatedscope/core/fed_runner.py b/federatedscope/core/fed_runner.py index 94bf8fc01..af5c6cf0a 100644 --- a/federatedscope/core/fed_runner.py +++ b/federatedscope/core/fed_runner.py @@ -11,7 +11,13 @@ class FedRunner(object): """ - This class is used for building up the Federated Learning course + This class is used to construct an FL course, which includes `_set_up` and `run`. + + Arguments: + data: The data used in the FL courses, which are formatted as {'ID':data} for standalone mode. More details can be found in federatedscope.core.auxiliaries.data_builder . + server_class: The server class is used for instantiating a (customized) server. + client_class: The client class is used for instantiating a (customized) client. + config: The configurations of the FL course. """ def __init__(self, data, @@ -39,7 +45,7 @@ def __init__(self, def _setup_for_standalone(self): """ - To set up server and client for standalone mode + To set up server and client for standalone mode. """ self.server = self._setup_server() @@ -65,7 +71,7 @@ def _setup_for_standalone(self): def _setup_for_distributed(self): """ - To set up server or client for distributed mode + To set up server or client for distributed mode. 
""" self.server_address = { 'host': self.cfg.distribute.server_host, @@ -83,7 +89,8 @@ def _setup_for_distributed(self): def run(self): """ - To run an FL course, which is called after server/client has been set up + To run an FL course, which is called after server/client has been set up. + For the standalone mode, a shared message queue will be set up to simulate ``receiving message``. """ if self.mode == 'standalone': # trigger the FL course diff --git a/federatedscope/core/worker/base_worker.py b/federatedscope/core/worker/base_worker.py index e58f25578..150b3f4d4 100644 --- a/federatedscope/core/worker/base_worker.py +++ b/federatedscope/core/worker/base_worker.py @@ -3,7 +3,7 @@ class Worker(object): """ - The base worker class + The base worker class. """ def __init__(self, ID=-1, state=0, config=None, model=None, strategy=None): self._ID = ID diff --git a/federatedscope/core/worker/client.py b/federatedscope/core/worker/client.py index 3856a16a1..99f5d97fb 100644 --- a/federatedscope/core/worker/client.py +++ b/federatedscope/core/worker/client.py @@ -16,16 +16,17 @@ class Client(Worker): """ The Client class, which describes the behaviors of client in an FL course. 
- The attributes include: + The behaviors are described by the handling functions (named as callback_funcs_for_xxx) + + Arguments: ID: The unique ID of the client, which is assigned by the server when joining the FL course server_id: (Default) 0 state: The training round - config: the configuration + config: The configuration data: The data owned by the client - model: The local model + model: The model maintained locally device: The device to run local training and evaluation strategy: redundant attribute - The behaviors are described by the handled functions (named as callback_funcs_for_xxx) """ def __init__(self, ID=-1, @@ -100,7 +101,11 @@ def __init__(self, def register_handlers(self, msg_type, callback_func): """ - To bind a message type with a handled function + To bind a message type with a handling function. + + Arguments: + msg_type (str): The defined message type + callback_func: The handling function to handle the received message """ self.msg_handlers[msg_type] = callback_func @@ -119,7 +124,7 @@ def _register_default_handlers(self): def join_in(self): """ - To send 'join_in' message to the server + To send 'join_in' message to the server for joining in the FL course. """ self.comm_manager.send( Message(msg_type='join_in', @@ -129,7 +134,7 @@ def join_in(self): def run(self): """ - To wait for the messages and handle them (for distributed mode) + To listen for messages and handle them accordingly (used for distributed mode) """ while True: msg = self.comm_manager.receive() @@ -140,6 +145,12 @@ def run(self): break def callback_funcs_for_model_para(self, message: Message): + """ + The handling function for receiving model parameters, which triggers the local training process. This handling function is widely used in various FL courses. + + Arguments: + message: The received message, which includes sender, receiver, state, and content.
More detail can be found in federatedscope.core.message + """ if 'ss' in message.msg_type: # A fragment of the shared secret state, content = message.state, message.content @@ -240,12 +251,24 @@ def callback_funcs_for_model_para(self, message: Message): content=(sample_size, model_para_all))) def callback_funcs_for_assign_id(self, message: Message): + """ + The handling function for receiving the client_ID assigned by the server (during the joining process), which is used in the distributed mode. + + Arguments: + message: The received message + """ content = message.content self.ID = int(content) logger.info('Client (address {}:{}) is assigned with #{:d}.'.format( self.comm_manager.host, self.comm_manager.port, self.ID)) def callback_funcs_for_join_in_info(self, message: Message): + """ + The handling function for receiving the request of join in information (such as batch_size, num_of_samples) during the joining process. + + Arguments: + message: The received message + """ requirements = message.content join_in_info = dict() for requirement in requirements: @@ -267,12 +290,24 @@ def callback_funcs_for_join_in_info(self, message: Message): content=join_in_info)) def callback_funcs_for_address(self, message: Message): + """ + The handling function for receiving other clients' IP addresses, which is used for constructing a complex topology + + Arguments: + message: The received message + """ content = message.content for neighbor_id, address in content.items(): if int(neighbor_id) != self.ID: self.comm_manager.add_neighbors(neighbor_id, address) def callback_funcs_for_evaluate(self, message: Message): + """ + The handling function for receiving the request of evaluating + + Arguments: + message: The received message + """ sender = message.sender self.state = message.state if message.content != None: @@ -321,6 +356,12 @@ def callback_funcs_for_evaluate(self, message: Message): content=metrics)) def callback_funcs_for_finish(self, message: Message): + """ + The 
handling function for receiving the signal of finishing the FL course + + Arguments: + message: The received message + """ logger.info( "================= receiving Finish Message ============================" ) diff --git a/federatedscope/core/worker/server.py b/federatedscope/core/worker/server.py index 84e019b70..8aa1e0415 100644 --- a/federatedscope/core/worker/server.py +++ b/federatedscope/core/worker/server.py @@ -20,7 +20,9 @@ class Server(Worker): """ The Server class, which describes the behaviors of server in an FL course. - The attributes include: + The behaviors are described by the handling functions (named as callback_funcs_for_xxx). + + Arguments: ID: The unique ID of the server, which is set to 0 by default state: The training round config: the configuration @@ -30,7 +32,6 @@ class Server(Worker): total_round_num: The total number of the training round device: The device to run local training and evaluation strategy: redundant attribute - The behaviors are described by the handled functions (named as callback_funcs_for_xxx) """ def __init__(self, ID=-1, @@ -158,7 +159,11 @@ def register_noise_injector(self, func): def register_handlers(self, msg_type, callback_func): """ - To bind a message type with a handled function + To bind a message type with a handling function. + + Arguments: + msg_type (str): The defined message type + callback_func: The handling function to handle the received message """ self.msg_handlers[msg_type] = callback_func @@ -170,7 +175,7 @@ def _register_default_handlers(self): def run(self): """ - To start the FL course, listen and handle messages (for distributed mode) + To start the FL course, listen and handle messages (for distributed mode).
""" # Begin: Broadcast model parameters and start to FL train @@ -226,7 +231,10 @@ def check_and_move_on(self, check_eval_result=False, min_received_num=None): """ - To check the message_buffer, when enough messages are receiving, trigger some events (such as perform aggregation, evaluation, and move to the next training round) + To check the message_buffer. When enough messages have been received, some events (such as performing aggregation, evaluation, and moving to the next training round) will be triggered. + + Arguments: + check_eval_result (bool): If True, check the message buffer for evaluation; otherwise, check the message buffer for training. """ if min_received_num is None: min_received_num = self._cfg.federate.sample_client_num @@ -307,8 +315,9 @@ def check_and_move_on(self, def check_and_save(self): """ - To save the results and save model after each evaluation + To save the results and save model after each evaluation. """ + # early stopping should_stop = False @@ -341,6 +350,10 @@ def check_and_save(self): self.state += 1 def save_best_results(self): + """ + To save the best evaluation results.
+ """ + if self._cfg.federate.save_to != '': self.aggregator.save_model(self._cfg.federate.save_to, self.state) formatted_best_res = self._monitor.format_eval_res( @@ -359,12 +372,13 @@ def save_formatted_results(self, formatted_res): def merge_eval_results_from_all_clients(self, final_round=False): """ - Merge evaluation results from all clients, - update best, log the merged results and save then into eval_results.log + Merge evaluation results from all clients, update best, log the merged results and save them into eval_results.log - :param final_round: - :return: + Arguments: + final_round (bool): Whether it is the final round of training + :returns: the formatted merged results """ + round = max(self.msg_buffer['eval'].keys()) eval_msg_buffer = self.msg_buffer['eval'][round] metrics_all_clients = dict() @@ -403,7 +417,12 @@ def broadcast_model_para(self, sample_client_num=-1): """ To broadcast the message to all clients or sampled clients + + Arguments: + msg_type: 'model_para' or other user defined msg_type + sample_client_num: the number of sampled clients in the broadcast behavior; sample_client_num = -1 means broadcasting to all the clients. """ + if sample_client_num > 0: receiver = np.random.choice(np.arange(1, self.client_num + 1), size=sample_client_num, @@ -442,6 +461,7 @@ def broadcast_client_address(self): """ To broadcast the communication addresses of clients (used for additive secret sharing) """ + self.comm_manager.send( Message(msg_type='address', sender=self.ID, @@ -463,6 +483,7 @@ def check_buffer(self, :returns: Whether enough messages have been received or not :rtype: bool """ + if check_eval_result: if 'eval' not in self.msg_buffer.keys() or len( self.msg_buffer['eval'].keys()) == 0: @@ -479,6 +500,10 @@ def check_buffer(self, return True def check_client_join_in(self): + """ + To check whether all the clients have joined in the FL course.
+ """ + if len(self._cfg.federate.join_in_info) != 0: return len(self.join_in_info) == self.client_num else: @@ -488,6 +513,7 @@ def trigger_for_start(self): """ To start the FL course when the expected number of clients have joined """ + if self.check_client_join_in(): if self._cfg.federate.use_ss: self.broadcast_client_address() @@ -518,6 +544,7 @@ def eval(self): """ To conduct evaluation. When cfg.federate.make_global_eval=True, a global evaluation is conducted by the server. """ + if self._cfg.federate.make_global_eval: # By default, the evaluation is conducted one-by-one for all internal models; # for other cases such as ensemble, override the eval function @@ -550,6 +577,14 @@ def eval(self): self.broadcast_model_para(msg_type='evaluate') def callback_funcs_model_para(self, message: Message): + """ + The handling function for receiving model parameters, which triggers check_and_move_on (perform aggregation when enough feedback has been received). + This handling function is widely used in various FL courses. + + Arguments: + message: The received message, which includes sender, receiver, state, and content. More detail can be found in federatedscope.core.message + """ + round, sender, content = message.state, message.sender, message.content # For a new round if round not in self.msg_buffer['train'].keys(): @@ -563,6 +598,14 @@ def callback_funcs_model_para(self, message: Message): return self.check_and_move_on() def callback_funcs_for_join_in(self, message: Message): + """ + The handling function for receiving the join in information. The server might request some information (such as num_of_samples) if necessary, and assign IDs to the clients. + If all the clients have joined in, the training process will be triggered.
+ + Arguments: + message: The received message + """ + if 'info' in message.msg_type: sender, info = message.sender, message.content for key in self._cfg.federate.join_in_info: @@ -598,6 +641,13 @@ def callback_funcs_for_join_in(self, message: Message): self.trigger_for_start() def callback_funcs_for_metrics(self, message: Message): + """ + The handling function for receiving the evaluation results, which triggers check_and_move_on (perform aggregation when enough feedback has been received). + + Arguments: + message: The received message + """ + round, sender, content = message.state, message.sender, message.content if round not in self.msg_buffer['eval'].keys():