Skip to content

Commit

Permalink
Bugfix/SK-1241 | Fix missing ROLE and TYPE in client status messages (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Wrede authored Dec 4, 2024
1 parent 1940e14 commit b5f0a02
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 126 deletions.
20 changes: 12 additions & 8 deletions fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def _listen_to_task_stream(self):
# Process training request
self.send_status(
"Received model update request.",
log_level=fedn.Status.AUDIT,
log_level=fedn.LogLevel.AUDIT,
type=fedn.StatusType.MODEL_UPDATE_REQUEST,
request=request,
sesssion_id=request.session_id,
Expand Down Expand Up @@ -641,7 +641,7 @@ def process_request(self):
_ = self.combinerStub.SendModelUpdate(update, metadata=self.metadata)
self.send_status(
"Model update completed.",
log_level=fedn.Status.AUDIT,
log_level=fedn.LogLevel.AUDIT,
type=fedn.StatusType.MODEL_UPDATE,
request=update,
sesssion_id=request.session_id,
Expand All @@ -655,7 +655,7 @@ def process_request(self):
logger.debug(e)
else:
self.send_status(
"Client {} failed to complete model update.", log_level=fedn.Status.WARNING, request=request, sesssion_id=request.session_id
"Client {} failed to complete model update.", log_level=fedn.LogLevel.WARNING, request=request, sesssion_id=request.session_id
)

self.state = ClientState.idle
Expand Down Expand Up @@ -683,7 +683,11 @@ def process_request(self):

status_type = fedn.StatusType.MODEL_VALIDATION
self.send_status(
"Model validation completed.", log_level=fedn.Status.AUDIT, type=status_type, request=validation, sesssion_id=request.session_id
"Model validation completed.",
log_level=fedn.LogLevel.AUDIT,
type=status_type,
request=validation,
sesssion_id=request.session_id,
)
except grpc.RpcError as e:
status_code = e.code()
Expand All @@ -695,7 +699,7 @@ def process_request(self):
else:
self.send_status(
"Client {} failed to complete model validation.".format(self.name),
log_level=fedn.Status.WARNING,
log_level=fedn.LogLevel.WARNING,
request=request,
sesssion_id=request.session_id,
)
Expand Down Expand Up @@ -736,7 +740,7 @@ def process_request(self):
_ = self.combinerStub.SendModelPrediction(prediction, metadata=self.metadata)
status_type = fedn.StatusType.MODEL_PREDICTION
self.send_status(
"Model prediction completed.", log_level=fedn.Status.AUDIT, type=status_type, request=prediction, sesssion_id=request.session_id
"Model prediction completed.", log_level=fedn.LogLevel.AUDIT, type=status_type, request=prediction, sesssion_id=request.session_id
)
except grpc.RpcError as e:
status_code = e.code()
Expand Down Expand Up @@ -789,13 +793,13 @@ def _send_heartbeat(self, update_frequency=2.0):
logger.info("SendStatus: Client disconnected.")
return

def send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None):
def send_status(self, msg, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None):
"""Send status message.
:param msg: The message to send.
:type msg: str
:param log_level: The log level of the message.
:type log_level: fedn.Status.INFO, fedn.Status.WARNING, fedn.Status.ERROR
:type log_level: fedn.LogLevel.INFO, fedn.LogLevel.WARNING, fedn.LogLevel.ERROR
:param type: The type of the message.
:type type: str
:param request: The request message.
Expand Down
24 changes: 18 additions & 6 deletions fedn/network/clients/fedn_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,13 @@ def update_local_model(self, request):
logger.error("No train callback set")
return

self.send_status(f"\t Starting processing of training request for model_id {model_id}", sesssion_id=request.session_id, sender_name=self.name)
self.send_status(
f"\t Starting processing of training request for model_id {model_id}",
sesssion_id=request.session_id,
sender_name=self.name,
log_level=fedn.LogLevel.INFO,
type=fedn.StatusType.MODEL_UPDATE,
)

logger.info(f"Running train callback with model ID: {model_id}")
client_settings = json.loads(request.data).get("client_settings", {})
Expand All @@ -241,7 +247,7 @@ def update_local_model(self, request):

self.send_status(
"Model update completed.",
log_level=fedn.Status.AUDIT,
log_level=fedn.LogLevel.AUDIT,
type=fedn.StatusType.MODEL_UPDATE,
request=update,
sesssion_id=request.session_id,
Expand All @@ -251,7 +257,13 @@ def update_local_model(self, request):
def validate_global_model(self, request):
model_id = request.model_id

self.send_status(f"Processing validate request for model_id {model_id}", sesssion_id=request.session_id, sender_name=self.name)
self.send_status(
f"Processing validate request for model_id {model_id}",
sesssion_id=request.session_id,
sender_name=self.name,
log_level=fedn.LogLevel.INFO,
type=fedn.StatusType.MODEL_VALIDATION,
)

in_model = self.get_model_from_combiner(id=model_id, client_id=self.client_id)

Expand All @@ -275,7 +287,7 @@ def validate_global_model(self, request):
if result:
self.send_status(
"Model validation completed.",
log_level=fedn.Status.AUDIT,
log_level=fedn.LogLevel.AUDIT,
type=fedn.StatusType.MODEL_VALIDATION,
request=validation,
sesssion_id=request.session_id,
Expand All @@ -284,7 +296,7 @@ def validate_global_model(self, request):
else:
self.send_status(
"Client {} failed to complete model validation.".format(self.name),
log_level=fedn.Status.WARNING,
log_level=fedn.LogLevel.WARNING,
request=request,
sesssion_id=request.session_id,
sender_name=self.name,
Expand Down Expand Up @@ -362,7 +374,7 @@ def get_model_from_combiner(self, id: str, client_id: str, timeout: int = 20) ->
def send_model_to_combiner(self, model: BytesIO, id: str):
return self.grpc_handler.send_model_to_combiner(model, id)

def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None):
def send_status(self, msg: str, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None):
return self.grpc_handler.send_status(msg, log_level, type, request, sesssion_id, sender_name)

def send_model_update(self, update: fedn.ModelUpdate) -> bool:
Expand Down
12 changes: 6 additions & 6 deletions fedn/network/clients/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ def listen_to_task_stream(self, client_name: str, client_id: str, callback: Call
for request in self.combinerStub.TaskStream(r, metadata=self.metadata):
if request.sender.role == fedn.COMBINER:
self.send_status(
"Received model update request.",
log_level=fedn.Status.AUDIT,
type=fedn.StatusType.MODEL_UPDATE_REQUEST,
"Received request from combiner.",
log_level=fedn.LogLevel.AUDIT,
type=request.type,
request=request,
sesssion_id=request.session_id,
sender_name=client_name,
Expand All @@ -183,13 +183,13 @@ def listen_to_task_stream(self, client_name: str, client_id: str, callback: Call
logger.error(f"GRPC (TaskStream): An error occurred: {e}")
self._handle_unknown_error(e, "TaskStream", lambda: self.listen_to_task_stream(client_name, client_id, callback))

def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None):
def send_status(self, msg: str, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None):
"""Send status message.
:param msg: The message to send.
:type msg: str
:param log_level: The log level of the message.
:type log_level: fedn.Status.INFO, fedn.Status.WARNING, fedn.Status.ERROR
:type log_level: fedn.LogLevel.INFO, fedn.LogLevel.WARNING, fedn.LogLevel.ERROR
:param type: The type of the message.
:type type: str
:param request: The request message.
Expand All @@ -198,7 +198,7 @@ def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=N
status = fedn.Status()
status.timestamp.GetCurrentTime()
status.sender.name = sender_name
status.sender.role = fedn.WORKER
status.sender.role = fedn.Role.WORKER
status.log_level = log_level
status.status = str(msg)
status.session_id = sesssion_id
Expand Down
6 changes: 3 additions & 3 deletions fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,9 +668,8 @@ def TaskStream(self, response, context):
metadata = dict(metadata)
logger.info("grpc.Combiner.TaskStream: Client connected: {}\n".format(metadata["client"]))

status = fedn.Status(status="Client {} connecting to TaskStream.".format(client.name))
status = fedn.Status(status="Client {} connecting to TaskStream.".format(client.name), log_level=fedn.LogLevel.INFO, type=fedn.StatusType.NETWORK)
logger.info("Client {} connecting to TaskStream.".format(client.name))
status.log_level = fedn.Status.INFO
status.timestamp.GetCurrentTime()

self.__whoami(status.sender, self)
Expand Down Expand Up @@ -724,7 +723,8 @@ def TaskStream(self, response, context):
logger.error("Error in ModelUpdateRequestStream: {}".format(e))
logger.warning("Client {} disconnected from TaskStream".format(client.name))
status = fedn.Status(status="Client {} disconnected from TaskStream.".format(client.name))
status.log_level = fedn.Status.INFO
status.log_level = fedn.LogLevel.INFO
status.type = fedn.StatusType.NETWORK
status.timestamp.GetCurrentTime()
self.__whoami(status.sender, self)
self._send_status(status)
Expand Down
19 changes: 10 additions & 9 deletions fedn/network/grpc/fedn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ enum StatusType {
MODEL_VALIDATION_REQUEST = 3;
MODEL_VALIDATION = 4;
MODEL_PREDICTION = 5;
NETWORK = 6;
}

enum LogLevel {
NONE = 0;
INFO = 1;
DEBUG = 2;
WARNING = 3;
ERROR = 4;
AUDIT = 5;
}

message Status {
Client sender = 1;
string status = 2;

enum LogLevel {
INFO = 0;
DEBUG = 1;
WARNING = 2;
ERROR = 3;
AUDIT = 4;
}

LogLevel log_level = 3;
string data = 4;
string correlation_id = 5;
Expand Down
Loading

0 comments on commit b5f0a02

Please sign in to comment.