55//! A mechanism for maintaining a full mesh of trust quorum node connections
66
77use crate :: established_conn:: EstablishedConn ;
8- use trust_quorum_protocol:: { BaseboardId , PeerMsg } ;
8+ use trust_quorum_protocol:: { BaseboardId , Envelope , PeerMsg } ;
99
1010// TODO: Move to this crate
1111// https://github.com/oxidecomputer/omicron/issues/9311
@@ -50,7 +50,6 @@ pub enum AcceptError {
5050/// Messages sent from the main task to the connection managing tasks
5151#[ derive( Debug ) ]
5252pub enum MainToConnMsg {
53- #[ expect( unused) ]
5453 Msg ( WireMsg ) ,
5554}
5655
@@ -103,7 +102,6 @@ pub enum ConnToMainMsgInner {
103102 addr : SocketAddrV6 ,
104103 peer_id : BaseboardId ,
105104 } ,
106- #[ expect( unused) ]
107105 Received {
108106 from : BaseboardId ,
109107 msg : PeerMsg ,
@@ -120,7 +118,6 @@ pub enum ConnToMainMsgInner {
120118
121119pub struct TaskHandle {
122120 pub abort_handle : AbortHandle ,
123- #[ expect( unused) ]
124121 pub tx : mpsc:: Sender < MainToConnMsg > ,
125122 pub conn_type : ConnectionType ,
126123}
@@ -137,6 +134,10 @@ impl TaskHandle {
137134 pub fn abort ( & self ) {
138135 self . abort_handle . abort ( )
139136 }
137+
138+ pub async fn send ( & self , msg : PeerMsg ) {
139+ let _ = self . tx . send ( MainToConnMsg :: Msg ( WireMsg :: Tq ( msg) ) ) . await ;
140+ }
140141}
141142
142143impl BiHashItem for TaskHandle {
@@ -178,6 +179,10 @@ impl EstablishedTaskHandle {
178179 pub fn abort ( & self ) {
179180 self . task_handle . abort ( ) ;
180181 }
182+
183+ pub async fn send ( & self , msg : PeerMsg ) {
184+ let _ = self . task_handle . send ( msg) . await ;
185+ }
181186}
182187
183188impl TriHashItem for EstablishedTaskHandle {
@@ -375,6 +380,14 @@ impl ConnMgr {
375380 self . listen_addr
376381 }
377382
383+ pub async fn send ( & self , envelope : Envelope ) {
384+ let Envelope { to, msg, .. } = envelope;
385+ info ! ( self . log, "Sending {msg:?}" ; "peer_id" => %to) ;
386+ if let Some ( handle) = self . established . get1 ( & to) {
387+ handle. send ( msg) . await ;
388+ }
389+ }
390+
378391 /// Perform any polling related operations that the connection
379392 /// manager must perform concurrently.
380393 pub async fn step (
@@ -576,13 +589,15 @@ impl ConnMgr {
576589 /// easiest way to achieve this is to only connect to peers with addresses
577590 /// that sort less than our own and tear down any connections that no longer
578591 /// exist in `addrs`.
592+ ///
593+ /// Return the `BaseboardId` of all peers that have been disconnected.
579594 pub async fn update_bootstrap_connections (
580595 & mut self ,
581596 addrs : BTreeSet < SocketAddrV6 > ,
582597 corpus : Vec < Utf8PathBuf > ,
583- ) {
598+ ) -> BTreeSet < BaseboardId > {
584599 if self . bootstrap_addrs == addrs {
585- return ;
600+ return BTreeSet :: new ( ) ;
586601 }
587602
588603 // We don't try to compare addresses from accepted nodes. If DDMD
@@ -610,9 +625,13 @@ impl ConnMgr {
610625 self . connect_client ( corpus. clone ( ) , addr) . await ;
611626 }
612627
628+ let mut disconnected_peers = BTreeSet :: new ( ) ;
613629 for addr in to_disconnect {
614- self . disconnect_client ( addr) . await ;
630+ if let Some ( peer_id) = self . disconnect_client ( addr) . await {
631+ disconnected_peers. insert ( peer_id) ;
632+ }
615633 }
634+ disconnected_peers
616635 }
617636
618637 /// Spawn a task to estalbish a sprockets connection for the given address
@@ -691,7 +710,13 @@ impl ConnMgr {
691710 ///
692711 /// We don't tear down server connections this way as we don't know their
693712 /// listen port, just the ephemeral port.
694- async fn disconnect_client ( & mut self , addr : SocketAddrV6 ) {
713+ ///
714+ /// Return the `BaseboardId` of the peer if an established connection is
715+ // torn down.
716+ async fn disconnect_client (
717+ & mut self ,
718+ addr : SocketAddrV6 ,
719+ ) -> Option < BaseboardId > {
695720 if let Some ( handle) = self . connecting . remove2 ( & addr) {
696721 // The connection has not yet completed its handshake
697722 info ! (
@@ -700,6 +725,7 @@ impl ConnMgr {
700725 "remote_addr" => %addr
701726 ) ;
702727 handle. abort ( ) ;
728+ None
703729 } else {
704730 if let Some ( handle) = self . established . remove3 ( & addr) {
705731 info ! (
@@ -709,6 +735,9 @@ impl ConnMgr {
709735 "peer_id" => %handle. baseboard_id
710736 ) ;
711737 handle. abort ( ) ;
738+ Some ( handle. baseboard_id )
739+ } else {
740+ None
712741 }
713742 }
714743 }
0 commit comments