Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: adding more logs
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Apr 1, 2024
1 parent b1a3714 commit d501785
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 18 deletions.
23 changes: 16 additions & 7 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@ impl Behaviour {
) -> Result<MessageId, P2PError> {
match topic {
TOPOS_GOSSIP | TOPOS_ECHO | TOPOS_READY => {
let msg_id = self.gossipsub.publish(IdentTopic::new(topic), message)?;
let topic = IdentTopic::new(topic);
let topic_hash = topic.hash();
let msg_id = self.gossipsub.publish(topic, message)?;
trace!("Published on topos_gossip: {:?}", msg_id);

for p in self.gossipsub.mesh_peers(&topic_hash) {
debug!("Sent gossipsub message({}) to {} peer", msg_id, p);
}

Ok(msg_id)
}
_ => Err(P2PError::InvalidGossipTopic(topic)),
Expand Down Expand Up @@ -175,32 +181,35 @@ impl NetworkBehaviour for Behaviour {
} => match topic.as_str() {
TOPOS_GOSSIP => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
Box::new(crate::event::GossipEvent::Message {
propagated_by: propagation_source,
topic: TOPOS_GOSSIP,
message: data,
source,
id: message_id,
},
}),
)))
}
TOPOS_ECHO => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
Box::new(crate::event::GossipEvent::Message {
propagated_by: propagation_source,
topic: TOPOS_ECHO,
message: data,
source,
id: message_id,
},
}),
)))
}
TOPOS_READY => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
Box::new(crate::event::GossipEvent::Message {
propagated_by: propagation_source,
topic: TOPOS_READY,
message: data,
source,
id: message_id,
},
}),
)))
}
_ => {}
Expand Down
5 changes: 4 additions & 1 deletion crates/topos-p2p/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::behaviour::{grpc, HealthStatus};
pub enum GossipEvent {
/// A message has been received from a peer on one of the subscribed topics
Message {
propagated_by: PeerId,
source: Option<PeerId>,
topic: &'static str,
message: Vec<u8>,
Expand All @@ -18,7 +19,7 @@ pub enum GossipEvent {
pub enum ComposedEvent {
Kademlia(Box<kad::Event>),
PeerInfo(Box<identify::Event>),
Gossipsub(GossipEvent),
Gossipsub(Box<GossipEvent>),
Grpc(grpc::Event),
Void,
}
Expand Down Expand Up @@ -49,9 +50,11 @@ impl From<void::Void> for ComposedEvent {

/// Represents the events that the p2p layer can emit
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
/// An event emitted when a gossip message is received
Gossip {
propagated_by: PeerId,
from: PeerId,
data: Vec<u8>,
id: String,
Expand Down
17 changes: 11 additions & 6 deletions crates/topos-p2p/src/runtime/handle_event/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,46 @@ use crate::{constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO, TOPOS_GOS
use super::{EventHandler, EventResult};

#[async_trait::async_trait]
impl EventHandler<GossipEvent> for Runtime {
async fn handle(&mut self, event: GossipEvent) -> EventResult {
impl EventHandler<Box<GossipEvent>> for Runtime {
async fn handle(&mut self, event: Box<GossipEvent>) -> EventResult {
if let GossipEvent::Message {
propagated_by,
source: Some(source),
message,
topic,
id,
} = event
} = *event
{
if self.event_sender.capacity() < *constants::CAPACITY_EVENT_STREAM_BUFFER {
P2P_EVENT_STREAM_CAPACITY_TOTAL.inc();
}

debug!("Received message from {:?} on topic {:?}", source, topic);
debug!(
"Received message({id}) from source {:?} on topic {:?} propagated by {propagated_by}",
source, topic
);

match topic {
TOPOS_GOSSIP => P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL.inc(),
TOPOS_ECHO => P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc(),
TOPOS_READY => P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc(),
_ => {
error!("Received message on unknown topic {:?}", topic);
error!("Received message({id}) on unknown topic {:?}", topic);
return Ok(());
}
}

if let Err(e) = self
.event_sender
.send(Event::Gossip {
propagated_by,
from: source,
data: message,
id: id.to_string(),
})
.await
{
error!("Failed to send gossip event to runtime: {:?}", e);
error!("Failed to send gossip event({id}) to runtime: {:?}", e);
}
}

Expand Down
12 changes: 8 additions & 4 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ impl AppContext {
&evt
);

if let NetEvent::Gossip { data, from, id } = evt {
if let NetEvent::Gossip {
data,
propagated_by: received_from,
from,
id,
} = evt
{
if let Ok(DoubleEchoRequest {
request: Some(double_echo_request),
}) = DoubleEchoRequest::decode(&data[..])
Expand All @@ -39,10 +45,8 @@ impl AppContext {
}
info!(
message_id = id,
"Received certificate {} from GossipSub message({}) from {}",
"Received certificate {} from GossipSub message({id}) from {from} | propagated by {received_from}",
cert.id,
id,
from
);

match self.validator_store.insert_pending_certificate(&cert).await {
Expand Down
4 changes: 4 additions & 0 deletions crates/topos-tce/src/tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ async fn handle_gossip(
};
context
.on_net_event(topos_p2p::Event::Gossip {
propagated_by: PeerId::random(),
from: PeerId::random(),
data: msg.encode_to_vec(),
id: "0".to_string(),
Expand Down Expand Up @@ -67,6 +68,7 @@ async fn handle_echo(
};
context
.on_net_event(topos_p2p::Event::Gossip {
propagated_by: PeerId::random(),
from: PeerId::random(),
data: msg.encode_to_vec(),
id: "0".to_string(),
Expand Down Expand Up @@ -97,6 +99,7 @@ async fn handle_ready(
};
context
.on_net_event(topos_p2p::Event::Gossip {
propagated_by: PeerId::random(),
from: PeerId::random(),
data: msg.encode_to_vec(),
id: "0".to_string(),
Expand Down Expand Up @@ -131,6 +134,7 @@ async fn handle_already_delivered(

context
.on_net_event(topos_p2p::Event::Gossip {
propagated_by: PeerId::random(),
from: PeerId::random(),
data: msg.encode_to_vec(),
id: "0".to_string(),
Expand Down

0 comments on commit d501785

Please sign in to comment.