Skip to content

Commit

Permalink
review fix: propagate healthcheck message after validation
Browse files Browse the repository at this point in the history
  • Loading branch information
shamardy committed Nov 13, 2024
1 parent 515f6ae commit b81bfc6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
27 changes: 13 additions & 14 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ use instant::{Duration, Instant};
use lazy_static::lazy_static;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmError;
use mm2_err_handle::prelude::*;
use mm2_libp2p::p2p_ctx::P2PContext;
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix};
use ser_error_derive::SerializeErrorType;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::sync::Mutex;

use crate::lp_network::broadcast_p2p_msg;
use crate::lp_network::{broadcast_p2p_msg, P2PRequestError, P2PRequestResult};

pub(crate) const PEER_HEALTHCHECK_PREFIX: TopicPrefix = "hcheck";

Expand Down Expand Up @@ -279,7 +280,10 @@ pub async fn peer_connection_healthcheck_rpc(
Ok(rx.timeout(timeout_duration).await == Ok(Ok(())))
}

pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_libp2p::GossipsubMessage) {
pub(crate) async fn process_p2p_healthcheck_message(
ctx: &MmArc,
message: mm2_libp2p::GossipsubMessage,
) -> P2PRequestResult<()> {
macro_rules! try_or_return {
($exp:expr, $msg: expr) => {
match $exp {
Expand All @@ -292,24 +296,17 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
};
}

let data = try_or_return!(
HealthcheckMessage::decode(&message.data),
"Couldn't decode healthcheck message"
);
let data = HealthcheckMessage::decode(&message.data)
.map_to_mm(|e| P2PRequestError::DecodeError(format!("Couldn't decode healthcheck message: {}", e)))?;
let sender_peer = data.is_received_message_valid().map_to_mm(|e| {
P2PRequestError::ValidationFailed(format!("Received an invalid healthcheck message. Error: {}", e))
})?;

let ctx = ctx.clone();

// Pass the remaining work to another thread to free up this one as soon as possible,
// so KDF can handle a high amount of healthcheck messages more efficiently.
ctx.spawner().spawn(async move {
let sender_peer = match data.is_received_message_valid() {
Ok(t) => t,
Err(e) => {
log::error!("Received an invalid healthcheck message. Error: {e}");
return;
},
};

if data.should_reply() {
// Reply the message so they know we are healthy.

Expand Down Expand Up @@ -337,6 +334,8 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
};
}
});

Ok(())
}

#[cfg(any(test, target_arch = "wasm32"))]
Expand Down
8 changes: 7 additions & 1 deletion mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub enum P2PRequestError {
ResponseError(String),
#[display(fmt = "Expected 1 response, found {}", _0)]
ExpectedSingleResponseError(usize),
ValidationFailed(String),
}

/// Enum covering error cases that can happen during P2P message processing.
Expand Down Expand Up @@ -215,7 +216,12 @@ async fn process_p2p_message(
}
},
Some(lp_healthcheck::PEER_HEALTHCHECK_PREFIX) => {
lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await
if let Err(e) = lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await {
log::error!("{}", e);
return;
}

to_propagate = true;
},
None | Some(_) => (),
}
Expand Down

0 comments on commit b81bfc6

Please sign in to comment.