diff --git a/Cargo.lock b/Cargo.lock index 7a5497c5701..5d9a22cf579 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1926,6 +1926,15 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "heck" version = "0.4.1" @@ -2858,6 +2867,7 @@ dependencies = [ "futures", "futures-timer", "getrandom 0.2.15", + "hashlink", "hex_fmt", "libp2p-core", "libp2p-identity", diff --git a/Cargo.toml b/Cargo.toml index 1819e68edf4..8c5c4c320c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,6 +141,7 @@ tracing = "0.1.41" tracing-subscriber = "0.3.19" unsigned-varint = { version = "0.8.0" } web-time = "1.1.0" +hashlink = "0.9.0" [patch.crates-io] diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index e9663c4c39c..5e18f284fc4 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.48.0 +- Introduce Gossipsub v1.2 [spec](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md). + See [PR 5697](https://github.com/libp2p/rust-libp2p/pull/5697) + - Correct state inconsistencies with the mesh and fanout when unsubscribing. See [PR 5690](https://github.com/libp2p/rust-libp2p/pull/5690) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 298cdee21e9..328d4367204 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -24,6 +24,7 @@ fnv = "1.0.7" futures = { workspace = true } futures-timer = "3.0.2" getrandom = { workspace = true } +hashlink = { workspace = true} hex_fmt = "0.3.0" web-time = { workspace = true } libp2p-core = { workspace = true } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 954e87ee470..4643f2bd97f 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -30,6 +30,7 @@ use std::{ use futures::FutureExt; use futures_timer::Delay; +use hashlink::LinkedHashMap; use libp2p_core::{ multiaddr::Protocol::{Ip4, Ip6}, transport::PortUse, @@ -63,8 +64,9 @@ use crate::{ topic::{Hasher, Topic, TopicHash}, transform::{DataTransform, IdentityTransform}, types::{ - ControlAction, Graft, IHave, IWant, Message, MessageAcceptance, MessageId, PeerConnections, - PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, SubscriptionAction, + ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId, + PeerConnections, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, + SubscriptionAction, }, FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError, }; @@ -72,6 +74,12 @@ use crate::{ #[cfg(test)] mod tests; +/// IDONTWANT cache capacity. +const IDONTWANT_CAP: usize = 10_000; + +/// IDONTWANT timeout before removal. +const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0); + /// Determines if published messages should be signed or not. /// /// Without signing, a number of privacy preserving modes can be selected. @@ -306,7 +314,7 @@ pub struct Behaviour { /// Stores optional peer score data together with thresholds, decay interval and gossip /// promises. - peer_score: Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>, + peer_score: Option<(PeerScore, PeerScoreThresholds, Delay)>, /// Counts the number of `IHAVE` received from each peer since the last heartbeat. count_received_ihave: HashMap, @@ -331,6 +339,9 @@ pub struct Behaviour { /// Tracks the numbers of failed messages per peer-id. failed_messages: HashMap, + + /// Tracks recently sent `IWANT` messages and checks if peers respond to them. + gossip_promises: GossipPromises, } impl Behaviour @@ -464,6 +475,7 @@ where subscription_filter, data_transform, failed_messages: Default::default(), + gossip_promises: Default::default(), }) } } @@ -897,7 +909,7 @@ where let interval = Delay::new(params.decay_interval); let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback); - self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default())); + self.peer_score = Some((peer_score, threshold, interval)); Ok(()) } @@ -1161,7 +1173,7 @@ where } fn score_below_threshold_from_scores( - peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>, + peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay)>, peer_id: &PeerId, threshold: impl Fn(&PeerScoreThresholds) -> f64, ) -> (bool, f64) { @@ -1222,10 +1234,7 @@ where return false; } - self.peer_score - .as_ref() - .map(|(_, _, _, promises)| !promises.contains(id)) - .unwrap_or(true) + !self.gossip_promises.contains(id) }; for (topic, ids) in ihave_msgs { @@ -1272,13 +1281,11 @@ where iwant_ids_vec.truncate(iask); *iasked += iask; - if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { - gossip_promises.add_promise( - *peer_id, - &iwant_ids_vec, - Instant::now() + self.config.iwant_followup_time(), - ); - } + self.gossip_promises.add_promise( + *peer_id, + &iwant_ids_vec, + Instant::now() + self.config.iwant_followup_time(), + ); tracing::trace!( peer=%peer_id, "IHAVE: Asking for the following messages from peer: {:?}", @@ -1642,14 +1649,15 @@ where peer=%propagation_source, "Rejecting message from blacklisted peer" ); - if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { + self.gossip_promises + .reject_message(msg_id, &RejectReason::BlackListedPeer); + if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.reject_message( propagation_source, msg_id, &raw_message.topic, RejectReason::BlackListedPeer, ); - gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer); } return false; } @@ -1731,6 +1739,9 @@ where // Calculate the message id on the transformed data. let msg_id = self.config.message_id(&message); + // Broadcast IDONTWANT messages. + self.send_idontwant(&raw_message, &msg_id, propagation_source); + // Check the validity of the message // Peers get penalized if this message is invalid. We don't add it to the duplicate cache // and instead continually penalize peers that repeatedly send this message. @@ -1758,9 +1769,11 @@ where // Tells score that message arrived (but is maybe not fully validated yet). // Consider the message as delivered for gossip promises. - if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { + self.gossip_promises.message_delivered(&msg_id); + + // Tells score that message arrived (but is maybe not fully validated yet). + if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.validate_message(propagation_source, &msg_id, &message.topic); - gossip_promises.message_delivered(&msg_id); } // Add the message to our memcache @@ -1802,12 +1815,14 @@ where raw_message: &RawMessage, reject_reason: RejectReason, ) { - if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { - if let Some(metrics) = self.metrics.as_mut() { - metrics.register_invalid_message(&raw_message.topic); - } + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_invalid_message(&raw_message.topic); + } - if let Ok(message) = self.data_transform.inbound_transform(raw_message.clone()) { + let message = self.data_transform.inbound_transform(raw_message.clone()); + + match (&mut self.peer_score, message) { + (Some((peer_score, ..)), Ok(message)) => { let message_id = self.config.message_id(&message); peer_score.reject_message( @@ -1817,13 +1832,22 @@ where reject_reason, ); - gossip_promises.reject_message(&message_id, &reject_reason); - } else { + self.gossip_promises + .reject_message(&message_id, &reject_reason); + } + (Some((peer_score, ..)), Err(_)) => { // The message is invalid, we reject it ignoring any gossip promises. If a peer is // advertising this message via an IHAVE and it's invalid it will be double // penalized, one for sending us an invalid and again for breaking a promise. peer_score.reject_invalid_message(propagation_source, &raw_message.topic); } + (None, Ok(message)) => { + // Valid transformation without peer scoring + let message_id = self.config.message_id(&message); + self.gossip_promises + .reject_message(&message_id, &reject_reason); + } + (None, Err(_)) => {} } } @@ -1890,7 +1914,7 @@ where // if the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) - && matches!(peer.kind, PeerKind::Gossipsubv1_1 | PeerKind::Gossipsub) + && peer.kind.is_gossipsub() && !Self::score_below_threshold_from_scores( &self.peer_score, propagation_source, @@ -1994,8 +2018,8 @@ where /// Applies penalties to peers that did not respond to our IWANT requests. fn apply_iwant_penalties(&mut self) { - if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { - for (peer, count) in gossip_promises.get_broken_promises() { + if let Some((peer_score, ..)) = &mut self.peer_score { + for (peer, count) in self.gossip_promises.get_broken_promises() { peer_score.add_penalty(&peer, count); if let Some(metrics) = self.metrics.as_mut() { metrics.register_score_penalty(Penalty::BrokenPromise); @@ -2216,7 +2240,7 @@ where && peers.len() > 1 && self.peer_score.is_some() { - if let Some((_, thresholds, _, _)) = &self.peer_score { + if let Some((_, thresholds, _)) = &self.peer_score { // Opportunistic grafting works as follows: we check the median score of peers // in the mesh; if this score is below the opportunisticGraftThreshold, we // select a few peers at random with score over the median. @@ -2309,7 +2333,7 @@ where for (topic_hash, peers) in self.fanout.iter_mut() { let mut to_remove_peers = Vec::new(); let publish_threshold = match &self.peer_score { - Some((_, thresholds, _, _)) => thresholds.publish_threshold, + Some((_, thresholds, _)) => thresholds.publish_threshold, _ => 0.0, }; for peer_id in peers.iter() { @@ -2402,6 +2426,17 @@ where } self.failed_messages.shrink_to_fit(); + // Flush stale IDONTWANTs. + for peer in self.connected_peers.values_mut() { + while let Some((_front, instant)) = peer.dont_send.front() { + if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() { + break; + } else { + peer.dont_send.pop_front(); + } + } + } + tracing::debug!("Completed Heartbeat"); if let Some(metrics) = self.metrics.as_mut() { let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); @@ -2557,6 +2592,49 @@ where } } + /// Helper function which sends an IDONTWANT message to mesh\[topic\] peers. + fn send_idontwant( + &mut self, + message: &RawMessage, + msg_id: &MessageId, + propagation_source: &PeerId, + ) { + let Some(mesh_peers) = self.mesh.get(&message.topic) else { + return; + }; + + let iwant_peers = self.gossip_promises.peers_for_message(msg_id); + + let recipient_peers: Vec = mesh_peers + .iter() + .chain(iwant_peers.iter()) + .filter(|peer_id| { + *peer_id != propagation_source && Some(*peer_id) != message.source.as_ref() + }) + .cloned() + .collect(); + + for peer_id in recipient_peers { + let Some(peer) = self.connected_peers.get_mut(&peer_id) else { + tracing::error!(peer = %peer_id, + "Could not IDONTWANT, peer doesn't exist in connected peer list"); + continue; + }; + + // Only gossipsub 1.2 peers support IDONTWANT. + if peer.kind != PeerKind::Gossipsubv1_2 { + continue; + } + + self.send_message( + peer_id, + RpcOut::IDontWant(IDontWant { + message_ids: vec![msg_id.clone()], + }), + ); + } + } + /// Helper function which forwards a message to mesh\[topic\] peers. /// /// Returns true if at least one peer was messaged. @@ -2612,13 +2690,23 @@ where } // forward the message to peers - for peer in recipient_peers.iter() { - let event = RpcOut::Forward { - message: message.clone(), - timeout: Delay::new(self.config.forward_queue_duration()), - }; - tracing::debug!(%peer, message=%msg_id, "Sending message to peer"); - self.send_message(*peer, event); + for peer_id in recipient_peers.iter() { + if let Some(peer) = self.connected_peers.get_mut(peer_id) { + if peer.dont_send.contains_key(msg_id) { + tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message"); + continue; + } + + tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer"); + + self.send_message( + *peer_id, + RpcOut::Forward { + message: message.clone(), + timeout: Delay::new(self.config.forward_queue_duration()), + }, + ); + } } tracing::debug!("Completed forwarding message"); true @@ -2754,7 +2842,7 @@ where failed_messages.non_priority += 1; failed_messages.forward += 1; } - RpcOut::IWant(_) | RpcOut::IHave(_) => { + RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => { failed_messages.non_priority += 1; } RpcOut::Graft(_) @@ -2914,7 +3002,7 @@ where // If metrics are enabled, register the disconnection of a peer based on its protocol. if let Some(metrics) = self.metrics.as_mut() { - metrics.peer_protocol_disconnected(connected_peer.kind.clone()); + metrics.peer_protocol_disconnected(connected_peer.kind); } self.connected_peers.remove(&peer_id); @@ -2994,6 +3082,7 @@ where connections: vec![], sender: Sender::new(self.config.connection_handler_queue_len()), topics: Default::default(), + dont_send: LinkedHashMap::new(), }); // Add the new connection connected_peer.connections.push(connection_id); @@ -3020,6 +3109,7 @@ where connections: vec![], sender: Sender::new(self.config.connection_handler_queue_len()), topics: Default::default(), + dont_send: LinkedHashMap::new(), }); // Add the new connection connected_peer.connections.push(connection_id); @@ -3041,7 +3131,7 @@ where // We have identified the protocol this peer is using if let Some(metrics) = self.metrics.as_mut() { - metrics.peer_protocol_connected(kind.clone()); + metrics.peer_protocol_connected(kind); } if let PeerKind::NotSupported = kind { @@ -3069,7 +3159,7 @@ where } HandlerEvent::MessageDropped(rpc) => { // Account for this in the scoring logic - if let Some((peer_score, _, _, _)) = &mut self.peer_score { + if let Some((peer_score, _, _)) = &mut self.peer_score { peer_score.failed_message_slow_peer(&propagation_source); } @@ -3177,6 +3267,24 @@ where peers, backoff, }) => prune_msgs.push((topic_hash, peers, backoff)), + ControlAction::IDontWant(IDontWant { message_ids }) => { + let Some(peer) = self.connected_peers.get_mut(&propagation_source) + else { + tracing::error!(peer = %propagation_source, + "Could not handle IDONTWANT, peer doesn't exist in connected peer list"); + continue; + }; + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_idontwant(message_ids.len()); + } + for message_id in message_ids { + peer.dont_send.insert(message_id, Instant::now()); + // Don't exceed capacity. + if peer.dont_send.len() > IDONTWANT_CAP { + peer.dont_send.pop_front(); + } + } + } } } if !ihave_msgs.is_empty() { @@ -3202,7 +3310,7 @@ where } // update scores - if let Some((peer_score, _, delay, _)) = &mut self.peer_score { + if let Some((peer_score, _, delay)) = &mut self.peer_score { if delay.poll_unpin(cx).is_ready() { peer_score.refresh_scores(); delay.reset(peer_score.params.decay_interval); @@ -3329,7 +3437,7 @@ fn get_random_peers_dynamic( .iter() .filter(|(_, p)| p.topics.contains(topic_hash)) .filter(|(peer_id, _)| f(peer_id)) - .filter(|(_, p)| p.kind == PeerKind::Gossipsub || p.kind == PeerKind::Gossipsubv1_1) + .filter(|(_, p)| p.kind.is_gossipsub()) .map(|(peer_id, _)| *peer_id) .collect::>(); diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index bf3046da78b..f3d24897b0c 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -33,13 +33,7 @@ use crate::{ }; #[derive(Default, Debug)] -struct InjectNodes -// TODO: remove trait bound Default when this issue is fixed: -// https://github.com/colin-kiegel/rust-derive-builder/issues/93 -where - D: DataTransform + Default + Clone + Send + 'static, - F: TopicSubscriptionFilter + Clone + Default + Send + 'static, -{ +struct InjectNodes { peer_no: usize, topics: Vec, to_subscribe: bool, @@ -49,6 +43,7 @@ where scoring: Option<(PeerScoreParams, PeerScoreThresholds)>, data_transform: D, subscription_filter: F, + peer_kind: Option, } impl InjectNodes @@ -96,7 +91,7 @@ where let empty = vec![]; for i in 0..self.peer_no { - let (peer, receiver) = add_peer( + let (peer, receiver) = add_peer_with_addr_and_kind( &mut gs, if self.to_subscribe { &topic_hashes @@ -105,6 +100,8 @@ where }, i < self.outbound, i < self.explicit, + Multiaddr::empty(), + self.peer_kind.or(Some(PeerKind::Gossipsubv1_1)), ); peers.push(peer); receivers.insert(peer, receiver); @@ -153,6 +150,11 @@ where self.subscription_filter = subscription_filter; self } + + fn peer_kind(mut self, peer_kind: PeerKind) -> Self { + self.peer_kind = Some(peer_kind); + self + } } fn inject_nodes() -> InjectNodes @@ -235,10 +237,11 @@ where gs.connected_peers.insert( peer, PeerConnections { - kind: kind.clone().unwrap_or(PeerKind::Floodsub), + kind: kind.unwrap_or(PeerKind::Floodsub), connections: vec![connection_id], topics: Default::default(), sender, + dont_send: LinkedHashMap::new(), }, ); @@ -625,6 +628,7 @@ fn test_join() { connections: vec![connection_id], topics: Default::default(), sender, + dont_send: LinkedHashMap::new(), }, ); receivers.insert(random_peer, receiver); @@ -1020,6 +1024,7 @@ fn test_get_random_peers() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), + dont_send: LinkedHashMap::new(), }, ); } @@ -4614,9 +4619,9 @@ fn test_ignore_too_many_messages_in_ihave() { let (peer, receiver) = add_peer(&mut gs, &topics, false, false); receivers.insert(peer, receiver); - // peer has 20 messages + // peer has 30 messages let mut seq = 0; - let message_ids: Vec<_> = (0..20) + let message_ids: Vec<_> = (0..30) .map(|_| random_message(&mut seq, &topics)) .map(|msg| gs.data_transform.inbound_transform(msg).unwrap()) .map(|msg| config.message_id(&msg)) @@ -4658,7 +4663,7 @@ fn test_ignore_too_many_messages_in_ihave() { gs.heartbeat(); gs.handle_ihave( &peer, - vec![(topics[0].clone(), message_ids[10..20].to_vec())], + vec![(topics[0].clone(), message_ids[20..30].to_vec())], ); // we sent 10 iwant messages ids via a IWANT rpc. @@ -5266,6 +5271,194 @@ fn test_graft_without_subscribe() { let _ = gs.unsubscribe(&Topic::new(topic)); } +/// Test that a node sends IDONTWANT messages to the mesh peers +/// that run Gossipsub v1.2. +#[test] +fn sends_idontwant() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + .peer_no(5) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2) + .create_network(); + + let local_id = PeerId::random(); + + let message = RawMessage { + source: Some(peers[1]), + data: vec![12], + sequence_number: Some(0), + topic: topic_hashes[0].clone(), + signature: None, + key: None, + validated: true, + }; + gs.handle_received_message(message.clone(), &local_id); + assert_eq!( + receivers + .into_iter() + .fold(0, |mut idontwants, (peer_id, c)| { + let non_priority = c.non_priority.get_ref(); + while !non_priority.is_empty() { + if let Ok(RpcOut::IDontWant(_)) = non_priority.try_recv() { + assert_ne!(peer_id, peers[1]); + idontwants += 1; + } + } + idontwants + }), + 3, + "IDONTWANT was not sent" + ); +} + +/// Test that a node doesn't send IDONTWANT messages to the mesh peers +/// that don't run Gossipsub v1.2. +#[test] +fn doesnt_send_idontwant() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + .peer_no(5) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_1) + .create_network(); + + let local_id = PeerId::random(); + + let message = RawMessage { + source: Some(peers[1]), + data: vec![12], + sequence_number: Some(0), + topic: topic_hashes[0].clone(), + signature: None, + key: None, + validated: true, + }; + gs.handle_received_message(message.clone(), &local_id); + assert_eq!( + receivers + .into_iter() + .fold(0, |mut idontwants, (peer_id, c)| { + let non_priority = c.non_priority.get_ref(); + while !non_priority.is_empty() { + if matches!(non_priority.try_recv(), Ok(RpcOut::IDontWant(_)) if peer_id != peers[1]) { + idontwants += 1; + } + } + idontwants + }), + 0, + "IDONTWANT were sent" + ); +} + +/// Test that a node doesn't forward a messages to the mesh peers +/// that sent IDONTWANT. +#[test] +fn doesnt_forward_idontwant() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + .peer_no(4) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2) + .create_network(); + + let local_id = PeerId::random(); + + let raw_message = RawMessage { + source: Some(peers[1]), + data: vec![12], + sequence_number: Some(0), + topic: topic_hashes[0].clone(), + signature: None, + key: None, + validated: true, + }; + let message = gs + .data_transform + .inbound_transform(raw_message.clone()) + .unwrap(); + let message_id = gs.config.message_id(&message); + let peer = gs.connected_peers.get_mut(&peers[2]).unwrap(); + peer.dont_send.insert(message_id, Instant::now()); + + gs.handle_received_message(raw_message.clone(), &local_id); + assert_eq!( + receivers.into_iter().fold(0, |mut fwds, (peer_id, c)| { + let non_priority = c.non_priority.get_ref(); + while !non_priority.is_empty() { + if let Ok(RpcOut::Forward { .. }) = non_priority.try_recv() { + assert_ne!(peer_id, peers[2]); + fwds += 1; + } + } + fwds + }), + 2, + "IDONTWANT was not sent" + ); +} + +/// Test that a node parses an +/// IDONTWANT message to the respective peer. +#[test] +fn parses_idontwant() { + let (mut gs, peers, _receivers, _topic_hashes) = inject_nodes1() + .peer_no(2) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2) + .create_network(); + + let message_id = MessageId::new(&[0, 1, 2, 3]); + let rpc = Rpc { + messages: vec![], + subscriptions: vec![], + control_msgs: vec![ControlAction::IDontWant(IDontWant { + message_ids: vec![message_id.clone()], + })], + }; + gs.on_connection_handler_event( + peers[1], + ConnectionId::new_unchecked(0), + HandlerEvent::Message { + rpc, + invalid_messages: vec![], + }, + ); + let peer = gs.connected_peers.get_mut(&peers[1]).unwrap(); + assert!(peer.dont_send.get(&message_id).is_some()); +} + +/// Test that a node clears stale IDONTWANT messages. +#[test] +fn clear_stale_idontwant() { + let (mut gs, peers, _receivers, _topic_hashes) = inject_nodes1() + .peer_no(4) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2) + .create_network(); + + let peer = gs.connected_peers.get_mut(&peers[2]).unwrap(); + peer.dont_send + .insert(MessageId::new(&[1, 2, 3, 4]), Instant::now()); + std::thread::sleep(Duration::from_secs(3)); + gs.heartbeat(); + let peer = gs.connected_peers.get_mut(&peers[2]).unwrap(); + assert!(peer.dont_send.is_empty()); +} + #[test] fn test_all_queues_full() { let gs_config = ConfigBuilder::default() @@ -5289,6 +5482,7 @@ fn test_all_queues_full() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(2), + dont_send: LinkedHashMap::new(), }, ); @@ -5323,6 +5517,7 @@ fn test_slow_peer_returns_failed_publish() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(2), + dont_send: LinkedHashMap::new(), }, ); let peer_id = PeerId::random(); @@ -5334,6 +5529,7 @@ fn test_slow_peer_returns_failed_publish() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), + dont_send: LinkedHashMap::new(), }, ); @@ -5386,7 +5582,6 @@ fn test_slow_peer_returns_failed_ihave_handling() { topics.insert(topic_hash.clone()); let slow_peer_id = PeerId::random(); - peers.push(slow_peer_id); gs.connected_peers.insert( slow_peer_id, PeerConnections { @@ -5394,6 +5589,7 @@ fn test_slow_peer_returns_failed_ihave_handling() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(2), + dont_send: LinkedHashMap::new(), }, ); peers.push(slow_peer_id); @@ -5409,9 +5605,11 @@ fn test_slow_peer_returns_failed_ihave_handling() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), + dont_send: LinkedHashMap::new(), }, ); + // First message. let publish_data = vec![1; 59]; let transformed = gs .data_transform @@ -5431,6 +5629,22 @@ fn test_slow_peer_returns_failed_ihave_handling() { &slow_peer_id, vec![(topic_hash.clone(), vec![msg_id.clone()])], ); + + // Second message. + let publish_data = vec![2; 59]; + let transformed = gs + .data_transform + .outbound_transform(&topic_hash, publish_data.clone()) + .unwrap(); + let raw_message = gs + .build_raw_message(topic_hash.clone(), transformed) + .unwrap(); + let msg_id = gs.config.message_id(&Message { + source: raw_message.source, + data: publish_data, + sequence_number: raw_message.sequence_number, + topic: raw_message.topic.clone(), + }); gs.handle_ihave(&slow_peer_id, vec![(topic_hash, vec![msg_id.clone()])]); gs.heartbeat(); @@ -5487,6 +5701,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(2), + dont_send: LinkedHashMap::new(), }, ); peers.push(slow_peer_id); @@ -5502,6 +5717,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), + dont_send: LinkedHashMap::new(), }, ); @@ -5577,6 +5793,7 @@ fn test_slow_peer_returns_failed_forward() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(2), + dont_send: LinkedHashMap::new(), }, ); peers.push(slow_peer_id); @@ -5592,6 +5809,7 @@ fn test_slow_peer_returns_failed_forward() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), + dont_send: LinkedHashMap::new(), }, ); @@ -5672,6 +5890,7 @@ fn test_slow_peer_is_downscored_on_publish() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(2), + dont_send: LinkedHashMap::new(), }, ); gs.peer_score.as_mut().unwrap().0.add_peer(slow_peer_id); @@ -5684,6 +5903,7 @@ fn test_slow_peer_is_downscored_on_publish() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: Sender::new(gs.config.connection_handler_queue_len()), + dont_send: LinkedHashMap::new(), }, ); diff --git a/protocols/gossipsub/src/generated/gossipsub/pb.rs b/protocols/gossipsub/src/generated/gossipsub/pb.rs index 9a074fd61fc..24ac80d2755 100644 --- a/protocols/gossipsub/src/generated/gossipsub/pb.rs +++ b/protocols/gossipsub/src/generated/gossipsub/pb.rs @@ -154,6 +154,7 @@ pub struct ControlMessage { pub iwant: Vec, pub graft: Vec, pub prune: Vec, + pub idontwant: Vec, } impl<'a> MessageRead<'a> for ControlMessage { @@ -165,6 +166,7 @@ impl<'a> MessageRead<'a> for ControlMessage { Ok(18) => msg.iwant.push(r.read_message::(bytes)?), Ok(26) => msg.graft.push(r.read_message::(bytes)?), Ok(34) => msg.prune.push(r.read_message::(bytes)?), + Ok(42) => msg.idontwant.push(r.read_message::(bytes)?), Ok(t) => { r.read_unknown(bytes, t)?; } Err(e) => return Err(e), } @@ -180,6 +182,7 @@ impl MessageWrite for ControlMessage { + self.iwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.graft.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.prune.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + + self.idontwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() } fn write_message(&self, w: &mut Writer) -> Result<()> { @@ -187,6 +190,7 @@ impl MessageWrite for ControlMessage { for s in &self.iwant { w.write_with_tag(18, |w| w.write_message(s))?; } for s in &self.graft { w.write_with_tag(26, |w| w.write_message(s))?; } for s in &self.prune { w.write_with_tag(34, |w| w.write_message(s))?; } + for s in &self.idontwant { w.write_with_tag(42, |w| w.write_message(s))?; } Ok(()) } } @@ -331,6 +335,38 @@ impl MessageWrite for ControlPrune { } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct ControlIDontWant { + pub message_ids: Vec>, +} + +impl<'a> MessageRead<'a> for ControlIDontWant { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.message_ids.push(r.read_bytes(bytes)?.to_owned()), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for ControlIDontWant { + fn get_size(&self) -> usize { + 0 + + self.message_ids.iter().map(|s| 1 + sizeof_len((s).len())).sum::() + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + for s in &self.message_ids { w.write_with_tag(10, |w| w.write_bytes(&**s))?; } + Ok(()) + } +} + #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Default, PartialEq, Clone)] pub struct PeerInfo { diff --git a/protocols/gossipsub/src/generated/rpc.proto b/protocols/gossipsub/src/generated/rpc.proto index 2ce12f3f37f..fe4d3bc9366 100644 --- a/protocols/gossipsub/src/generated/rpc.proto +++ b/protocols/gossipsub/src/generated/rpc.proto @@ -19,8 +19,8 @@ message Message { optional bytes data = 2; optional bytes seqno = 3; required string topic = 4; - optional bytes signature = 5; - optional bytes key = 6; + optional bytes signature = 5; + optional bytes key = 6; } message ControlMessage { @@ -28,6 +28,7 @@ message ControlMessage { repeated ControlIWant iwant = 2; repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; + repeated ControlIDontWant idontwant = 5; } message ControlIHave { @@ -49,6 +50,10 @@ message ControlPrune { optional uint64 backoff = 3; // gossipsub v1.1 backoff time (in seconds) } +message ControlIDontWant { + repeated bytes message_ids = 1; +} + message PeerInfo { optional bytes peer_id = 1; optional bytes signed_peer_record = 2; diff --git a/protocols/gossipsub/src/gossip_promises.rs b/protocols/gossipsub/src/gossip_promises.rs index b64811bb062..284ba7cab01 100644 --- a/protocols/gossipsub/src/gossip_promises.rs +++ b/protocols/gossipsub/src/gossip_promises.rs @@ -41,6 +41,14 @@ impl GossipPromises { self.promises.contains_key(message) } + /// Get the peers we sent IWANT the input message id. + pub(crate) fn peers_for_message(&self, message_id: &MessageId) -> Vec { + self.promises + .get(message_id) + .map(|peers| peers.keys().copied().collect()) + .unwrap_or_default() + } + /// Track a promise to deliver a message from a list of [`MessageId`]s we are requesting. pub(crate) fn add_promise(&mut self, peer: PeerId, messages: &[MessageId], expires: Instant) { for message_id in messages { diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 2936182c3f8..e66b606896b 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -229,7 +229,7 @@ impl EnabledHandler { if let Some(peer_kind) = self.peer_kind.as_ref() { self.peer_kind_sent = true; return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - HandlerEvent::PeerKind(peer_kind.clone()), + HandlerEvent::PeerKind(*peer_kind), )); } } diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 2519da64b73..42dedc000b7 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -187,6 +187,12 @@ pub(crate) struct Metrics { /// topic. A very high metric might indicate an underperforming network. topic_iwant_msgs: Family, + /// The number of times we have received an IDONTWANT control message. + idontwant_msgs: Counter, + + /// The number of msg_id's we have received in every IDONTWANT control message. + idontwant_msgs_ids: Counter, + /// The size of the priority queue. priority_queue_size: Histogram, /// The size of the non-priority queue. @@ -324,6 +330,27 @@ impl Metrics { "topic_iwant_msgs", "Number of times we have decided an IWANT is required for this topic" ); + + let idontwant_msgs = { + let metric = Counter::default(); + registry.register( + "idontwant_msgs", + "The number of times we have received an IDONTWANT control message", + metric.clone(), + ); + metric + }; + + let idontwant_msgs_ids = { + let metric = Counter::default(); + registry.register( + "idontwant_msgs_ids", + "The number of msg_id's we have received in every total of all IDONTWANT control message.", + metric.clone(), + ); + metric + }; + let memcache_misses = { let metric = Counter::default(); registry.register( @@ -376,6 +403,8 @@ impl Metrics { heartbeat_duration, memcache_misses, topic_iwant_msgs, + idontwant_msgs, + idontwant_msgs_ids, priority_queue_size, non_priority_queue_size, } @@ -574,6 +603,12 @@ impl Metrics { } } + /// Register receiving an IDONTWANT msg for this topic. + pub(crate) fn register_idontwant(&mut self, msgs: usize) { + self.idontwant_msgs.inc(); + self.idontwant_msgs_ids.inc_by(msgs as u64); + } + /// Observes a heartbeat duration. pub(crate) fn observe_heartbeat_duration(&mut self, millis: u64) { self.heartbeat_duration.observe(millis as f64); diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index e4272737342..7ee6d5c8245 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -35,14 +35,19 @@ use crate::{ rpc_proto::proto, topic::TopicHash, types::{ - ControlAction, Graft, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, RawMessage, Rpc, - Subscription, SubscriptionAction, + ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, + RawMessage, Rpc, Subscription, SubscriptionAction, }, ValidationError, }; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; +pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId { + protocol: StreamProtocol::new("/meshsub/1.2.0"), + kind: PeerKind::Gossipsubv1_2, +}; + pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId { protocol: StreamProtocol::new("/meshsub/1.1.0"), kind: PeerKind::Gossipsubv1_1, @@ -72,7 +77,11 @@ impl Default for ProtocolConfig { Self { max_transmit_size: 65536, validation_mode: ValidationMode::Strict, - protocol_ids: vec![GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL], + protocol_ids: vec![ + GOSSIPSUB_1_2_0_PROTOCOL, + GOSSIPSUB_1_1_0_PROTOCOL, + GOSSIPSUB_1_0_0_PROTOCOL, + ], } } } @@ -479,10 +488,25 @@ impl Decoder for GossipsubCodec { })); } + let idontwant_msgs: Vec = rpc_control + .idontwant + .into_iter() + .map(|idontwant| { + ControlAction::IDontWant(IDontWant { + message_ids: idontwant + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) + }) + .collect(); + control_msgs.extend(ihave_msgs); control_msgs.extend(iwant_msgs); control_msgs.extend(graft_msgs); control_msgs.extend(prune_msgs); + control_msgs.extend(idontwant_msgs); } Ok(Some(HandlerEvent::Message { diff --git a/protocols/gossipsub/src/rpc.rs b/protocols/gossipsub/src/rpc.rs index b5f05c7b2e5..41b338267e9 100644 --- a/protocols/gossipsub/src/rpc.rs +++ b/protocols/gossipsub/src/rpc.rs @@ -89,7 +89,7 @@ impl Sender { | RpcOut::Prune(_) | RpcOut::Subscribe(_) | RpcOut::Unsubscribe(_) => &self.priority_sender, - RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) => { + RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => { &self.non_priority_sender } }; diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index bcb1f279ae5..6681eca1d93 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -22,12 +22,14 @@ use std::{collections::BTreeSet, fmt, fmt::Debug}; use futures_timer::Delay; +use hashlink::LinkedHashMap; use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; use quick_protobuf::MessageWrite; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use web_time::Instant; use crate::{rpc::Sender, rpc_proto::proto, TopicHash}; @@ -109,11 +111,15 @@ pub(crate) struct PeerConnections { pub(crate) topics: BTreeSet, /// The rpc sender to the connection handler(s). pub(crate) sender: Sender, + /// Don't send messages. + pub(crate) dont_send: LinkedHashMap, } /// Describes the types of peers that can exist in the gossipsub context. -#[derive(Debug, Clone, PartialEq, Hash, EncodeLabelValue, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Hash, EncodeLabelValue, Eq)] pub enum PeerKind { + /// A gossipsub 1.2 peer. + Gossipsubv1_2, /// A gossipsub 1.1 peer. Gossipsubv1_1, /// A gossipsub 1.0 peer. @@ -149,6 +155,16 @@ pub struct RawMessage { pub validated: bool, } +impl PeerKind { + /// Returns true if peer speaks any gossipsub version. + pub(crate) fn is_gossipsub(&self) -> bool { + matches!( + self, + Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub + ) + } +} + impl RawMessage { /// Calculates the encoded length of this message (used for calculating metrics). pub fn raw_protobuf_len(&self) -> usize { @@ -246,6 +262,9 @@ pub enum ControlAction { Graft(Graft), /// The node has been removed from the mesh - Prune control message. Prune(Prune), + /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant + /// control message. + IDontWant(IDontWant), } /// Node broadcasts known messages per topic - IHave control message. @@ -282,6 +301,13 @@ pub struct Prune { pub(crate) backoff: Option, } +/// The node requests us to not forward message ids - IDontWant control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct IDontWant { + /// A list of known message ids. + pub(crate) message_ids: Vec, +} + /// A Gossipsub RPC message sent. #[derive(Debug)] pub enum RpcOut { @@ -303,6 +329,9 @@ pub enum RpcOut { IHave(IHave), /// Send a IWant control message. IWant(IWant), + /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant + /// control message. + IDontWant(IDontWant), } impl RpcOut { @@ -363,6 +392,7 @@ impl From for proto::RPC { iwant: vec![], graft: vec![], prune: vec![], + idontwant: vec![], }), }, RpcOut::IWant(IWant { message_ids }) => proto::RPC { @@ -375,6 +405,7 @@ impl From for proto::RPC { }], graft: vec![], prune: vec![], + idontwant: vec![], }), }, RpcOut::Graft(Graft { topic_hash }) => proto::RPC { @@ -387,6 +418,7 @@ impl From for proto::RPC { topic_id: Some(topic_hash.into_string()), }], prune: vec![], + idontwant: vec![], }), }, RpcOut::Prune(Prune { @@ -413,9 +445,23 @@ impl From for proto::RPC { .collect(), backoff, }], + idontwant: vec![], }), } } + RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC { + publish: Vec::new(), + subscriptions: Vec::new(), + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![], + graft: vec![], + prune: vec![], + idontwant: vec![proto::ControlIDontWant { + message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), + }], + }), + }, } } } @@ -474,6 +520,7 @@ impl From for proto::RPC { iwant: Vec::new(), graft: Vec::new(), prune: Vec::new(), + idontwant: Vec::new(), }; let empty_control_msg = rpc.control_msgs.is_empty(); @@ -522,6 +569,12 @@ impl From for proto::RPC { }; control.prune.push(rpc_prune); } + ControlAction::IDontWant(IDontWant { message_ids }) => { + let rpc_idontwant = proto::ControlIDontWant { + message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), + }; + control.idontwant.push(rpc_idontwant); + } } } @@ -560,6 +613,7 @@ impl PeerKind { Self::Floodsub => "Floodsub", Self::Gossipsub => "Gossipsub v1.0", Self::Gossipsubv1_1 => "Gossipsub v1.1", + Self::Gossipsubv1_2 => "Gossipsub v1.2", } } }