@@ -61,7 +61,7 @@ use crate::{
6161 mcache:: MessageCache ,
6262 peer_score:: { PeerScore , PeerScoreParams , PeerScoreState , PeerScoreThresholds , RejectReason } ,
6363 protocol:: SIGNING_PREFIX ,
64- rpc :: Sender ,
64+ queue :: Queue ,
6565 rpc_proto:: proto,
6666 subscription_filter:: { AllowAllSubscriptionFilter , TopicSubscriptionFilter } ,
6767 time_cache:: DuplicateCache ,
@@ -751,6 +751,7 @@ where
751751 if self . send_message (
752752 * peer_id,
753753 RpcOut :: Publish {
754+ message_id : msg_id. clone ( ) ,
754755 message : raw_message. clone ( ) ,
755756 timeout : Delay :: new ( self . config . publish_queue_duration ( ) ) ,
756757 } ,
@@ -1341,6 +1342,7 @@ where
13411342 self . send_message (
13421343 * peer_id,
13431344 RpcOut :: Forward {
1345+ message_id : id. clone ( ) ,
13441346 message : msg,
13451347 timeout : Delay :: new ( self . config . forward_queue_duration ( ) ) ,
13461348 } ,
@@ -2081,9 +2083,9 @@ where
20812083 // steady-state size of the queues.
20822084 #[ cfg( feature = "metrics" ) ]
20832085 if let Some ( m) = & mut self . metrics {
2084- for sender_queue in self . connected_peers . values ( ) . map ( |v| & v. sender ) {
2085- m. observe_priority_queue_size ( sender_queue. priority_queue_len ( ) ) ;
2086- m. observe_non_priority_queue_size ( sender_queue. non_priority_queue_len ( ) ) ;
2086+ for sender_queue in self . connected_peers . values ( ) . map ( |v| & v. messages ) {
2087+ m. observe_priority_queue_size ( sender_queue. priority_len ( ) ) ;
2088+ m. observe_non_priority_queue_size ( sender_queue. non_priority_len ( ) ) ;
20872089 }
20882090 }
20892091
@@ -2499,6 +2501,11 @@ where
24992501 // Report expired messages
25002502 for ( peer_id, failed_messages) in self . failed_messages . drain ( ) {
25012503 tracing:: debug!( "Peer couldn't consume messages: {:?}" , failed_messages) ;
2504+ #[ cfg( feature = "metrics" ) ]
2505+ if let Some ( metrics) = self . metrics . as_mut ( ) {
2506+ metrics. observe_failed_priority_messages ( failed_messages. priority ) ;
2507+ metrics. observe_failed_non_priority_messages ( failed_messages. non_priority ) ;
2508+ }
25022509 self . events
25032510 . push_back ( ToSwarm :: GenerateEvent ( Event :: SlowPeer {
25042511 peer_id,
@@ -2746,6 +2753,7 @@ where
27462753 self . send_message (
27472754 * peer_id,
27482755 RpcOut :: Forward {
2756+ message_id : msg_id. clone ( ) ,
27492757 message : message. clone ( ) ,
27502758 timeout : Delay :: new ( self . config . forward_queue_duration ( ) ) ,
27512759 } ,
@@ -2874,33 +2882,20 @@ where
28742882 return false ;
28752883 }
28762884
2877- // Try sending the message to the connection handler.
2878- match peer. sender . send_message ( rpc) {
2885+ // Try sending the message to the connection handler.
2886+ // High-priority messages should not fail.
2887+ match peer. messages . try_push ( rpc) {
28792888 Ok ( ( ) ) => true ,
28802889 Err ( rpc) => {
28812890 // Sending failed because the channel is full.
28822891 tracing:: warn!( peer=%peer_id, "Send Queue full. Could not send {:?}." , rpc) ;
28832892
28842893 // Update failed message counter.
28852894 let failed_messages = self . failed_messages . entry ( peer_id) . or_default ( ) ;
2886- match rpc {
2887- RpcOut :: Publish { .. } => {
2888- failed_messages. priority += 1 ;
2889- failed_messages. publish += 1 ;
2890- }
2891- RpcOut :: Forward { .. } => {
2892- failed_messages. non_priority += 1 ;
2893- failed_messages. forward += 1 ;
2894- }
2895- RpcOut :: IWant ( _) | RpcOut :: IHave ( _) | RpcOut :: IDontWant ( _) => {
2896- failed_messages. non_priority += 1 ;
2897- }
2898- RpcOut :: Graft ( _)
2899- | RpcOut :: Prune ( _)
2900- | RpcOut :: Subscribe ( _)
2901- | RpcOut :: Unsubscribe ( _) => {
2902- unreachable ! ( "Channel for highpriority control messages is unbounded and should always be open." )
2903- }
2895+ if rpc. priority ( ) {
2896+ failed_messages. priority += 1 ;
2897+ } else {
2898+ failed_messages. non_priority += 1 ;
29042899 }
29052900
29062901 // Update peer score.
@@ -3125,23 +3120,22 @@ where
31253120 // The protocol negotiation occurs once a message is sent/received. Once this happens we
31263121 // update the type of peer that this is in order to determine which kind of routing should
31273122 // occur.
3128- let connected_peer = self
3129- . connected_peers
3130- . entry ( peer_id)
3131- . or_insert_with ( || PeerDetails {
3132- kind : PeerKind :: Floodsub ,
3133- connections : vec ! [ ] ,
3134- outbound : false ,
3135- sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3136- topics : Default :: default ( ) ,
3137- dont_send : LinkedHashMap :: new ( ) ,
3138- } ) ;
3123+ let connected_peer = self . connected_peers . entry ( peer_id) . or_insert ( PeerDetails {
3124+ kind : PeerKind :: Floodsub ,
3125+ connections : vec ! [ ] ,
3126+ outbound : false ,
3127+ messages : Queue :: new ( self . config . connection_handler_queue_len ( ) ) ,
3128+ topics : Default :: default ( ) ,
3129+ dont_send : LinkedHashMap :: new ( ) ,
3130+ } ) ;
31393131 // Add the new connection
31403132 connected_peer. connections . push ( connection_id) ;
31413133
3134+ // This clones a reference to the Queue so any new handlers reference the same underlying
3135+ // queue. No data is actually cloned here.
31423136 Ok ( Handler :: new (
31433137 self . config . protocol_config ( ) ,
3144- connected_peer. sender . new_receiver ( ) ,
3138+ connected_peer. messages . clone ( ) ,
31453139 ) )
31463140 }
31473141
@@ -3153,25 +3147,24 @@ where
31533147 _: Endpoint ,
31543148 _: PortUse ,
31553149 ) -> Result < THandler < Self > , ConnectionDenied > {
3156- let connected_peer = self
3157- . connected_peers
3158- . entry ( peer_id)
3159- . or_insert_with ( || PeerDetails {
3160- kind : PeerKind :: Floodsub ,
3161- connections : vec ! [ ] ,
3162- // Diverging from the go implementation we only want to consider a peer as outbound
3163- // peer if its first connection is outbound.
3164- outbound : !self . px_peers . contains ( & peer_id) ,
3165- sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3166- topics : Default :: default ( ) ,
3167- dont_send : LinkedHashMap :: new ( ) ,
3168- } ) ;
3150+ let connected_peer = self . connected_peers . entry ( peer_id) . or_insert ( PeerDetails {
3151+ kind : PeerKind :: Floodsub ,
3152+ connections : vec ! [ ] ,
3153+ // Diverging from the go implementation we only want to consider a peer as outbound peer
3154+ // if its first connection is outbound.
3155+ outbound : !self . px_peers . contains ( & peer_id) ,
3156+ messages : Queue :: new ( self . config . connection_handler_queue_len ( ) ) ,
3157+ topics : Default :: default ( ) ,
3158+ dont_send : LinkedHashMap :: new ( ) ,
3159+ } ) ;
31693160 // Add the new connection
31703161 connected_peer. connections . push ( connection_id) ;
31713162
3163+ // This clones a reference to the Queue so any new handlers reference the same underlying
3164+ // queue. No data is actually cloned here.
31723165 Ok ( Handler :: new (
31733166 self . config . protocol_config ( ) ,
3174- connected_peer. sender . new_receiver ( ) ,
3167+ connected_peer. messages . clone ( ) ,
31753168 ) )
31763169 }
31773170
@@ -3213,6 +3206,8 @@ where
32133206 }
32143207 }
32153208 }
3209+ // rpc is only used for metrics code.
3210+ #[ allow( unused_variables) ]
32163211 HandlerEvent :: MessageDropped ( rpc) => {
32173212 // Account for this in the scoring logic
32183213 if let PeerScoreState :: Active ( peer_score) = & mut self . peer_score {
@@ -3221,32 +3216,7 @@ where
32213216
32223217 // Keep track of expired messages for the application layer.
32233218 let failed_messages = self . failed_messages . entry ( propagation_source) . or_default ( ) ;
3224- failed_messages. timeout += 1 ;
3225- match rpc {
3226- RpcOut :: Publish { .. } => {
3227- failed_messages. publish += 1 ;
3228- }
3229- RpcOut :: Forward { .. } => {
3230- failed_messages. forward += 1 ;
3231- }
3232- _ => { }
3233- }
3234-
3235- // Record metrics on the failure.
3236- #[ cfg( feature = "metrics" ) ]
3237- if let Some ( metrics) = self . metrics . as_mut ( ) {
3238- match rpc {
3239- RpcOut :: Publish { message, .. } => {
3240- metrics. publish_msg_dropped ( & message. topic ) ;
3241- metrics. timeout_msg_dropped ( & message. topic ) ;
3242- }
3243- RpcOut :: Forward { message, .. } => {
3244- metrics. forward_msg_dropped ( & message. topic ) ;
3245- metrics. timeout_msg_dropped ( & message. topic ) ;
3246- }
3247- _ => { }
3248- }
3249- }
3219+ failed_messages. non_priority += 1 ;
32503220 }
32513221 HandlerEvent :: Message {
32523222 rpc,
@@ -3345,10 +3315,17 @@ where
33453315 "Could not handle IDONTWANT, peer doesn't exist in connected peer list" ) ;
33463316 continue ;
33473317 } ;
3318+
3319+ // Remove messages from the queue.
3320+ #[ allow( unused) ]
3321+ let removed = peer. messages . remove_data_messages ( & message_ids) ;
3322+
33483323 #[ cfg( feature = "metrics" ) ]
33493324 if let Some ( metrics) = self . metrics . as_mut ( ) {
33503325 metrics. register_idontwant ( message_ids. len ( ) ) ;
3326+ metrics. register_removed_messages ( removed) ;
33513327 }
3328+
33523329 for message_id in message_ids {
33533330 peer. dont_send . insert ( message_id, Instant :: now ( ) ) ;
33543331 // Don't exceed capacity.
0 commit comments