diff --git a/fedn/fedn/common/net/grpc/fedn.proto b/fedn/fedn/common/net/grpc/fedn.proto index ff0ee293c..23c6a9375 100644 --- a/fedn/fedn/common/net/grpc/fedn.proto +++ b/fedn/fedn/common/net/grpc/fedn.proto @@ -203,7 +203,6 @@ message ReportResponse { service Control { rpc Start(ControlRequest) returns (ControlResponse); rpc Stop(ControlRequest) returns (ControlResponse); - rpc Configure(ControlRequest) returns (ReportResponse); rpc FlushAggregationQueue(ControlRequest) returns (ControlResponse); rpc Report(ControlRequest) returns (ReportResponse); } @@ -253,9 +252,7 @@ service Combiner { rpc ModelValidationRequestStream (ClientAvailableMessage) returns (stream ModelValidationRequest); rpc ModelValidationStream (ClientAvailableMessage) returns (stream ModelValidation); - rpc SendModelUpdateRequest (ModelUpdateRequest) returns (Response); rpc SendModelUpdate (ModelUpdate) returns (Response); - rpc SendModelValidationRequest (ModelValidationRequest) returns (Response); rpc SendModelValidation (ModelValidation) returns (Response); } diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index e56462493..f382ce23d 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -35,19 +35,6 @@ def _to_dict(self): data = {"name": self.name} return data - def _get_combiner_report(self, combiner_id): - """Get report response from combiner. - - :param combiner_id: The combiner id to get report response from. - :type combiner_id: str - ::return: The report response from combiner. - ::rtype: dict - """ - # Get CombinerInterface (fedn.network.combiner.inferface.CombinerInterface) for combiner_id - combiner = self.control.network.get_combiner(combiner_id) - report = combiner.report - return report - def _allowed_file_extension( self, filename, ALLOWED_EXTENSIONS={"gz", "bz2", "tar", "zip", "tgz"} ): @@ -91,29 +78,6 @@ def get_clients(self, limit=None, skip=None, status=False): return jsonify(result) - def get_active_clients(self, combiner_id): - """Get all active clients, i.e that are assigned to a combiner. - A report request to the combiner is neccessary to determine if a client is active or not. - - :param combiner_id: The combiner id to get active clients for. - :type combiner_id: str - :return: All active clients as a json response. - :rtype: :class:`flask.Response` - """ - # Get combiner interface object - combiner = self.control.network.get_combiner(combiner_id) - if combiner is None: - return ( - jsonify( - { - "success": False, - "message": f"Combiner {combiner_id} not found.", - } - ), - 404, - ) - response = combiner.list_active_clients() - return response def get_all_combiners(self, limit=None, skip=None): """Get all combiners from the statestore. @@ -154,7 +118,6 @@ def get_combiner(self, combiner_id): "fqdn": object["fqdn"], "parent_reducer": object["parent"]["name"], "port": object["port"], - "report": object["report"], "updated_at": object["updated_at"], } payload[id] = info @@ -836,8 +799,7 @@ def start_session( clients_available = 0 for combiner in self.control.network.get_combiners(): try: - combiner_state = combiner.report() - nr_active_clients = combiner_state["nr_active_clients"] + nr_active_clients = len(combiner.get_active_clients()) clients_available = clients_available + int(nr_active_clients) except CombinerUnavailableError as e: # TODO: Handle unavailable combiner, stop session or continue? 
diff --git a/fedn/fedn/network/api/network.py b/fedn/fedn/network/api/network.py index 6fcaad053..b47273a49 100644 --- a/fedn/fedn/network/api/network.py +++ b/fedn/fedn/network/api/network.py @@ -154,18 +154,3 @@ def get_client_info(self): :rtype: list(ObjectId) """ return self.statestore.list_clients() - - def describe(self): - """ Describe the network. - - :return: The network description - :rtype: dict - """ - network = [] - for combiner in self.get_combiners(): - try: - network.append(combiner.report()) - except CombinerUnavailableError: - # TODO, do better here. - pass - return network diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index 44b8e8a4e..9afef9956 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -441,29 +441,35 @@ def _listen_to_model_update_request_stream(self): # Add client to metadata self._add_grpc_metadata('client', self.name) - while True: + while self._attached: try: for request in self.combinerStub.ModelUpdateRequestStream(r, metadata=self.metadata): if request.sender.role == fedn.COMBINER: # Process training request self._send_status("Received model update request.", log_level=fedn.Status.AUDIT, type=fedn.StatusType.MODEL_UPDATE_REQUEST, request=request) + logger.info("Received model update request.") self.inbox.put(('train', request)) - if not self._attached: - return except grpc.RpcError as e: - _ = e.code() - except grpc.RpcError: - # TODO: make configurable - timeout = 5 - time.sleep(timeout) - except Exception: - raise + # Handle gRPC errors + status_code = e.code() + if status_code == grpc.StatusCode.UNAVAILABLE: + logger.warning("GRPC server unavailable during model update request stream. Retrying.") + # Retry after a delay + time.sleep(5) + else: + # Log the error and continue + logger.error(f"An error occurred during model update request stream: {e}") - if not self._attached: - return + except Exception as ex: + # Handle other exceptions + logger.error(f"An error occurred during model update request stream: {ex}") + + # Detach if not attached + if not self._attached: + return def _listen_to_model_validation_request_stream(self): """Subscribe to the model validation request stream. @@ -479,17 +485,27 @@ def _listen_to_model_validation_request_stream(self): try: for request in self.combinerStub.ModelValidationRequestStream(r, metadata=self.metadata): # Process validation request - _ = request.model_id - self._send_status("Recieved model validation request.", log_level=fedn.Status.AUDIT, - type=fedn.StatusType.MODEL_VALIDATION_REQUEST, request=request) + model_id = request.model_id + self._send_status("Received model validation request for model_id {}".format(model_id), + log_level=fedn.Status.AUDIT, type=fedn.StatusType.MODEL_VALIDATION_REQUEST, + request=request) + logger.info("Received model validation request for model_id {}".format(model_id)) self.inbox.put(('validate', request)) - except grpc.RpcError: - # TODO: make configurable - timeout = 5 - time.sleep(timeout) - except Exception: - raise + except grpc.RpcError as e: + # Handle gRPC errors + status_code = e.code() + if status_code == grpc.StatusCode.UNAVAILABLE: + logger.warning("GRPC server unavailable during model validation request stream. 
Retrying.") + # Retry after a delay + time.sleep(5) + else: + # Log the error and continue + logger.error(f"An error occurred during model validation request stream: {e}") + + except Exception as ex: + # Handle other exceptions + logger.error(f"An error occurred during model validation request stream: {ex}") if not self._attached: return diff --git a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py index bcbb699e2..7e5665f44 100644 --- a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py +++ b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py @@ -4,6 +4,7 @@ from abc import ABC, abstractmethod import fedn.common.net.grpc.fedn_pb2 as fedn +from fedn.common.log_config import logger AGGREGATOR_PLUGIN_PATH = "fedn.network.combiner.aggregators.{}" @@ -60,7 +61,7 @@ def on_model_update(self, model_update): :type model_id: str """ try: - self.server.report_status("AGGREGATOR({}): callback received model update {}".format(self.name, model_update.model_update_id), + logger.info("AGGREGATOR({}): callback received model update {}".format(self.name, model_update.model_update_id), log_level=fedn.Status.INFO) # Validate the update and metadata @@ -69,9 +70,9 @@ def on_model_update(self, model_update): # Push the model update to the processing queue self.model_updates.put(model_update) else: - self.server.report_status("AGGREGATOR({}): Invalid model update, skipping.".format(self.name)) + logger.info("AGGREGATOR({}): Invalid model update, skipping.".format(self.name)) except Exception as e: - self.server.report_status("AGGREGATOR({}): Failed to receive model update! {}".format(self.name, e), + logger.info("AGGREGATOR({}): Failed to receive model update! {}".format(self.name, e), log_level=fedn.Status.WARNING) pass @@ -86,7 +87,7 @@ def _validate_model_update(self, model_update): # TODO: Validate the metadata to check that it contains all variables assumed by the aggregator. data = json.loads(model_update.meta)['training_metadata'] if 'num_examples' not in data.keys(): - self.server.report_status("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name)) + logger.info("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name)) return False return True diff --git a/fedn/fedn/network/combiner/aggregators/fedavg.py b/fedn/fedn/network/combiner/aggregators/fedavg.py index 0cd15b66a..ddc4f1bfd 100644 --- a/fedn/fedn/network/combiner/aggregators/fedavg.py +++ b/fedn/fedn/network/combiner/aggregators/fedavg.py @@ -1,5 +1,6 @@ import fedn.common.net.grpc.fedn_pb2 as fedn from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase +from fedn.common.log_config import logger class Aggregator(AggregatorBase): @@ -50,14 +51,14 @@ def combine_models(self, helper=None, time_window=180, max_nr_models=100, delete nr_aggregated_models = 0 total_examples = 0 - self.server.report_status( + logger.info( "AGGREGATOR({}): Aggregating model updates... 
".format(self.name)) while not self.model_updates.empty(): try: # Get next model from queue model_next, metadata, model_id = self.next_model_update(helper) - self.server.report_status( + logger.info( "AGGREGATOR({}): Processing model update {}, metadata: {} ".format(self.name, model_id, metadata)) # Increment total number of examples @@ -73,16 +74,16 @@ def combine_models(self, helper=None, time_window=180, max_nr_models=100, delete # Delete model from storage if delete_models: self.modelservice.models.delete(model_id) - self.server.report_status( + logger.info( "AGGREGATOR({}): Deleted model update {} from storage.".format(self.name, model_id)) self.model_updates.task_done() except Exception as e: - self.server.report_status( + logger.info( "AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e)) self.model_updates.task_done() data['nr_aggregated_models'] = nr_aggregated_models - self.server.report_status("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models), + logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models), log_level=fedn.Status.INFO) return model, data diff --git a/fedn/fedn/network/combiner/interfaces.py b/fedn/fedn/network/combiner/interfaces.py index 6dfb0428d..1bd039f84 100644 --- a/fedn/fedn/network/combiner/interfaces.py +++ b/fedn/fedn/network/combiner/interfaces.py @@ -4,7 +4,6 @@ from io import BytesIO import grpc -from google.protobuf.json_format import MessageToJson import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc @@ -137,11 +136,6 @@ def to_dict(self): data['certificate'] = str(cert_b64).split('\'')[1] data['key'] = str(key_b64).split('\'')[1] - try: - data['report'] = self.report() - except CombinerUnavailableError: - data['report'] = None - return data def to_json(self): @@ -176,55 +170,6 @@ def get_key(self): else: return None - def report(self): - """ Recieve a status report from the combiner. - - :return: A dictionary describing the combiner state. - :rtype: dict - :raises CombinerUnavailableError: If the combiner is unavailable. - """ - channel = Channel(self.address, self.port, - self.certificate).get_channel() - control = rpc.ControlStub(channel) - request = fedn.ControlRequest() - try: - response = control.Report(request) - data = {} - for p in response.parameter: - data[p.key] = p.value - return data - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.UNAVAILABLE: - raise CombinerUnavailableError - else: - raise - - def configure(self, config=None): - """ Configure the combiner. Set the parameters in config at the server. - - :param config: A dictionary containing parameters. - :type config: dict - """ - if not config: - config = self.config - channel = Channel(self.address, self.port, - self.certificate).get_channel() - control = rpc.ControlStub(channel) - - request = fedn.ControlRequest() - for key, value in config.items(): - p = request.parameter.add() - p.key = key - p.value = str(value) - - try: - control.Configure(request) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.UNAVAILABLE: - raise CombinerUnavailableError - else: - raise - def flush_model_update_queue(self): """ Reset the model update queue on the combiner. """ @@ -322,9 +267,12 @@ def allowing_clients(self): return False - def list_active_clients(self): + def list_active_clients(self, queue=1): """ List active clients. 
+ :param queue: The channel (queue) to use (optional). Default is 1 = MODEL_UPDATE_REQUESTS channel. + see :class:`fedn.common.net.grpc.fedn_pb2.Channel` + :type queue: int :return: A list of active clients. :rtype: json """ @@ -332,6 +280,7 @@ def list_active_clients(self, queue=1): self.certificate).get_channel() control = rpc.ConnectorStub(channel) request = fedn.ListClientsRequest() + request.channel = queue try: response = control.ListActiveClients(request) except grpc.RpcError as e: @@ -339,4 +288,4 @@ def list_active_clients(self, queue=1): raise CombinerUnavailableError else: raise - return MessageToJson(response) + return response.client diff --git a/fedn/fedn/network/combiner/round.py b/fedn/fedn/network/combiner/round.py index dd41deee3..8c6867c91 100644 --- a/fedn/fedn/network/combiner/round.py +++ b/fedn/fedn/network/combiner/round.py @@ -6,6 +6,7 @@ from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator from fedn.utils.helpers import get_helper +from fedn.common.log_config import logger class ModelUpdateError(Exception): @@ -49,8 +50,8 @@ def push_round_config(self, round_config): round_config['_job_id'] = str(uuid.uuid4()) self.round_configs.put(round_config) except Exception: - self.server.report_status( - "ROUNDCONTROL: Failed to push round config.", flush=True) + logger.warning( + "ROUNDCONTROL: Failed to push round config.") raise return round_config['_job_id'] @@ -68,7 +69,7 @@ def load_model_update(self, helper, model_id): try: model = self.modelservice.load_model_from_BytesIO(model_str.getbuffer(), helper) except IOError: - self.server.report_status( + logger.warning( "AGGREGATOR({}): Failed to load model!".format(self.name)) else: raise ModelUpdateError("Failed to load model.") @@ -95,8 +96,8 @@ def load_model_update_str(self, model_id, retry=3): while tries < retry: tries += 1 if not model_str or sys.getsizeof(model_str) == 80: - self.server.report_status( - "ROUNDCONTROL: Model download failed. retrying", flush=True) + logger.warning( + "ROUNDCONTROL: Model download failed. retrying") time.sleep(1) model_str = self.modelservice.get_model(model_id) @@ -139,7 +140,7 @@ def _training_round(self, config, clients): :rtype: model, dict """ - self.server.report_status( + logger.info( "ROUNDCONTROL: Initiating training round, participating clients: {}".format(clients)) meta = {} @@ -165,7 +166,7 @@ def _training_round(self, config, clients): try: helper = get_helper(config['helper_type']) - print("ROUNDCONTROL: Config delete_models_storage: {}".format(config['delete_models_storage']), flush=True) + logger.info("ROUNDCONTROL: Config delete_models_storage: {}".format(config['delete_models_storage'])) if config['delete_models_storage'] == 'True': delete_models = True else: @@ -173,7 +174,7 @@ def _training_round(self, config, clients): model, data = self.aggregator.combine_models(helper=helper, delete_models=delete_models) except Exception as e: - print("AGGREGATION FAILED AT COMBINER! {}".format(e), flush=True) + logger.warning("AGGREGATION FAILED AT COMBINER! {}".format(e)) meta['time_combination'] = time.time() - tic meta['aggregation_time'] = data @@ -204,9 +205,9 @@ def stage_model(self, model_id, timeout_retry=3, retry=2): # If the model is already in memory at the server we do not need to do anything.
if self.modelservice.models.exist(model_id): - print("ROUNDCONTROL: Model already exists in memory, skipping model staging.", flush=True) + logger.info("ROUNDCONTROL: Model already exists in memory, skipping model staging.") return - print("ROUNDCONTROL: Model Staging, fetching model from storage...", flush=True) + logger.info("ROUNDCONTROL: Model Staging, fetching model from storage...") # If not, download it and stage it in memory at the combiner. tries = 0 while True: @@ -215,13 +216,13 @@ def stage_model(self, model_id, timeout_retry=3, retry=2): if model: break except Exception: - self.server.report_status("ROUNDCONTROL: Could not fetch model from storage backend, retrying.", - flush=True) + logger.info( + "ROUNDCONTROL: Could not fetch model from storage backend, retrying.") time.sleep(timeout_retry) tries += 1 if tries > retry: - self.server.report_status( - "ROUNDCONTROL: Failed to stage model {} from storage backend!".format(model_id), flush=True) + logger.info( + "ROUNDCONTROL: Failed to stage model {} from storage backend!".format(model_id)) return self.modelservice.set_model(model, model_id) @@ -242,8 +243,8 @@ def _assign_round_clients(self, n, type="trainers"): elif type == "trainers": clients = self.server.get_active_trainers() else: - self.server.report_status( - "ROUNDCONTROL(ERROR): {} is not a supported type of client".format(type), flush=True) + logger.info( + "ROUNDCONTROL(ERROR): {} is not a supported type of client".format(type)) raise # If the number of requested trainers exceeds the number of available, use all available. @@ -274,9 +275,9 @@ def _check_nr_round_clients(self, config, timeout=0.0): if active >= int(config['clients_requested']): return True else: - self.server.report_status("waiting for {} clients to get started, currently: {}".format( + logger.info("waiting for {} clients to get started, currently: {}".format( int(config['clients_requested']) - active, - active), flush=True) + active)) if t >= timeout: if active >= int(config['clients_required']): return True @@ -295,7 +296,7 @@ def execute_validation_round(self, round_config): :type round_config: dict """ model_id = round_config['model_id'] - self.server.report_status( + logger.info( "COMBINER orchestrating validation of model {}".format(model_id)) self.stage_model(model_id) validators = self._assign_round_clients( @@ -311,8 +312,8 @@ def execute_training_round(self, config): :rtype: dict """ - self.server.report_status( - "ROUNDCONTROL: Processing training round, job_id {}".format(config['_job_id']), flush=True) + logger.info( + "ROUNDCONTROL: Processing training round, job_id {}".format(config['_job_id'])) data = {} data['config'] = config @@ -326,7 +327,7 @@ def execute_training_round(self, config): data['data'] = meta if model is None: - self.server.report_status( + logger.warning( "\t Failed to update global model in round {0}!".format(config['round_id'])) if model is not None: @@ -338,8 +339,8 @@ def execute_training_round(self, config): a.close() data['model_id'] = model_id - self.server.report_status( - "ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']), flush=True) + logger.info( + "ROUNDCONTROL: TRAINING ROUND COMPLETED. 
Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id'])) return data @@ -370,14 +371,14 @@ def run(self, polling_interval=1.0): elif round_config['task'] == 'validation' or round_config['task'] == 'inference': self.execute_validation_round(round_config) else: - self.server.report_status( - "ROUNDCONTROL: Round config contains unkown task type.", flush=True) + logger.warning( + "ROUNDCONTROL: Round config contains unkown task type.") else: round_meta = {} round_meta['status'] = "Failed" round_meta['reason'] = "Failed to meet client allocation requirements for this round config." - self.server.report_status( - "ROUNDCONTROL: {0}".format(round_meta['reason']), flush=True) + logger.warning( + "ROUNDCONTROL: {0}".format(round_meta['reason'])) self.round_configs.task_done() except queue.Empty: diff --git a/fedn/fedn/network/combiner/server.py b/fedn/fedn/network/combiner/server.py index 7a9c87ff9..19eba6412 100644 --- a/fedn/fedn/network/combiner/server.py +++ b/fedn/fedn/network/combiner/server.py @@ -12,6 +12,8 @@ import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc +from fedn.common.log_config import (logger, set_log_level_from_string, + set_log_stream) from fedn.common.net.grpc.server import Server from fedn.common.storage.s3.s3repo import S3ModelRepository from fedn.common.tracer.mongotracer import MongoTracer @@ -58,6 +60,9 @@ class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, def __init__(self, config): """ Initialize Combiner server.""" + set_log_level_from_string(config.get('verbosity', "INFO")) + set_log_stream(config.get('logfile', None)) + # Client queues self.clients = {} @@ -83,26 +88,23 @@ def __init__(self, config): secure=config['secure'], verify=config['verify']) - response = None while True: - # announce combiner to discover service + # Announce combiner to discover service status, response = announce_client.announce() if status == Status.TryAgain: - print(response, flush=True) + logger.info(response) time.sleep(5) - continue - if status == Status.Assigned: + elif status == Status.Assigned: announce_config = response - print( - "COMBINER {0}: Announced successfully".format(self.id), flush=True) + logger.info("COMBINER {0}: Announced successfully".format(self.id)) break - if status == Status.UnAuthorized: - print(response, flush=True) - print("Status.UnAuthorized", flush=True) + elif status == Status.UnAuthorized: + logger.info(response) + logger.info("Status.UnAuthorized") sys.exit("Exiting: Unauthorized") - if status == Status.UnMatchedConfig: - print(response, flush=True) - print("Status.UnMatchedConfig", flush=True) + elif status == Status.UnMatchedConfig: + logger.info(response) + logger.info("Status.UnMatchedConfig") sys.exit("Exiting: Missing config") cert = announce_config['certificate'] @@ -152,23 +154,6 @@ def __whoami(self, client, instance): client.role = role_to_proto_role(instance.role) return client - def report_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None, flush=True): - """ Report status of the combiner. 
- - :param msg: the message to report - :type msg: str - :param log_level: the log level to report at - :type log_level: :class:`fedn.common.net.grpc.fedn_pb2.Status` - :param type: the type of status to report - :type type: :class:`fedn.common.net.grpc.fedn_pb2.Status.Type` - :param request: the request to report status for - :type request: :class:`fedn.common.net.grpc.fedn_pb2.Request` - :param flush: whether to flush the message to stdout - :type flush: bool - """ - print("{}:COMBINER({}):{} {}".format(datetime.now().strftime( - '%Y-%m-%d %H:%M:%S'), self.id, log_level, msg), flush=flush) - def request_model_update(self, config, clients=[]): """ Ask clients to update the current global model. @@ -178,11 +163,10 @@ def request_model_update(self, config, clients=[]): :type clients: list """ - + # The request to be added to the client queue request = fedn.ModelUpdateRequest() - self.__whoami(request.sender, self) request.model_id = config['model_id'] - request.correlation_id = str(uuid.uuid4()) + request.correlation_id = str(uuid.uuid4()) # Obsolete? request.timestamp = str(datetime.now()) request.data = json.dumps(config) @@ -192,11 +176,14 @@ def request_model_update(self, config, clients=[]): for client in clients: request.receiver.name = client.name request.receiver.role = fedn.WORKER - _ = self.SendModelUpdateRequest(request, self) - # TODO: Check response + self._put_request_to_client_queue(request, fedn.Channel.MODEL_UPDATE_REQUESTS) - print("COMBINER: Sent model update request for model {} to clients {}".format( - request.model_id, clients), flush=True) + if len(clients) < 20: + logger.info("Sent model update request for model {} to clients {}".format( + request.model_id, clients)) + else: + logger.info("Sent model update request for model {} to {} clients".format( + request.model_id, len(clients))) def request_model_validation(self, model_id, config, clients=[]): """ Ask clients to validate the current global model. @@ -209,11 +196,10 @@ def request_model_validation(self, model_id, config, clients=[]): :type clients: list """ - + # The request to be added to the client queue request = fedn.ModelValidationRequest() - self.__whoami(request.sender, self) request.model_id = model_id - request.correlation_id = str(uuid.uuid4()) + request.correlation_id = str(uuid.uuid4()) # Obsolete? request.timestamp = str(datetime.now()) request.is_inference = (config['task'] == 'inference') @@ -223,24 +209,14 @@ def request_model_validation(self, model_id, config, clients=[]): for client in clients: request.receiver.name = client.name request.receiver.role = fedn.WORKER - self.SendModelValidationRequest(request, self) - - print("COMBINER: Sent validation request for model {} to clients {}".format( - model_id, clients), flush=True) - - def _list_clients(self, channel): - """ List active clients on a channel. 
+ self._put_request_to_client_queue(request, fedn.Channel.MODEL_VALIDATION_REQUESTS) - :param channel: the channel to list clients for, for example MODEL_UPDATE_REQUESTS - :type channel: :class:`fedn.common.net.grpc.fedn_pb2.Channel` - :return: the list of active clients - :rtype: list - """ - request = fedn.ListClientsRequest() - self.__whoami(request.sender, self) - request.channel = channel - clients = self.ListActiveClients(request, self) - return clients.client + if len(clients) < 20: + logger.info("Sent model validation request for model {} to clients {}".format( + request.model_id, clients)) + else: + logger.info("Sent model validation request for model {} to {} clients".format( + request.model_id, len(clients))) def get_active_trainers(self): """ Get a list of active trainers. @@ -248,7 +224,7 @@ def get_active_trainers(self): :return: the list of active trainers :rtype: list """ - trainers = self._list_clients(fedn.Channel.MODEL_UPDATE_REQUESTS) + trainers = self._list_active_clients(fedn.Channel.MODEL_UPDATE_REQUESTS) return trainers def get_active_validators(self): @@ -257,7 +233,7 @@ def get_active_validators(self): :return: the list of active validators :rtype: list """ - validators = self._list_clients(fedn.Channel.MODEL_VALIDATION_REQUESTS) + validators = self._list_active_clients(fedn.Channel.MODEL_VALIDATION_REQUESTS) return validators def nr_active_trainers(self): @@ -316,46 +292,55 @@ def __get_queue(self, client, queue_name): except KeyError: raise - def _send_request(self, request, queue_name): - """ Send a request to a client. + def _list_subscribed_clients(self, queue_name): + """ List all clients subscribed to a queue. - :param request: the request to send - :type request: :class:`fedn.common.net.grpc.fedn_pb2.Request` - :param queue_name: the name of the queue to send the request to + :param queue_name: the name of the queue :type queue_name: str + :return: a list of client names + :rtype: list """ - self.__route_request_to_client(request, request.receiver, queue_name) + subscribed_clients = [] + for name, client in self.clients.items(): + if queue_name in client.keys(): + subscribed_clients.append(name) + return subscribed_clients - def _broadcast_request(self, request, queue_name): - """ Publish a request to all subscribed members. + def _list_active_clients(self, channel): + """ List all clients that have sent a status message in the last 10 seconds. - :param request: the request to send - :type request: :class:`fedn.common.net.grpc.fedn_pb2.Request` - :param queue_name: the name of the queue to send the request to - :type queue_name: str + :param channel: the name of the channel + :type channel: str + :return: a list of client names + :rtype: list """ - active_clients = self._list_active_clients() - for client in active_clients: - self.clients[client.name][queue_name].put(request) + active_clients = [] + for client in self._list_subscribed_clients(channel): + # This can break with different timezones. + now = datetime.now() + then = self.clients[client]["lastseen"] + # TODO: move the heartbeat timeout to config. + if (now - then) < timedelta(seconds=10): + active_clients.append(client) + return active_clients - def __route_request_to_client(self, request, client, queue_name): - """ Route a request to a client. + def _put_request_to_client_queue(self, request, queue_name): + """ Get a client specific queue and add a request to it. + The client is identified by the request.receiver. 
:param request: the request to send :type request: :class:`fedn.common.net.grpc.fedn_pb2.Request` - :param client: the client to send the request to - :type client: :class:`fedn.common.net.grpc.fedn_pb2.Client` :param queue_name: the name of the queue to send the request to :type queue_name: str - - :raises Exception: if the request could not be routed, direct cause of KeyError in __get_queue """ try: - q = self.__get_queue(client, queue_name) + q = self.__get_queue(request.receiver, queue_name) q.put(request) - except Exception: - print("Failed to route request to client: {} {}", - request.receiver, queue_name) + except Exception as e: + logger.error("Failed to put request to client queue {} for client {}: {}".format( + queue_name, + request.receiver.name, + str(e))) raise def _send_status(self, status): @@ -367,17 +352,11 @@ def _send_status(self, status): self.tracer.report_status(status) - def __register_heartbeat(self, client): - """ Register a client if first time connecting. Update heartbeat timestamp. + def _flush_model_update_queue(self): + """Clear the model update queue (aggregator). - :param client: the client to register - :type client: :class:`fedn.common.net.grpc.fedn_pb2.Client` + :return: True if successful, else False """ - self.__join_client(client) - self.clients[client.name]["lastseen"] = datetime.now() - - def flush_model_update_queue(self): - """Clear the model update queue (aggregator). """ q = self.control.aggregator.model_updates try: @@ -386,7 +365,8 @@ def flush_model_update_queue(self): q.all_tasks_done.notify_all() q.unfinished_tasks = 0 return True - except Exception: + except Exception as e: + logger.error("Failed to flush model update queue: %s", str(e)) return False ##################################################################################################################### @@ -418,25 +398,6 @@ def Start(self, control: fedn.ControlRequest, context): return response - # RPCs related to remote configuration of the server, round controller, - # aggregator and their states. - - def Configure(self, control: fedn.ControlRequest, context): - """ Configure the Combiner. - - :param control: the control request - :type control: :class:`fedn.common.net.grpc.fedn_pb2.ControlRequest` - :param context: the context (unused) - :type context: :class:`grpc._server._Context` - :return: the control response - :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse` - """ - for parameter in control.parameter: - setattr(self, parameter.key, parameter.value) - - response = fedn.ControlResponse() - return response - def FlushAggregationQueue(self, control: fedn.ControlRequest, context): """ Flush the queue. @@ -448,7 +409,7 @@ def FlushAggregationQueue(self, control: fedn.ControlRequest, context): :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse` """ - status = self.flush_model_update_queue() + status = self._flush_model_update_queue() response = fedn.ControlResponse() if status: @@ -474,72 +435,10 @@ def Stop(self, control: fedn.ControlRequest, context): print("\n RECIEVED **STOP** from Controller\n", flush=True) return response - def Report(self, control: fedn.ControlRequest, context): - """ Describe current state of the Combiner. 
- - :param control: the control request - :type control: :class:`fedn.common.net.grpc.fedn_pb2.ControlRequest` - :param context: the context (unused) - :type context: :class:`grpc._server._Context` - :return: the control response - :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse` - """ - - response = fedn.ControlResponse() - self.report_status("\n RECIEVED **REPORT** from Controller\n", - log_level=fedn.Status.INFO) - - control_state = self.control.aggregator.get_state() - self.report_status("Aggregator state: {}".format(control_state), log_level=fedn.Status.INFO) - p = response.parameter.add() - for key, value in control_state.items(): - p.key = str(key) - p.value = str(value) - - active_trainers = self.get_active_trainers() - p = response.parameter.add() - p.key = "nr_active_trainers" - p.value = str(len(active_trainers)) - - active_validators = self.get_active_validators() - p = response.parameter.add() - p.key = "nr_active_validators" - p.value = str(len(active_validators)) - - active_trainers_ = self.get_active_trainers() - active_trainers = [] - for client in active_trainers_: - active_trainers.append(client) - p = response.parameter.add() - p.key = "active_trainers" - p.value = str(active_trainers) - - active_validators_ = self.get_active_validators() - active_validators = [] - for client in active_validators_: - active_validators.append(client) - p = response.parameter.add() - p.key = "active_validators" - p.value = str(active_validators) - - p = response.parameter.add() - p.key = "nr_active_clients" - p.value = str(len(active_trainers)+len(active_validators)) - - p = response.parameter.add() - p.key = "nr_unprocessed_compute_plans" - p.value = str(self.control.round_configs.qsize()) - - p = response.parameter.add() - p.key = "name" - p.value = str(self.id) - - return response - ##################################################################################################################### def SendStatus(self, status: fedn.Status, context): - """ A client stream RPC endpoint that accepts status messages. + """ A client RPC endpoint that accepts status messages. :param status: the status message :type status: :class:`fedn.common.net.grpc.fedn_pb2.Status` @@ -555,41 +454,6 @@ def SendStatus(self, status: fedn.Status, context): response.response = "Status received." return response - def _list_subscribed_clients(self, queue_name): - """ List all clients subscribed to a queue. - - :param queue_name: the name of the queue - :type queue_name: str - :return: a list of client names - :rtype: list - """ - subscribed_clients = [] - for name, client in self.clients.items(): - if queue_name in client.keys(): - subscribed_clients.append(name) - return subscribed_clients - - def _list_active_clients(self, channel): - """ List all clients that have sent a status message in the last 10 seconds. - - :param channel: the name of the channel - :type channel: str - :return: a list of client names - :rtype: list - """ - active_clients = [] - for client in self._list_subscribed_clients(channel): - # This can break with different timezones. - now = datetime.now() - then = self.clients[client]["lastseen"] - # TODO: move the heartbeat timeout to config. - if (now - then) < timedelta(seconds=10): - active_clients.append(client) - return active_clients - - def _drop_inactive_clients(self): - """ TODO: Not implemented. Clean up clients that have missed the heartbeat. 
""" - def ListActiveClients(self, request: fedn.ListClientsRequest, context): """ RPC endpoint that returns a ClientList containing the names of all active clients. An active client has sent a status message / responded to a heartbeat @@ -604,6 +468,8 @@ def ListActiveClients(self, request: fedn.ListClientsRequest, context): """ clients = fedn.ClientList() active_clients = self._list_active_clients(request.channel) + logger.info("Active clients: {}".format(active_clients)) + logger.info("All clients: {}".format(self.clients)) for client in active_clients: clients.client.append(fedn.Client(name=client, role=fedn.WORKER)) @@ -651,7 +517,14 @@ def SendHeartbeat(self, heartbeat: fedn.Heartbeat, context): :return: the response :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response` """ - self.__register_heartbeat(heartbeat.sender) + logger.debug("GRPC: Received heartbeat from {}".format(heartbeat.sender.name)) + # Update the clients dict with the last seen timestamp. + client = heartbeat.sender + self.__join_client(client) + self.clients[client.name]["lastseen"] = datetime.now() + # Send a status message to the tracer to update the client status. + self.tracer.update_client_status(client.name, "online") + response = fedn.Response() response.sender.name = heartbeat.sender.name response.sender.role = heartbeat.sender.role @@ -780,23 +653,6 @@ def ModelValidationRequestStream(self, response, context): except queue.Empty: pass - def SendModelUpdateRequest(self, request, context): - """ Send a model update request. - - :param request: the request - :type request: :class:`fedn.common.net.grpc.fedn_pb2.ModelUpdateRequest` - :param context: the context - :type context: :class:`grpc._server._Context` - :return: the response - :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response` - """ - self._send_request(request, fedn.Channel.MODEL_UPDATE_REQUESTS) - - response = fedn.Response() - response.response = "RECEIVED ModelUpdateRequest from client {}".format( - request.sender.name) - return response # TODO Fill later - def SendModelUpdate(self, request, context): """ Send a model update response. @@ -814,23 +670,6 @@ def SendModelUpdate(self, request, context): response, response.sender.name) return response # TODO Fill later - def SendModelValidationRequest(self, request, context): - """ Send a model validation request. - - :param request: the request - :type request: :class:`fedn.common.net.grpc.fedn_pb2.ModelValidationRequest` - :param context: the context - :type context: :class:`grpc._server._Context` - :return: the response - :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response` - """ - self._send_request(request, fedn.Channel.MODEL_VALIDATION_REQUESTS) - - response = fedn.Response() - response.response = "RECEIVED ModelValidationRequest from client {}".format( - request.sender.name) - return response # TODO Fill later - def register_model_validation(self, validation): """Register a model validation. 
@@ -850,8 +689,7 @@ def SendModelValidation(self, request, context): :return: the response :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response` """ - self.report_status("Recieved ModelValidation from {}".format(request.sender.name), - log_level=fedn.Status.INFO) + logger.info("Received ModelValidation from {}".format(request.sender.name)) self.register_model_validation(request) diff --git a/fedn/fedn/network/combiner/server_tests.py b/fedn/fedn/network/combiner/server_tests.py new file mode 100644 index 000000000..e69de29bb diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index fab6a2027..71f660e24 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -301,35 +301,27 @@ def get_participating_combiners(self, combiner_round_config): combiners = [] for combiner in self.network.get_combiners(): try: - combiner_state = combiner.report() + # Current gRPC endpoint only returns active clients (both trainers and validators) + nr_active_clients = len(combiner.list_active_clients()) except CombinerUnavailableError: self._handle_unavailable_combiner(combiner) - combiner_state = None - if combiner_state is not None: + nr_active_clients = None + if nr_active_clients is not None: is_participating = self.evaluate_round_participation_policy( - combiner_round_config, combiner_state + combiner_round_config, nr_active_clients ) if is_participating: combiners.append((combiner, combiner_round_config)) return combiners def evaluate_round_participation_policy( - self, compute_plan, combiner_state + self, compute_plan, nr_active_clients ): """Evaluate policy for combiner round-participation. A combiner participates if it is responsive and reports enough active clients to participate in the round. """ - - if compute_plan["task"] == "training": - nr_active_clients = int(combiner_state["nr_active_trainers"]) - elif compute_plan["task"] == "validation": - nr_active_clients = int(combiner_state["nr_active_validators"]) - else: - print("Invalid task type!", flush=True) - return False - if int(compute_plan["clients_required"]) <= nr_active_clients: return True else: diff --git a/fedn/fedn/network/loadbalancer/leastpacked.py b/fedn/fedn/network/loadbalancer/leastpacked.py index 9e4aaba0d..c2fe9addb 100644 --- a/fedn/fedn/network/loadbalancer/leastpacked.py +++ b/fedn/fedn/network/loadbalancer/leastpacked.py @@ -23,12 +23,13 @@ def find_combiner(self): for combiner in self.network.get_combiners(): try: if combiner.allowing_clients(): - combiner_state = combiner.report() + # Using default Channel = 1 (MODEL_UPDATE_REQUESTS) + nr_active_clients = len(combiner.list_active_clients()) if not min_clients: - min_clients = combiner_state['nr_active_clients'] + min_clients = nr_active_clients selected_combiner = combiner - elif combiner_state['nr_active_clients'] < min_clients: - min_clients = combiner_state['nr_active_clients'] + elif nr_active_clients < min_clients: + min_clients = nr_active_clients selected_combiner = combiner except CombinerUnavailableError: pass
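For reference: after this change, callers count clients with CombinerInterface.list_active_clients(), which returns the repeated client field of the gRPC ClientList response (default queue=1, the MODEL_UPDATE_REQUESTS channel) rather than a JSON string. Below is a minimal usage sketch mirroring start_session() and LeastPacked.find_combiner(); the network variable and the surrounding loop are illustrative only, and CombinerUnavailableError is assumed to be importable from fedn.network.combiner.interfaces.

    from fedn.network.combiner.interfaces import CombinerUnavailableError

    # Illustrative: count active clients across all combiners in the network.
    # `network` is assumed to be a fedn.network.api.network.Network instance.
    clients_available = 0
    for combiner in network.get_combiners():
        try:
            # Default queue=1 selects the MODEL_UPDATE_REQUESTS channel.
            active_clients = combiner.list_active_clients()
            clients_available += len(active_clients)
        except CombinerUnavailableError:
            # Unresponsive combiners are skipped, as in start_session().
            continue

    print("Active clients across all combiners:", clients_available)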