From f8be4e6e41206a5fda5bac276ebca793ed2e24cb Mon Sep 17 00:00:00 2001 From: Tibo-lg Date: Wed, 14 Jul 2021 11:44:36 +0900 Subject: [PATCH] Add unknown message handler to peer manager --- fuzz/src/full_stack.rs | 6 +- lightning-background-processor/src/lib.rs | 12 +- lightning-net-tokio/src/lib.rs | 29 ++- lightning/src/ln/peer_handler.rs | 258 +++++++++++++++------- lightning/src/ln/wire.rs | 7 + lightning/src/util/events.rs | 2 +- 6 files changed, 209 insertions(+), 105 deletions(-) diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index f68cc8f3df7..b5786987132 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -34,7 +34,7 @@ use lightning::chain::transaction::OutPoint; use lightning::chain::keysinterface::{InMemorySigner, KeysInterface}; use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning::ln::channelmanager::{ChainParameters, ChannelManager}; -use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor}; +use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor,IgnoringUnknownMessageHandler}; use lightning::ln::msgs::DecodeError; use lightning::routing::router::get_route; use lightning::routing::network_graph::NetGraphMsgHandler; @@ -159,7 +159,7 @@ type ChannelMan = ChannelManager< EnforcingSigner, Arc, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>; -type PeerMan<'a> = PeerManager, Arc, Arc, Arc>>, Arc>; +type PeerMan<'a> = PeerManager, Arc, Arc, Arc>>, Arc, Arc>; struct MoneyLossDetector<'a> { manager: Arc, @@ -374,7 +374,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler { chan_handler: channelmanager.clone(), route_handler: net_graph_msg_handler.clone(), - }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger))); + }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), Arc::new(IgnoringUnknownMessageHandler{}))); let mut should_forward = false; let mut payments_received: Vec = Vec::new(); diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 3a36bb5625e..93d259a51ff 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -15,7 +15,7 @@ use lightning::chain::channelmonitor; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; -use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; +use lightning::ln::peer_handler::{PeerManager, SocketDescriptor, UnknownMessageHandler}; use lightning::util::events::{EventHandler, EventsProvider}; use lightning::util::logger::Logger; use std::sync::Arc; @@ -118,7 +118,8 @@ impl BackgroundProcessor { CMP: 'static + Send + ChannelManagerPersister, M: 'static + Deref> + Send + Sync, CM: 'static + Deref> + Send + Sync, - PM: 'static + Deref> + Send + Sync, + UMH: 'static + Deref + Send + Sync, + PM: 'static + Deref> + Send + Sync, > (persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self where @@ -131,6 +132,7 @@ impl BackgroundProcessor { P::Target: 'static + channelmonitor::Persist, CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, + UMH::Target: 'static + UnknownMessageHandler, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -182,7 +184,7 @@ mod tests { use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; use lightning::ln::msgs::ChannelMessageHandler; - use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; + use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringUnknownMessageHandler}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; @@ -210,7 +212,7 @@ mod tests { struct Node { node: Arc>, - peer_manager: Arc, Arc, Arc>>, + peer_manager: Arc, Arc, Arc, Arc>>, chain_monitor: Arc, persister: Arc, tx_broadcaster: Arc, @@ -251,7 +253,7 @@ mod tests { let params = ChainParameters { network, best_block }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; - let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); + let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), Arc::new(IgnoringUnknownMessageHandler {}))); let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; nodes.push(node); } diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 5f5fece0d2e..68a25a75ef3 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -37,7 +37,7 @@ //! type DataPersister = dyn lightning::chain::channelmonitor::Persist + Send + Sync; //! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; //! type ChannelManager = Arc>; -//! type PeerManager = Arc>; +//! type PeerManager = Arc>>; //! //! // Connect to node with pubkey their_node_id at addr: //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { @@ -80,6 +80,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use lightning::ln::peer_handler; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; +use lightning::ln::peer_handler::UnknownMessageHandler; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::util::logger::Logger; @@ -119,10 +120,11 @@ struct Connection { id: u64, } impl Connection { - async fn schedule_read(peer_manager: Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, - L: Logger + 'static + ?Sized { + L: Logger + 'static + ?Sized, + UMH: UnknownMessageHandler + 'static { // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -222,10 +224,11 @@ impl Connection { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: UnknownMessageHandler + 'static + Send + Sync { let (reader, write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -262,10 +265,11 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: UnknownMessageHandler + 'static + Send + Sync { let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -332,10 +336,11 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: UnknownMessageHandler + 'static + Send + Sync { if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { Some(setup_outbound(peer_manager, their_node_id, stream)) } else { None } @@ -467,7 +472,7 @@ impl Hash for SocketDescriptor { mod tests { use lightning::ln::features::*; use lightning::ln::msgs::*; - use lightning::ln::peer_handler::{MessageHandler, PeerManager}; + use lightning::ln::peer_handler::{MessageHandler, PeerManager, IgnoringUnknownMessageHandler}; use lightning::util::events::*; use bitcoin::secp256k1::{Secp256k1, SecretKey, PublicKey}; @@ -563,7 +568,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), - }, a_key.clone(), &[1; 32], Arc::new(TestLogger()))); + }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(IgnoringUnknownMessageHandler {}))); let (b_connected_sender, mut b_connected) = mpsc::channel(1); let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1); @@ -577,7 +582,7 @@ mod tests { let b_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), - }, b_key.clone(), &[2; 32], Arc::new(TestLogger()))); + }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(IgnoringUnknownMessageHandler {}))); // We bind on localhost, hoping the environment is properly configured with a local // address. This may not always be the case in containers and the like, so if this test is diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 96ec31c98ee..832537bbbb5 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -68,6 +68,38 @@ impl Deref for IgnoringMessageHandler { fn deref(&self) -> &Self { self } } +/// A dummy implementation of `UnknownMessageHandler` that does nothing. +pub struct IgnoringUnknownMessageHandler{} + +/// Define a dummy type to satisfy the constraint of UnknownMessageHandle `Message` +/// associated type for implementing it for IgnoringUnknownMessageHandler. +type DummyType = (); +impl Encode for DummyType { + const TYPE: u16 = 0; +} +impl Writeable for DummyType { + fn write(&self, _writer: &mut W) -> Result<(), ::std::io::Error> { + Ok(()) + } +} + +impl UnknownMessageHandler for IgnoringUnknownMessageHandler { + type MessageEnum = (); + fn read(&self, _message_type: u16, _buffer: &mut R) -> Result, msgs::DecodeError> { + Ok(None) + } + + fn handle_unknown_message(&self, _msg: Self::MessageEnum) -> Result<(), MessageHandlingError> { + // Since we always return `None` in the read the handle method should never be called. + unreachable!(); + } +} + +impl Deref for IgnoringUnknownMessageHandler { + type Target = IgnoringUnknownMessageHandler; + fn deref(&self) -> &Self { self } +} + /// A dummy struct which implements `ChannelMessageHandler` without having any channels. /// You can provide one of these as the route_handler in a MessageHandler. pub struct ErroringMessageHandler { @@ -173,6 +205,16 @@ pub struct MessageHandler where pub route_handler: RM, } +/// Handler for messages external to the LN protocol. +pub trait UnknownMessageHandler { + /// A type that represents an enumeration of messages that can be handled by the handler. + type MessageEnum; + /// + fn read(&self, msg_type: u16, buffer: &mut R) -> Result, msgs::DecodeError>; + /// Called with the struct that was previously decoded calling the `read` method. + fn handle_unknown_message(&self, msg: Self::MessageEnum) -> Result<(), MessageHandlingError>; +} + /// Provides an object which can be used to send data to and which uniquely identifies a connection /// to a remote host. You will need to be able to generate multiple of these which meet Eq and /// implement Hash to meet the PeerManager API. @@ -311,7 +353,7 @@ fn _check_usize_is_32_or_64() { /// lifetimes). Other times you can afford a reference, which is more efficient, in which case /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents /// issues such as overly long function definitions. -pub type SimpleArcPeerManager = PeerManager>, Arc, Arc>>, Arc>; +pub type SimpleArcPeerManager = PeerManager>, Arc, Arc>>, Arc, UMH>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -319,7 +361,7 @@ pub type SimpleArcPeerManager = PeerManager = PeerManager, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L>; +pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L, UMH> = PeerManager, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L, UMH>; /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls /// socket events into messages which it passes on to its [`MessageHandler`]. @@ -340,14 +382,16 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L> = P /// you're using lightning-net-tokio. /// /// [`read_event`]: PeerManager::read_event -pub struct PeerManager where +pub struct PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, - L::Target: Logger { + L::Target: Logger, + UMH::Target: UnknownMessageHandler { message_handler: MessageHandler, peers: Mutex>, our_node_secret: SecretKey, ephemeral_key_midstate: Sha256Engine, + unknown_message_handler: UMH, // Usize needs to be at least 32 bits to avoid overflowing both low and high. If usize is 64 // bits we will never realistically count into high: @@ -357,8 +401,11 @@ pub struct PeerManager PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, L::Target: Logger { /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message @@ -396,11 +443,11 @@ impl PeerManager PeerManager where +impl PeerManager where RM::Target: RoutingMessageHandler, L::Target: Logger { /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message @@ -416,18 +463,19 @@ impl PeerManager PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, - L::Target: Logger { + L::Target: Logger, + UMH::Target: UnknownMessageHandler { /// Constructs a new PeerManager with the given message handlers and node_id secret key /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. - pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { + pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, unknown_message_handler: UMH) -> Self { let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); @@ -442,6 +490,7 @@ impl PeerManager PeerManager(&self, node_id: &PublicKey, message: &M) { + let mut peers_lock = self.peers.lock().unwrap(); + let peers = &mut *peers_lock; + match peers.node_id_to_descriptor.get(node_id) { + Some(descriptor) => match peers.peers.get_mut(&descriptor) { + Some(mut peer) => { + if peer.their_features.is_some() { + self.enqueue_message(&mut peer, message); + } + }, + None => panic!("Inconsistent peers set state!"), + }, + None => { // Do nothing + }, + }; + } + /// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly. fn enqueue_message(&self, peer: &mut Peer, message: &M) { let mut buffer = VecWriter(Vec::new()); @@ -801,47 +868,80 @@ impl PeerManager x, - Err(e) => { - match e { - msgs::DecodeError::UnknownVersion => return Err(PeerHandleError { no_connection_possible: false }), - msgs::DecodeError::UnknownRequiredFeature => { - log_trace!(self.logger, "Got a channel/node announcement with an known required feature flag, you may want to update!"); - continue; + macro_rules! handle_message_result { + ($result: expr) => { + match $result { + Ok(x) => x, + Err(e) => match e { + msgs::DecodeError::UnknownVersion => return Err(PeerHandleError { no_connection_possible: false }), + msgs::DecodeError::UnknownRequiredFeature => { + log_trace!(self.logger, "Got a channel/node announcement with an known required feature flag, you may want to update!"); + continue; + } + msgs::DecodeError::InvalidValue => { + log_debug!(self.logger, "Got an invalid value while deserializing message"); + return Err(PeerHandleError { no_connection_possible: false }); + } + msgs::DecodeError::ShortRead => { + log_debug!(self.logger, "Deserialization failed due to shortness of message"); + return Err(PeerHandleError { no_connection_possible: false }); + } + msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }), + msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }), + msgs::DecodeError::UnsupportedCompression => { + log_trace!(self.logger, "We don't support zlib-compressed message fields, ignoring message"); + continue; + } } - msgs::DecodeError::InvalidValue => { - log_debug!(self.logger, "Got an invalid value while deserializing message"); - return Err(PeerHandleError { no_connection_possible: false }); - } - msgs::DecodeError::ShortRead => { - log_debug!(self.logger, "Deserialization failed due to shortness of message"); - return Err(PeerHandleError { no_connection_possible: false }); - } - msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }), - msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }), - msgs::DecodeError::UnsupportedCompression => { - log_trace!(self.logger, "We don't support zlib-compressed message fields, ignoring message"); - continue; + }; + } + } + + macro_rules! handle_handling_error { + ($error: expr) => { + match $error { + MessageHandlingError::PeerHandleError(e) => { return Err(e) }, + MessageHandlingError::LightningError(e) => { + try_potential_handleerror!(Err(e)); } } - } - }; + }; + } + + let mut reader = ::std::io::Cursor::new(&msg_data[..]); + let message = handle_message_result!(wire::read(&mut reader)); + + // Need an Init as first message + if let wire::Message::Init(_) = message { + } else if peer.their_features.is_none() { + log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); + return Err(PeerHandleError{ no_connection_possible: false }.into()); + } - match self.handle_message(peer, message) { - Err(handling_error) => match handling_error { - MessageHandlingError::PeerHandleError(e) => { return Err(e) }, - MessageHandlingError::LightningError(e) => { - try_potential_handleerror!(Err(e)); + if let wire::Message::Unknown(msg_type) = message { + let message_opt = handle_message_result!(self.unknown_message_handler.read(*msg_type, &mut reader)); + if let Some(message) = message_opt { + if let Err(e) = self.unknown_message_handler.handle_unknown_message(message) { + handle_handling_error!(e); + } + } else { + if msg_type.is_even() { + log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type); + // Fail the channel if message is an even, unknown type as per BOLT #1. + return Err(PeerHandleError{ no_connection_possible: true }.into()); + } + + log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type); + } + } else { + match self.handle_message(peer, message) { + Err(handling_error) => handle_handling_error!(handling_error), + Ok(Some(msg)) => { + peer_node_id = Some(peer.their_node_id.expect("After noise is complete, their_node_id is always set")); + msgs_to_forward.push(msg); }, - }, - Ok(Some(msg)) => { - peer_node_id = Some(peer.their_node_id.expect("After noise is complete, their_node_id is always set")); - msgs_to_forward.push(msg); - }, - Ok(None) => {}, + Ok(None) => {}, + } } } } @@ -868,13 +968,6 @@ impl PeerManager Result, MessageHandlingError> { log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap())); - // Need an Init as first message - if let wire::Message::Init(_) = message { - } else if peer.their_features.is_none() { - log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); - return Err(PeerHandleError{ no_connection_possible: false }.into()); - } - let mut should_forward = None; match message { @@ -1033,14 +1126,9 @@ impl PeerManager { - log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type); - // Fail the channel if message is an even, unknown type as per BOLT #1. - return Err(PeerHandleError{ no_connection_possible: true }.into()); + wire::Message::Unknown(_msg) => { + // Handled in `do_read_event`. }, - wire::Message::Unknown(msg_type) => { - log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type); - } }; Ok(should_forward) } @@ -1135,27 +1223,29 @@ impl PeerManager { - { - match peers.node_id_to_descriptor.get($node_id) { - Some(descriptor) => match peers.peers.get_mut(&descriptor) { - Some(peer) => { - if peer.their_features.is_none() { - continue; - } - peer - }, - None => panic!("Inconsistent peers set state!"), - }, - None => { - continue; + + macro_rules! get_peer_for_forwarding { + ($node_id: expr) => { + { + match peers.node_id_to_descriptor.get($node_id) { + Some(descriptor) => match peers.peers.get_mut(&descriptor) { + Some(peer) => { + if peer.their_features.is_none() { + continue; + } + peer }, - } + None => panic!("Inconsistent peers set state!"), + }, + None => { + continue; + }, } } } + } + + for event in events_generated.drain(..) { match event { MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", @@ -1322,7 +1412,7 @@ impl PeerManager PeerManager(peer_count: usize, cfgs: &'a Vec) -> Vec> { + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { let mut peers = Vec::new(); for i in 0..peer_count { let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); let ephemeral_bytes = [i as u8; 32]; let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler }; - let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger); + let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringUnknownMessageHandler {}); peers.push(peer); } peers } - fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let secp_ctx = Secp256k1::new(); let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs index 0ee280b50e4..7a495d98f51 100644 --- a/lightning/src/ln/wire.rs +++ b/lightning/src/ln/wire.rs @@ -62,6 +62,13 @@ pub enum Message { #[derive(Clone, Copy, Debug)] pub struct MessageType(u16); +impl ::std::ops::Deref for MessageType { + type Target = u16; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + impl Message { #[allow(dead_code)] // This method is only used in tests /// Returns the type that was used to decode the message payload. diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index 0fc7c6b3a3d..122b2efc1e6 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -433,7 +433,7 @@ pub enum MessageSendEvent { node_id: PublicKey, /// The reply_channel_range which should be sent. msg: msgs::ReplyChannelRange, - } + }, } /// A trait indicating an object may generate message send events