diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5b5e3b71793..e1e5ecbc25e 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -31,6 +31,7 @@ use std::{ use futures::StreamExt; use futures_ticker::Ticker; +use futures_timer::Delay; use prometheus_client::registry::Registry; use rand::{seq::SliceRandom, thread_rng}; @@ -57,7 +58,7 @@ use crate::types::{ ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription, SubscriptionAction, }; -use crate::types::{PeerConnections, PeerKind, RpcOut}; +use crate::types::{PeerConnections, PeerKind, RpcOut, RpcOutKind}; use crate::{backoff::BackoffStorage, FailedMessages}; use crate::{ config::{Config, ValidationMode}, @@ -538,9 +539,10 @@ where } // send subscription request to all peers - for (peer_id, peer) in self.connected_peers.iter_mut() { + for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() { tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer"); - peer.sender.subscribe(topic_hash.clone()); + let event = RpcOut::Subscribe(topic_hash.clone()); + self.send_message(peer_id, event); } // call JOIN(topic) @@ -564,9 +566,10 @@ where } // announce to all peers - for (peer_id, peer) in self.connected_peers.iter_mut() { - tracing::debug!(%peer_id, "Sending UNSUBSCRIBE to peer"); - peer.sender.unsubscribe(topic_hash.clone()); + for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() { + tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); + let event = RpcOut::Unsubscribe(topic_hash.clone()); + self.send_message(peer, event); } // call LEAVE(topic) @@ -734,29 +737,15 @@ where // Send to peers we know are subscribed to the topic.
let mut publish_failed = true; for peer_id in recipient_peers.iter() { - if let Some(peer) = self.connected_peers.get_mut(peer_id) { - tracing::trace!(peer=%peer_id, "Sending message to peer"); - match peer.sender.publish( - raw_message.clone(), - self.config.publish_queue_duration(), - self.metrics.as_mut(), - ) { - Ok(_) => publish_failed = false, - Err(_) => { - let failed_messages = self.failed_messages.entry(*peer_id).or_default(); - failed_messages.priority += 1; - failed_messages.publish += 1; - - tracing::warn!(peer_id=%peer_id, "Publish queue full. Could not publish to peer"); - // Downscore the peer due to failed message. - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); - } - } - } - } else { - tracing::error!(peer_id = %peer_id, - "Could not PUBLISH, peer doesn't exist in connected peer list"); + tracing::trace!(peer=%peer_id, "Sending message to peer"); + if self.send_message( + *peer_id, + RpcOut::Publish { + message: raw_message.clone(), + timeout: Delay::new(self.config.publish_queue_duration()), + }, + ) { + publish_failed = false } } @@ -1041,18 +1030,12 @@ where for peer_id in added_peers { // Send a GRAFT control message - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.graft(&peer_id, topic_hash.clone()); - } - if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) { - tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer"); - peer.sender.graft(Graft { + self.send_message( + peer_id, + RpcOut::Graft(Graft { topic_hash: topic_hash.clone(), - }); - } else { - tracing::error!(peer = %peer_id, - "Could not GRAFT, peer doesn't exist in connected peer list"); - } + }), + ); // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( @@ -1145,13 +1128,7 @@ where for peer_id in peers { // Send a PRUNE control message let prune = self.make_prune(topic_hash, &peer_id, self.config.do_px(), true); - if let Some(peer) = &mut 
self.connected_peers.get_mut(&peer_id) { - tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer"); - peer.sender.prune(prune); - } else { - tracing::error!(peer = %peer_id, - "Could not PRUNE, peer doesn't exist in connected peer list"); - } + self.send_message(peer_id, RpcOut::Prune(prune)); // If the peer did not previously exist in any mesh, inform the handler peer_removed_from_mesh( @@ -1306,35 +1283,12 @@ where Instant::now() + self.config.iwant_followup_time(), ); } - if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) { - tracing::trace!( - peer=%peer_id, - "IHAVE: Asking for the following messages from peer: {:?}", - iwant_ids_vec - ); - - if peer - .sender - .iwant(IWant { - message_ids: iwant_ids_vec, - }) - .is_err() - { - tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IWANT"); - - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); - } - // Increment failed message count - self.failed_messages - .entry(*peer_id) - .or_default() - .non_priority += 1; - } - } else { - tracing::error!(peer = %peer_id, - "Could not IWANT, peer doesn't exist in connected peer list"); - } + self.send_message( + *peer_id, + RpcOut::IWant(IWant { + message_ids: iwant_ids_vec, + }), + ); } tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer"); } @@ -1368,29 +1322,15 @@ where message=%id, "IWANT: Peer has asked for message too many times; ignoring request" ); - } else if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) { - tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); - if peer - .sender - .forward( - msg, - self.config.forward_queue_duration(), - self.metrics.as_mut(), - ) - .is_err() - { - // Downscore the peer - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); - } - // Increment the failed message count - let failed_messages = self.failed_messages.entry(*peer_id).or_default(); - 
failed_messages.forward += 1; - failed_messages.non_priority += 1; - } } else { - tracing::error!(peer = %peer_id, - "Could not IWANT, peer doesn't exist in connected peer list"); + tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); + self.send_message( + *peer_id, + RpcOut::Forward { + message: msg, + timeout: Delay::new(self.config.forward_queue_duration()), + }, + ); } } } @@ -1406,13 +1346,14 @@ where let mut do_px = self.config.do_px(); + let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else { + tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft"); + return; + }; + // For each topic, if a peer has grafted us, then we necessarily must be in their mesh // and they must be subscribed to the topic. Ensure we have recorded the mapping. for topic in &topics { - let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else { - tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft"); - return; - }; if connected_peer.topics.insert(topic.clone()) { if let Some(m) = self.metrics.as_mut() { m.inc_topic_peers(topic); } @@ -1543,19 +1484,13 @@ where if !to_prune_topics.is_empty() { // build the prune messages to send let on_unsubscribe = false; - let mut sender = match self.connected_peers.get_mut(peer_id) { - Some(connected_peer) => connected_peer.sender.clone(), - None => { - tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft and obtaining a sender"); - return; - } - }; for prune in to_prune_topics .iter() .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) + .collect::<Vec<_>>() { - sender.prune(prune); + self.send_message(*peer_id, RpcOut::Prune(prune)); } // Send the prune messages to the peer tracing::debug!( @@ -2036,13 +1971,8 @@ where // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat.
- if let Some(peer) = &mut self.connected_peers.get_mut(propagation_source) { - for topic_hash in topics_to_graft.into_iter() { - peer.sender.graft(Graft { topic_hash }); - } - } else { - tracing::error!(peer = %propagation_source, - "Could not GRAFT, peer doesn't exist in connected peer list"); + for topic_hash in topics_to_graft.into_iter() { + self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash })); } // Notify the application of the subscriptions @@ -2477,6 +2407,7 @@ where /// and fanout peers fn emit_gossip(&mut self) { let mut rng = thread_rng(); + let mut messages = Vec::new(); for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) { let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash); if message_ids.is_empty() { @@ -2524,32 +2455,18 @@ where } // send an IHAVE message - if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) { - if peer - .sender - .ihave(IHave { - topic_hash: topic_hash.clone(), - message_ids: peer_message_ids, - }) - .is_err() - { - tracing::warn!(peer=%peer_id, "Send Queue full. 
Could not send IHAVE"); - - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(&peer_id); - } - // Increment failed message count - self.failed_messages - .entry(peer_id) - .or_default() - .non_priority += 1; - } - } else { - tracing::error!(peer = %peer_id, - "Could not IHAVE, peer doesn't exist in connected peer list"); - } + messages.push(( + peer_id, + RpcOut::IHave(IHave { + topic_hash: topic_hash.clone(), + message_ids: peer_message_ids, + }), + )); } } + for (peer_id, message) in messages { + self.send_message(peer_id, message); + } } /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control @@ -2578,43 +2495,35 @@ where &self.connected_peers, ); } + let rpc_msgs = topics.iter().map(|topic_hash| { + RpcOut::Graft(Graft { + topic_hash: topic_hash.clone(), + }) + }); // If there are prunes associated with the same peer add them. // NOTE: In this case a peer has been added to a topic mesh, and removed from another. // It therefore must be in at least one mesh and we do not need to inform the handler // of its removal from another. - // send the control messages - let mut sender = match self.connected_peers.get_mut(&peer_id) { - Some(connected_peer) => connected_peer.sender.clone(), - None => { - tracing::error!(peer_id = %peer_id, "Peer non-existent when sending graft/prune"); - return; - } - }; - // The following prunes are not due to unsubscribing. 
- let prunes = to_prune + let prune_msgs = to_prune .remove(&peer_id) .into_iter() .flatten() .map(|topic_hash| { - self.make_prune( + let prune = self.make_prune( &topic_hash, &peer_id, self.config.do_px() && !no_px.contains(&peer_id), false, - ) - }); - - for topic_hash in topics { - sender.graft(Graft { - topic_hash: topic_hash.clone(), + ); + RpcOut::Prune(prune) }); - } - for prune in prunes { - sender.prune(prune); + // send the rpc messages + for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() { + self.send_message(peer_id, msg); } } @@ -2628,12 +2537,7 @@ where self.config.do_px() && !no_px.contains(peer_id), false, ); - if let Some(peer) = self.connected_peers.get_mut(peer_id) { - peer.sender.prune(prune); - } else { - tracing::error!(peer = %peer_id, - "Could not PRUNE, peer doesn't exist in connected peer list"); - } + self.send_message(*peer_id, RpcOut::Prune(prune)); // inform the handler peer_removed_from_mesh( @@ -2671,14 +2575,15 @@ where // Add explicit peers for peer_id in &self.explicit_peers { - if let Some(peer) = self.connected_peers.get(peer_id) { - if Some(peer_id) != propagation_source - && !originating_peers.contains(peer_id) - && Some(peer_id) != message.source.as_ref() - && peer.topics.contains(&message.topic) - { - recipient_peers.insert(*peer_id); - } + let Some(peer) = self.connected_peers.get(peer_id) else { + continue; + }; + if Some(peer_id) != propagation_source + && !originating_peers.contains(peer_id) + && Some(peer_id) != message.source.as_ref() + && peer.topics.contains(&message.topic) + { + recipient_peers.insert(*peer_id); } } @@ -2696,39 +2601,21 @@ where } } + if recipient_peers.is_empty() { + return false; + } + // forward the message to peers - if !recipient_peers.is_empty() { - for peer_id in recipient_peers.iter() { - if let Some(peer) = self.connected_peers.get_mut(peer_id) { - tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer"); - if peer - .sender - .forward( - message.clone(), -
self.config.forward_queue_duration(), - self.metrics.as_mut(), - ) - .is_err() - { - // Downscore the peer - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); - } - // Increment the failed message count - let failed_messages = self.failed_messages.entry(*peer_id).or_default(); - failed_messages.forward += 1; - failed_messages.non_priority += 1; - } - } else { - tracing::error!(peer = %peer_id, - "Could not FORWARD, peer doesn't exist in connected peer list"); - } - } - tracing::debug!("Completed forwarding message"); - true - } else { - false + for peer in recipient_peers.iter() { + let event = RpcOut::Forward { + message: message.clone(), + timeout: Delay::new(self.config.forward_queue_duration()), + }; + tracing::debug!(%peer, message=%msg_id, "Sending message to peer"); + self.send_message(*peer, event); } + tracing::debug!("Completed forwarding message"); + true } /// Constructs a [`RawMessage`] performing message signing if required. @@ -2823,6 +2710,66 @@ where } } + /// Send a [`RpcOut`] message to a peer. + /// + /// Returns `true` if sending was successful, `false` otherwise. + /// The method will update the peer score and failed message counter if + /// sending the message failed due to the channel to the connection handler being + /// full (which indicates a slow peer). + fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool { + if let Some(m) = self.metrics.as_mut() { + if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc { + // register bytes sent on the internal metrics. + m.msg_sent(&message.topic, message.raw_protobuf_len()); + } + } + + let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else { + tracing::error!(peer = %peer_id, + "Could not send rpc to connection handler, peer doesn't exist in connected peer list"); + return false; + }; + + let rpc_kind = rpc.kind(); + + // Try sending the message to the connection handler. 
+ if peer.sender.send_message(rpc).is_ok() { + true + } else { + // Sending failed because the channel is full. + tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc_kind); + + // Update failed message counter. + let failed_messages = self.failed_messages.entry(peer_id).or_default(); + match rpc_kind { + RpcOutKind::Publish => { + failed_messages.priority += 1; + failed_messages.publish += 1; + } + RpcOutKind::Forward => { + failed_messages.non_priority += 1; + failed_messages.forward += 1; + } + RpcOutKind::IWant | RpcOutKind::IHave => { + failed_messages.non_priority += 1; + } + RpcOutKind::Graft + | RpcOutKind::Prune + | RpcOutKind::Subscribe + | RpcOutKind::Unsubscribe => { + unreachable!("Channel for high priority control messages is unbounded and should always be open.") + } + } + + // Update peer score. + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(&peer_id); + } + + false + } + } + fn on_connection_established( &mut self, ConnectionEstablished { @@ -2870,13 +2817,8 @@ where tracing::debug!(peer=%peer_id, "New peer connected"); // We need to send our subscriptions to the newly-connected node.
- if let Some(peer) = self.connected_peers.get_mut(&peer_id) { - for topic_hash in self.mesh.clone().into_keys() { - peer.sender.subscribe(topic_hash); - } - } else { - tracing::error!(peer = %peer_id, - "Could not SUBSCRIBE, peer doesn't exist in connected peer list"); + for topic_hash in self.mesh.clone().into_keys() { + self.send_message(peer_id, RpcOut::Subscribe(topic_hash)); } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 2d5240f5d0b..42d987960c9 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -5675,13 +5675,16 @@ async fn test_timedout_messages_are_reported() { let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap(); - let mut sender = RpcSender::new(2); + let sender = RpcSender::new(2); let topic_hash = Topic::new("Test").hash(); let publish_data = vec![2; 59]; let raw_message = gs.build_raw_message(topic_hash, publish_data).unwrap(); sender - .publish(raw_message, Duration::from_nanos(1), None) + .send_message(RpcOut::Publish { + message: raw_message, + timeout: Delay::new(Duration::from_nanos(1)), + }) .unwrap(); let mut receiver = sender.new_receiver(); let stale = future::poll_fn(|cx| receiver.poll_stale(cx)).await.unwrap(); @@ -5690,10 +5693,14 @@ async fn test_timedout_messages_are_reported() { #[test] fn test_priority_messages_are_always_sent() { - let mut sender = RpcSender::new(2); + let sender = RpcSender::new(2); let topic_hash = Topic::new("Test").hash(); // Fill the buffer with the first message. 
- sender.subscribe(topic_hash.clone()); - sender.subscribe(topic_hash.clone()); - sender.unsubscribe(topic_hash.clone()); + assert!(sender + .send_message(RpcOut::Subscribe(topic_hash.clone())) + .is_ok()); + assert!(sender + .send_message(RpcOut::Subscribe(topic_hash.clone())) + .is_ok()); + assert!(sender.send_message(RpcOut::Unsubscribe(topic_hash)).is_ok()); } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index ff2d5cf406b..76226d20ac5 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. //! A collection of types using the Gossipsub system. -use crate::metrics::Metrics; use crate::TopicHash; use async_channel::{Receiver, Sender}; use futures::{stream::Peekable, Stream, StreamExt}; @@ -36,7 +35,6 @@ use std::sync::{ Arc, }; use std::task::{Context, Poll}; -use std::time::Duration; use std::{collections::BTreeSet, fmt}; use crate::rpc_proto::proto; @@ -113,7 +111,7 @@ impl std::fmt::Debug for MessageId { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) struct PeerConnections { /// The kind of protocol the peer supports. pub(crate) kind: PeerKind, @@ -322,6 +320,24 @@ impl RpcOut { pub fn into_protobuf(self) -> proto::RPC { self.into() } + + /// Returns an enum that just stores the type of this [`RpcOut`], + /// without any data. + /// + /// Can be used to avoid cloning of an [`RpcOut`] message when just + /// the type is needed. + pub(crate) fn kind(&self) -> RpcOutKind { + match self { + RpcOut::Publish { .. } => RpcOutKind::Publish, + RpcOut::Forward { .. 
} => RpcOutKind::Forward, + RpcOut::Subscribe(_) => RpcOutKind::Subscribe, + RpcOut::Unsubscribe(_) => RpcOutKind::Unsubscribe, + RpcOut::Graft(_) => RpcOutKind::Graft, + RpcOut::Prune(_) => RpcOutKind::Prune, + RpcOut::IHave(_) => RpcOutKind::IHave, + RpcOut::IWant(_) => RpcOutKind::IWant, + } + } } impl From<RpcOut> for proto::RPC { @@ -431,6 +447,19 @@ impl From<RpcOut> for proto::RPC { } } +/// Variant of [`RpcOut`] without any data. +#[derive(Debug, Clone, Copy)] +pub(crate) enum RpcOutKind { + Publish, + Forward, + Subscribe, + Unsubscribe, + Graft, + Prune, + IHave, + IWant, +} + /// An RPC received/sent. #[derive(Clone, PartialEq, Eq, Hash)] pub struct Rpc { @@ -588,7 +617,7 @@ impl fmt::Display for PeerKind { } /// `RpcOut` sender that is priority aware. -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) struct RpcSender { cap: usize, len: Arc<AtomicUsize>, @@ -627,102 +656,26 @@ impl RpcSender { } } - /// Send a `RpcOut::Graft` message to the `RpcReceiver` - /// this is high priority. - pub(crate) fn graft(&mut self, graft: Graft) { - self.priority_sender - .try_send(RpcOut::Graft(graft)) - .expect("Channel is unbounded and should always be open"); - } - - /// Send a `RpcOut::Prune` message to the `RpcReceiver` - /// this is high priority. - pub(crate) fn prune(&mut self, prune: Prune) { - self.priority_sender - .try_send(RpcOut::Prune(prune)) - .expect("Channel is unbounded and should always be open"); - } - - /// Send a `RpcOut::IHave` message to the `RpcReceiver` - /// this is low priority, if the queue is full an Err is returned. - #[allow(clippy::result_large_err)] - pub(crate) fn ihave(&mut self, ihave: IHave) -> Result<(), RpcOut> { - self.non_priority_sender - .try_send(RpcOut::IHave(ihave)) - .map_err(|err| err.into_inner()) - } - - /// Send a `RpcOut::IHave` message to the `RpcReceiver` - /// this is low priority, if the queue is full an Err is returned.
- #[allow(clippy::result_large_err)] - pub(crate) fn iwant(&mut self, iwant: IWant) -> Result<(), RpcOut> { - self.non_priority_sender - .try_send(RpcOut::IWant(iwant)) - .map_err(|err| err.into_inner()) - } - - /// Send a `RpcOut::Subscribe` message to the `RpcReceiver` - /// this is high priority. - pub(crate) fn subscribe(&mut self, topic: TopicHash) { - self.priority_sender - .try_send(RpcOut::Subscribe(topic)) - .expect("Channel is unbounded and should always be open"); - } - - /// Send a `RpcOut::Unsubscribe` message to the `RpcReceiver` - /// this is high priority. - pub(crate) fn unsubscribe(&mut self, topic: TopicHash) { - self.priority_sender - .try_send(RpcOut::Unsubscribe(topic)) - .expect("Channel is unbounded and should always be open"); - } - - /// Send a `RpcOut::Publish` message to the `RpcReceiver` - /// this is high priority. If message sending fails, an `Err` is returned. - pub(crate) fn publish( - &mut self, - message: RawMessage, - timeout: Duration, - metrics: Option<&mut Metrics>, - ) -> Result<(), ()> { - if self.len.load(Ordering::Relaxed) >= self.cap { - return Err(()); - } - self.priority_sender - .try_send(RpcOut::Publish { - message: message.clone(), - timeout: Delay::new(timeout), - }) - .expect("Channel is unbounded and should always be open"); - self.len.fetch_add(1, Ordering::Relaxed); - - if let Some(m) = metrics { - m.msg_sent(&message.topic, message.raw_protobuf_len()); - } - - Ok(()) - } - - /// Send a `RpcOut::Forward` message to the `RpcReceiver` - /// this is high priority. If the queue is full the message is discarded. 
- pub(crate) fn forward( - &mut self, - message: RawMessage, - timeout: Duration, - metrics: Option<&mut Metrics>, - ) -> Result<(), ()> { - self.non_priority_sender - .try_send(RpcOut::Forward { - message: message.clone(), - timeout: Delay::new(timeout), - }) - .map_err(|_| ())?; - - if let Some(m) = metrics { - m.msg_sent(&message.topic, message.raw_protobuf_len()); + pub(crate) fn send_message(&self, rpc: RpcOut) -> Result<(), ()> { + if let RpcOut::Publish { .. } = rpc { + // Update number of publish message in queue. + self.len + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |curr| { + (curr < self.cap).then_some(curr + 1) + }) + .map_err(|_| ())?; } - - Ok(()) + let sender = match rpc { + RpcOut::Publish { .. } + | RpcOut::Graft(_) + | RpcOut::Prune(_) + | RpcOut::Subscribe(_) + | RpcOut::Unsubscribe(_) => &self.priority_sender, + RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) => { + &self.non_priority_sender + } + }; + sender.try_send(rpc).map_err(|_| ()) } /// Returns the current size of the priority queue.