55//! A mechanism for maintaining a full mesh of trust quorum node connections
66
77use crate :: established_conn:: EstablishedConn ;
8- use trust_quorum_protocol:: { BaseboardId , PeerMsg } ;
8+ use trust_quorum_protocol:: { BaseboardId , Envelope , PeerMsg } ;
99// TODO: Move or copy this to this crate?
1010use bootstore:: schemes:: v0:: NetworkConfig ;
1111use camino:: Utf8PathBuf ;
@@ -47,7 +47,6 @@ pub enum AcceptError {
4747/// Messages sent from the main task to the connection managing tasks
4848#[ derive( Debug ) ]
4949pub enum MainToConnMsg {
50- #[ expect( unused) ]
5150 Msg ( WireMsg ) ,
5251}
5352
@@ -100,7 +99,6 @@ pub enum ConnToMainMsgInner {
10099 addr : SocketAddrV6 ,
101100 peer_id : BaseboardId ,
102101 } ,
103- #[ expect( unused) ]
104102 Received {
105103 from : BaseboardId ,
106104 msg : PeerMsg ,
@@ -117,7 +115,6 @@ pub enum ConnToMainMsgInner {
117115
118116pub struct TaskHandle {
119117 pub abort_handle : AbortHandle ,
120- #[ expect( unused) ]
121118 pub tx : mpsc:: Sender < MainToConnMsg > ,
122119 pub conn_type : ConnectionType ,
123120}
@@ -134,6 +131,10 @@ impl TaskHandle {
134131 pub fn abort ( & self ) {
135132 self . abort_handle . abort ( )
136133 }
134+
135+ pub async fn send ( & self , msg : PeerMsg ) {
136+ let _ = self . tx . send ( MainToConnMsg :: Msg ( WireMsg :: Tq ( msg) ) ) . await ;
137+ }
137138}
138139
139140impl BiHashItem for TaskHandle {
@@ -175,6 +176,10 @@ impl EstablishedTaskHandle {
175176 pub fn abort ( & self ) {
176177 self . task_handle . abort ( ) ;
177178 }
179+
180+ pub async fn send ( & self , msg : PeerMsg ) {
181+ let _ = self . task_handle . send ( msg) . await ;
182+ }
178183}
179184
180185impl TriHashItem for EstablishedTaskHandle {
@@ -372,6 +377,14 @@ impl ConnMgr {
372377 self . listen_addr
373378 }
374379
380+ pub async fn send ( & self , envelope : Envelope ) {
381+ let Envelope { to, msg, .. } = envelope;
382+ info ! ( self . log, "Sending {msg:?}" ; "peer_id" => %to) ;
383+ if let Some ( handle) = self . established . get1 ( & to) {
384+ handle. send ( msg) . await ;
385+ }
386+ }
387+
375388 /// Perform any polling related operations that the connection
376389 /// manager must perform concurrently.
377390 pub async fn step (
@@ -573,13 +586,15 @@ impl ConnMgr {
573586 /// easiest way to achieve this is to only connect to peers with addresses
574587 /// that sort less than our own and tear down any connections that no longer
575588 /// exist in `addrs`.
589+ ///
590+ /// Return the `BaseboardId` of all peers that have been disconnected.
576591 pub async fn update_bootstrap_connections (
577592 & mut self ,
578593 addrs : BTreeSet < SocketAddrV6 > ,
579594 corpus : Vec < Utf8PathBuf > ,
580- ) {
595+ ) -> BTreeSet < BaseboardId > {
581596 if self . bootstrap_addrs == addrs {
582- return ;
597+ return BTreeSet :: new ( ) ;
583598 }
584599
585600 // We don't try to compare addresses from accepted nodes. If DDMD
@@ -607,9 +622,13 @@ impl ConnMgr {
607622 self . connect_client ( corpus. clone ( ) , addr) . await ;
608623 }
609624
625+ let mut disconnected_peers = BTreeSet :: new ( ) ;
610626 for addr in to_disconnect {
611- self . disconnect_client ( addr) . await ;
627+ if let Some ( peer_id) = self . disconnect_client ( addr) . await {
628+ disconnected_peers. insert ( peer_id) ;
629+ }
612630 }
631+ disconnected_peers
613632 }
614633
615634 /// Spawn a task to estalbish a sprockets connection for the given address
@@ -688,7 +707,13 @@ impl ConnMgr {
688707 ///
689708 /// We don't tear down server connections this way as we don't know their
690709 /// listen port, just the ephemeral port.
691- async fn disconnect_client ( & mut self , addr : SocketAddrV6 ) {
710+ ///
711+ /// Return the `BaseboardId` of the peer if an established connection is
712+ // torn down.
713+ async fn disconnect_client (
714+ & mut self ,
715+ addr : SocketAddrV6 ,
716+ ) -> Option < BaseboardId > {
692717 if let Some ( handle) = self . connecting . remove2 ( & addr) {
693718 // The connection has not yet completed its handshake
694719 info ! (
@@ -697,6 +722,7 @@ impl ConnMgr {
697722 "remote_addr" => %addr
698723 ) ;
699724 handle. abort ( ) ;
725+ None
700726 } else {
701727 if let Some ( handle) = self . established . remove3 ( & addr) {
702728 info ! (
@@ -706,6 +732,9 @@ impl ConnMgr {
706732 "peer_id" => %handle. baseboard_id
707733 ) ;
708734 handle. abort ( ) ;
735+ Some ( handle. baseboard_id )
736+ } else {
737+ None
709738 }
710739 }
711740 }
0 commit comments