diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 5e18f284fc4..d5054bea945 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,4 +1,6 @@ ## 0.48.0 +- Add configurable `idontwant_message_size_threshold` parameter. + See [PR 5770](https://github.com/libp2p/rust-libp2p/pull/5770) - Introduce Gossipsub v1.2 [spec](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md). See [PR 5697](https://github.com/libp2p/rust-libp2p/pull/5697) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 4643f2bd97f..4de314d7423 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1739,8 +1739,10 @@ where // Calculate the message id on the transformed data. let msg_id = self.config.message_id(&message); - // Broadcast IDONTWANT messages. - self.send_idontwant(&raw_message, &msg_id, propagation_source); + // Broadcast IDONTWANT messages + if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() { + self.send_idontwant(&raw_message, &msg_id, propagation_source); + } // Check the validity of the message // Peers get penalized if this message is invalid. We don't add it to the duplicate cache @@ -1757,6 +1759,7 @@ where self.mcache.observe_duplicate(&msg_id, propagation_source); return; } + tracing::debug!( message=%msg_id, "Put message in duplicate_cache and resolve promises" diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index f3d24897b0c..bed74ecdce7 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -5288,7 +5288,7 @@ fn sends_idontwant() { let message = RawMessage { source: Some(peers[1]), - data: vec![12], + data: vec![12u8; 1024], sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, @@ -5314,6 +5314,48 @@ fn sends_idontwant() { ); } +#[test] +fn doesnt_sends_idontwant_for_lower_message_size() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + .peer_no(5) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2) + .create_network(); + + let local_id = PeerId::random(); + + let message = RawMessage { + source: Some(peers[1]), + data: vec![12], + sequence_number: Some(0), + topic: topic_hashes[0].clone(), + signature: None, + key: None, + validated: true, + }; + + gs.handle_received_message(message.clone(), &local_id); + assert_eq!( + receivers + .into_iter() + .fold(0, |mut idontwants, (peer_id, c)| { + let non_priority = c.non_priority.get_ref(); + while !non_priority.is_empty() { + if let Ok(RpcOut::IDontWant(_)) = non_priority.try_recv() { + assert_ne!(peer_id, peers[1]); + idontwants += 1; + } + } + idontwants + }), + 0, + "IDONTWANT was sent" + ); +} + /// Test that a node doesn't send IDONTWANT messages to the mesh peers /// that don't run Gossipsub v1.2. #[test] diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index d53908ad267..7bd6c9c583d 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -98,6 +98,7 @@ pub struct Config { connection_handler_queue_len: usize, connection_handler_publish_duration: Duration, connection_handler_forward_duration: Duration, + idontwant_message_size_threshold: usize, } impl Config { @@ -371,6 +372,16 @@ impl Config { pub fn forward_queue_duration(&self) -> Duration { self.connection_handler_forward_duration } + + // The message size threshold for which IDONTWANT messages are sent. + // Sending IDONTWANT messages for small messages can have a negative effect to the overall + // traffic and CPU load. This acts as a lower bound cutoff for the message size to which + // IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2 + // (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message) + // default is 1kB + pub fn idontwant_message_size_threshold(&self) -> usize { + self.idontwant_message_size_threshold + } } impl Default for Config { @@ -443,6 +454,7 @@ impl Default for ConfigBuilder { connection_handler_queue_len: 5000, connection_handler_publish_duration: Duration::from_secs(5), connection_handler_forward_duration: Duration::from_secs(1), + idontwant_message_size_threshold: 1000, }, invalid_protocol: false, } @@ -829,6 +841,17 @@ impl ConfigBuilder { self } + // The message size threshold for which IDONTWANT messages are sent. + // Sending IDONTWANT messages for small messages can have a negative effect to the overall + // traffic and CPU load. This acts as a lower bound cutoff for the message size to which + // IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2 + // (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message) + // default is 1kB + pub fn idontwant_message_size_threshold(&mut self, size: usize) -> &mut Self { + self.config.idontwant_message_size_threshold = size; + self + } + /// Constructs a [`Config`] from the given configuration and validates the settings. pub fn build(&self) -> Result { // check all constraints on config @@ -899,6 +922,10 @@ impl std::fmt::Debug for Config { "published_message_ids_cache_time", &self.published_message_ids_cache_time, ); + let _ = builder.field( + "idontwant_message_size_threhold", + &self.idontwant_message_size_threshold, + ); builder.finish() } }