Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/SK-1241 | Fix missing ROLE and TYPE in client status messages #766

Merged
merged 6 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def _listen_to_task_stream(self):
# Process training request
self.send_status(
"Received model update request.",
log_level=fedn.Status.AUDIT,
log_level=fedn.LogLevel.AUDIT,
type=fedn.StatusType.MODEL_UPDATE_REQUEST,
request=request,
sesssion_id=request.session_id,
Expand Down Expand Up @@ -641,7 +641,7 @@ def process_request(self):
_ = self.combinerStub.SendModelUpdate(update, metadata=self.metadata)
self.send_status(
"Model update completed.",
log_level=fedn.Status.AUDIT,
log_level=fedn.LogLevel.AUDIT,
type=fedn.StatusType.MODEL_UPDATE,
request=update,
sesssion_id=request.session_id,
Expand All @@ -655,7 +655,7 @@ def process_request(self):
logger.debug(e)
else:
self.send_status(
"Client {} failed to complete model update.", log_level=fedn.Status.WARNING, request=request, sesssion_id=request.session_id
"Client {} failed to complete model update.", log_level=fedn.LogLevel.WARNING, request=request, sesssion_id=request.session_id
)

self.state = ClientState.idle
Expand Down Expand Up @@ -683,7 +683,11 @@ def process_request(self):

status_type = fedn.StatusType.MODEL_VALIDATION
self.send_status(
"Model validation completed.", log_level=fedn.Status.AUDIT, type=status_type, request=validation, sesssion_id=request.session_id
"Model validation completed.",
log_level=fedn.LogLevel.AUDIT,
type=status_type,
request=validation,
sesssion_id=request.session_id,
)
except grpc.RpcError as e:
status_code = e.code()
Expand All @@ -695,7 +699,7 @@ def process_request(self):
else:
self.send_status(
"Client {} failed to complete model validation.".format(self.name),
log_level=fedn.Status.WARNING,
log_level=fedn.LogLevel.WARNING,
request=request,
sesssion_id=request.session_id,
)
Expand Down Expand Up @@ -736,7 +740,7 @@ def process_request(self):
_ = self.combinerStub.SendModelPrediction(prediction, metadata=self.metadata)
status_type = fedn.StatusType.MODEL_PREDICTION
self.send_status(
"Model prediction completed.", log_level=fedn.Status.AUDIT, type=status_type, request=prediction, sesssion_id=request.session_id
"Model prediction completed.", log_level=fedn.LogLevel.AUDIT, type=status_type, request=prediction, sesssion_id=request.session_id
)
except grpc.RpcError as e:
status_code = e.code()
Expand Down Expand Up @@ -789,13 +793,13 @@ def _send_heartbeat(self, update_frequency=2.0):
logger.info("SendStatus: Client disconnected.")
return

def send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None):
def send_status(self, msg, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None):
"""Send status message.

:param msg: The message to send.
:type msg: str
:param log_level: The log level of the message.
:type log_level: fedn.Status.INFO, fedn.Status.WARNING, fedn.Status.ERROR
:type log_level: fedn.LogLevel.INFO, fedn.LogLevel.WARNING, fedn.LogLevel.ERROR
:param type: The type of the message.
:type type: str
:param request: The request message.
Expand Down
24 changes: 18 additions & 6 deletions fedn/network/clients/fedn_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,13 @@ def update_local_model(self, request):
logger.error("No train callback set")
return

self.send_status(f"\t Starting processing of training request for model_id {model_id}", sesssion_id=request.session_id, sender_name=self.name)
self.send_status(
f"\t Starting processing of training request for model_id {model_id}",
sesssion_id=request.session_id,
sender_name=self.name,
log_level=fedn.LogLevel.INFO,
type=fedn.StatusType.MODEL_UPDATE,
)

logger.info(f"Running train callback with model ID: {model_id}")
client_settings = json.loads(request.data).get("client_settings", {})
Expand All @@ -241,7 +247,7 @@ def update_local_model(self, request):

self.send_status(
"Model update completed.",
log_level=fedn.Status.AUDIT,
log_level=fedn.LogLevel.AUDIT,
type=fedn.StatusType.MODEL_UPDATE,
request=update,
sesssion_id=request.session_id,
Expand All @@ -251,7 +257,13 @@ def update_local_model(self, request):
def validate_global_model(self, request):
model_id = request.model_id

self.send_status(f"Processing validate request for model_id {model_id}", sesssion_id=request.session_id, sender_name=self.name)
self.send_status(
f"Processing validate request for model_id {model_id}",
sesssion_id=request.session_id,
sender_name=self.name,
log_level=fedn.LogLevel.INFO,
type=fedn.StatusType.MODEL_VALIDATION,
)

in_model = self.get_model_from_combiner(id=model_id, client_id=self.client_id)

Expand All @@ -275,7 +287,7 @@ def validate_global_model(self, request):
if result:
self.send_status(
"Model validation completed.",
log_level=fedn.Status.AUDIT,
log_level=fedn.LogLevel.AUDIT,
type=fedn.StatusType.MODEL_VALIDATION,
request=validation,
sesssion_id=request.session_id,
Expand All @@ -284,7 +296,7 @@ def validate_global_model(self, request):
else:
self.send_status(
"Client {} failed to complete model validation.".format(self.name),
log_level=fedn.Status.WARNING,
log_level=fedn.LogLevel.WARNING,
request=request,
sesssion_id=request.session_id,
sender_name=self.name,
Expand Down Expand Up @@ -362,7 +374,7 @@ def get_model_from_combiner(self, id: str, client_id: str, timeout: int = 20) ->
def send_model_to_combiner(self, model: BytesIO, id: str):
return self.grpc_handler.send_model_to_combiner(model, id)

def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None):
def send_status(self, msg: str, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None):
return self.grpc_handler.send_status(msg, log_level, type, request, sesssion_id, sender_name)

def send_model_update(self, update: fedn.ModelUpdate) -> bool:
Expand Down
12 changes: 6 additions & 6 deletions fedn/network/clients/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ def listen_to_task_stream(self, client_name: str, client_id: str, callback: Call
for request in self.combinerStub.TaskStream(r, metadata=self.metadata):
if request.sender.role == fedn.COMBINER:
self.send_status(
"Received model update request.",
log_level=fedn.Status.AUDIT,
type=fedn.StatusType.MODEL_UPDATE_REQUEST,
"Received request from combiner.",
log_level=fedn.LogLevel.AUDIT,
type=request.type,
request=request,
sesssion_id=request.session_id,
sender_name=client_name,
Expand All @@ -183,13 +183,13 @@ def listen_to_task_stream(self, client_name: str, client_id: str, callback: Call
logger.error(f"GRPC (TaskStream): An error occurred: {e}")
self._handle_unknown_error(e, "TaskStream", lambda: self.listen_to_task_stream(client_name, client_id, callback))

def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None):
def send_status(self, msg: str, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None):
"""Send status message.

:param msg: The message to send.
:type msg: str
:param log_level: The log level of the message.
:type log_level: fedn.Status.INFO, fedn.Status.WARNING, fedn.Status.ERROR
:type log_level: fedn.LogLevel.INFO, fedn.LogLevel.WARNING, fedn.LogLevel.ERROR
:param type: The type of the message.
:type type: str
:param request: The request message.
Expand All @@ -198,7 +198,7 @@ def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=N
status = fedn.Status()
status.timestamp.GetCurrentTime()
status.sender.name = sender_name
status.sender.role = fedn.WORKER
status.sender.role = fedn.Role.WORKER
status.log_level = log_level
status.status = str(msg)
status.session_id = sesssion_id
Expand Down
6 changes: 3 additions & 3 deletions fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,9 +668,8 @@ def TaskStream(self, response, context):
metadata = dict(metadata)
logger.info("grpc.Combiner.TaskStream: Client connected: {}\n".format(metadata["client"]))

status = fedn.Status(status="Client {} connecting to TaskStream.".format(client.name))
status = fedn.Status(status="Client {} connecting to TaskStream.".format(client.name), log_level=fedn.LogLevel.INFO, type=fedn.StatusType.NETWORK)
logger.info("Client {} connecting to TaskStream.".format(client.name))
status.log_level = fedn.Status.INFO
status.timestamp.GetCurrentTime()

self.__whoami(status.sender, self)
Expand Down Expand Up @@ -724,7 +723,8 @@ def TaskStream(self, response, context):
logger.error("Error in ModelUpdateRequestStream: {}".format(e))
logger.warning("Client {} disconnected from TaskStream".format(client.name))
status = fedn.Status(status="Client {} disconnected from TaskStream.".format(client.name))
status.log_level = fedn.Status.INFO
status.log_level = fedn.LogLevel.INFO
status.type = fedn.StatusType.NETWORK
status.timestamp.GetCurrentTime()
self.__whoami(status.sender, self)
self._send_status(status)
Expand Down
19 changes: 10 additions & 9 deletions fedn/network/grpc/fedn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ enum StatusType {
MODEL_VALIDATION_REQUEST = 3;
MODEL_VALIDATION = 4;
MODEL_PREDICTION = 5;
NETWORK = 6;
}

enum LogLevel {
NONE = 0;
INFO = 1;
DEBUG = 2;
WARNING = 3;
ERROR = 4;
AUDIT = 5;
}

message Status {
Client sender = 1;
string status = 2;

enum LogLevel {
INFO = 0;
DEBUG = 1;
WARNING = 2;
ERROR = 3;
AUDIT = 4;
}

LogLevel log_level = 3;
string data = 4;
string correlation_id = 5;
Expand Down
Loading
Loading