diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index d1adf06e1ed..c0a460b6cc4 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -34,7 +34,7 @@ use lightning::chain::transaction::OutPoint; use lightning::chain::keysinterface::{InMemorySigner, KeysInterface}; use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning::ln::channelmanager::{ChainParameters, ChannelManager}; -use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor}; +use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor,IgnoringCustomMessageHandler}; use lightning::ln::msgs::DecodeError; use lightning::routing::router::get_route; use lightning::routing::network_graph::NetGraphMsgHandler; @@ -159,7 +159,7 @@ type ChannelMan = ChannelManager< EnforcingSigner, Arc, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>; -type PeerMan<'a> = PeerManager, Arc, Arc, Arc>>, Arc>; +type PeerMan<'a> = PeerManager, Arc, Arc, Arc>>, Arc, Arc>; struct MoneyLossDetector<'a> { manager: Arc, @@ -374,7 +374,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler { chan_handler: channelmanager.clone(), route_handler: net_graph_msg_handler.clone(), - }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger))); + }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), Arc::new(IgnoringCustomMessageHandler{}))); let mut should_forward = false; let mut payments_received: Vec = Vec::new(); diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 55ac14c048c..2e031af6ab2 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -15,7 +15,7 @@ use lightning::chain::channelmonitor; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; -use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; +use lightning::ln::peer_handler::{PeerManager, SocketDescriptor, CustomMessageHandler}; use lightning::util::events::{EventHandler, EventsProvider}; use lightning::util::logger::Logger; use std::sync::Arc; @@ -120,7 +120,8 @@ impl BackgroundProcessor { CMP: 'static + Send + ChannelManagerPersister, M: 'static + Deref> + Send + Sync, CM: 'static + Deref> + Send + Sync, - PM: 'static + Deref> + Send + Sync, + UMH: 'static + Deref + Send + Sync, + PM: 'static + Deref> + Send + Sync, > (persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self where @@ -133,6 +134,7 @@ impl BackgroundProcessor { P::Target: 'static + channelmonitor::Persist, CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, + UMH::Target: 'static + CustomMessageHandler, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -224,7 +226,7 @@ mod tests { use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; use lightning::ln::msgs::ChannelMessageHandler; - use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; + use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringCustomMessageHandler}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; @@ -252,7 +254,7 @@ mod tests { struct Node { node: Arc>, - peer_manager: Arc, Arc, Arc>>, + peer_manager: Arc, Arc, Arc, Arc>>, chain_monitor: Arc, persister: Arc, tx_broadcaster: Arc, @@ -293,7 +295,7 @@ mod tests { let params = ChainParameters { network, best_block }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; - let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); + let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), Arc::new(IgnoringCustomMessageHandler{}))); let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; nodes.push(node); } diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 5f5fece0d2e..5e47c8e8222 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -80,6 +80,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use lightning::ln::peer_handler; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; +use lightning::ln::peer_handler::CustomMessageHandler; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::util::logger::Logger; @@ -119,10 +120,11 @@ struct Connection { id: u64, } impl Connection { - async fn schedule_read(peer_manager: Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, - L: Logger + 'static + ?Sized { + L: Logger + 'static + ?Sized, + UMH: CustomMessageHandler + 'static { // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -222,10 +224,11 @@ impl Connection { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { let (reader, write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -262,10 +265,11 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -332,10 +336,11 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { Some(setup_outbound(peer_manager, their_node_id, stream)) } else { None } @@ -563,7 +568,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), - }, a_key.clone(), &[1; 32], Arc::new(TestLogger()))); + }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringCustomMessageHandler{}))); let (b_connected_sender, mut b_connected) = mpsc::channel(1); let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1); @@ -577,7 +582,7 @@ mod tests { let b_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), - }, b_key.clone(), &[2; 32], Arc::new(TestLogger()))); + }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringCustomMessageHandler{}))); // We bind on localhost, hoping the environment is properly configured with a local // address. This may not always be the case in containers and the like, so if this test is diff --git a/lightning/src/ln/mod.rs b/lightning/src/ln/mod.rs index 7500b93c005..66c63f04d90 100644 --- a/lightning/src/ln/mod.rs +++ b/lightning/src/ln/mod.rs @@ -35,7 +35,7 @@ pub(crate) mod peer_channel_encryptor; mod channel; mod onion_utils; -mod wire; +pub mod wire; // Older rustc (which we support) refuses to let us call the get_payment_preimage_hash!() macro // without the node parameter being mut. This is incorrect, and thus newer rustcs will complain diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index be12a32e99e..b09f41112a2 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -69,6 +69,27 @@ impl Deref for IgnoringMessageHandler { fn deref(&self) -> &Self { self } } +/// A dummy implementation of `CustomMessageHandler` that does nothing. +pub struct IgnoringCustomMessageHandler{} + +impl wire::Read for IgnoringCustomMessageHandler { + type CustomMessageType = (); + fn read(&self, _message_type: u16, _buffer: &mut R) -> Result, msgs::DecodeError> { + Ok(None) + } +} + +impl CustomMessageHandler for IgnoringCustomMessageHandler { + fn handle_unknown_message(&self, _msg: Self::CustomMessageType) -> Result<(), MessageHandlingError> { + // Since we always return `None` in the read the handle method should never be called. + unreachable!(); + } +} +impl Deref for IgnoringCustomMessageHandler { + type Target = IgnoringCustomMessageHandler; + fn deref(&self) -> &Self { self } +} + /// A dummy struct which implements `ChannelMessageHandler` without having any channels. /// You can provide one of these as the route_handler in a MessageHandler. pub struct ErroringMessageHandler { @@ -174,6 +195,13 @@ pub struct MessageHandler where pub route_handler: RM, } +/// Handler for messages external to the LN protocol. +pub trait CustomMessageHandler : wire::Read { + /// Called with the message type that was received and the buffer to be read. + /// Can return a `MessageHandlingError` if the message could not be handled. + fn handle_unknown_message(&self, msg: Self::CustomMessageType) -> Result<(), MessageHandlingError>; +} + /// Provides an object which can be used to send data to and which uniquely identifies a connection /// to a remote host. You will need to be able to generate multiple of these which meet Eq and /// implement Hash to meet the PeerManager API. @@ -314,7 +342,7 @@ fn _check_usize_is_32_or_64() { /// lifetimes). Other times you can afford a reference, which is more efficient, in which case /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents /// issues such as overly long function definitions. -pub type SimpleArcPeerManager = PeerManager>, Arc, Arc>>, Arc>; +pub type SimpleArcPeerManager = PeerManager>, Arc, Arc>>, Arc, Arc>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -322,7 +350,7 @@ pub type SimpleArcPeerManager = PeerManager = PeerManager, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L>; +pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L> = PeerManager, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L, IgnoringCustomMessageHandler>; /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls /// socket events into messages which it passes on to its [`MessageHandler`]. @@ -343,14 +371,16 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L> = P /// you're using lightning-net-tokio. /// /// [`read_event`]: PeerManager::read_event -pub struct PeerManager where +pub struct PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, - L::Target: Logger { + L::Target: Logger, + CMH::Target: CustomMessageHandler + wire::Read { message_handler: MessageHandler, peers: Mutex>, our_node_secret: SecretKey, ephemeral_key_midstate: Sha256Engine, + custom_message_handler: CMH, // Usize needs to be at least 32 bits to avoid overflowing both low and high. If usize is 64 // bits we will never realistically count into high: @@ -360,8 +390,11 @@ pub struct PeerManager PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, L::Target: Logger { /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message @@ -399,11 +432,11 @@ impl PeerManager PeerManager where +impl PeerManager where RM::Target: RoutingMessageHandler, L::Target: Logger { /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message @@ -419,18 +452,19 @@ impl PeerManager PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, - L::Target: Logger { + L::Target: Logger, + CMH::Target: CustomMessageHandler + wire::Read { /// Constructs a new PeerManager with the given message handlers and node_id secret key /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. - pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { + pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self { let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); @@ -445,6 +479,7 @@ impl PeerManager PeerManager(&self, node_id: &PublicKey, message: &M) { + let mut peers_lock = self.peers.lock().unwrap(); + let peers = &mut *peers_lock; + match peers.node_id_to_descriptor.get(node_id) { + Some(descriptor) => match peers.peers.get_mut(&descriptor) { + Some(mut peer) => { + if peer.their_features.is_some() { + self.enqueue_message(&mut peer, message); + } + }, + None => panic!("Inconsistent peers set state!"), + }, + None => { // Do nothing + }, + }; + } + /// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly. fn enqueue_message(&self, peer: &mut Peer, message: &M) { let mut buffer = VecWriter(Vec::new()); @@ -805,7 +859,7 @@ impl PeerManager x, Err(e) => { @@ -868,174 +922,171 @@ impl PeerManager Result, MessageHandlingError> { - log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap())); - - // Need an Init as first message - if let wire::Message::Init(_) = message { - } else if peer.their_features.is_none() { - log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); - return Err(PeerHandleError{ no_connection_possible: false }.into()); - } - + fn handle_message(&self, peer: &mut Peer, message: wire::MessageOrCustom<<::Target as wire::Read>::CustomMessageType>) -> Result, MessageHandlingError> { let mut should_forward = None; match message { - // Setup and Control messages: - wire::Message::Init(msg) => { - if msg.features.requires_unknown_bits() { - log_debug!(self.logger, "Peer features required unknown version bits"); - return Err(PeerHandleError{ no_connection_possible: true }.into()); - } - if peer.their_features.is_some() { - return Err(PeerHandleError{ no_connection_possible: false }.into()); - } + wire::MessageOrCustom::Custom(custom) => self.custom_message_handler.handle_unknown_message(custom)?, + wire::MessageOrCustom::LN(ln_message) => { + log_trace!(self.logger, "Received message {:?} from {}", ln_message, log_pubkey!(peer.their_node_id.unwrap())); + match ln_message { + // Setup and Control messages: + wire::Message::Init(msg) => { + if msg.features.requires_unknown_bits() { + log_debug!(self.logger, "Peer features required unknown version bits"); + return Err(PeerHandleError{ no_connection_possible: true }.into()); + } + if peer.their_features.is_some() { + return Err(PeerHandleError{ no_connection_possible: false }.into()); + } - log_info!(self.logger, "Received peer Init message: {}", msg.features); + log_info!(self.logger, "Received peer Init message: {}", msg.features); - if msg.features.initial_routing_sync() { - peer.sync_status = InitSyncTracker::ChannelsSyncing(0); - } - if !msg.features.supports_static_remote_key() { - log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap())); - return Err(PeerHandleError{ no_connection_possible: true }.into()); - } + if msg.features.initial_routing_sync() { + peer.sync_status = InitSyncTracker::ChannelsSyncing(0); + } + if !msg.features.supports_static_remote_key() { + log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap())); + return Err(PeerHandleError{ no_connection_possible: true }.into()); + } - self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg); + self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg); - self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); - peer.their_features = Some(msg.features); - }, - wire::Message::Error(msg) => { - let mut data_is_printable = true; - for b in msg.data.bytes() { - if b < 32 || b > 126 { - data_is_printable = false; - break; - } - } + self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); + peer.their_features = Some(msg.features); + }, + wire::Message::Error(msg) => { + let mut data_is_printable = true; + for b in msg.data.bytes() { + if b < 32 || b > 126 { + data_is_printable = false; + break; + } + } - if data_is_printable { - log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data); - } else { - log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap())); - } - self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg); - if msg.channel_id == [0; 32] { - return Err(PeerHandleError{ no_connection_possible: true }.into()); - } - }, + if data_is_printable { + log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data); + } else { + log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap())); + } + self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg); + if msg.channel_id == [0; 32] { + return Err(PeerHandleError{ no_connection_possible: true }.into()); + } + }, - wire::Message::Ping(msg) => { - if msg.ponglen < 65532 { - let resp = msgs::Pong { byteslen: msg.ponglen }; - self.enqueue_message(peer, &resp); - } - }, - wire::Message::Pong(_msg) => { - peer.awaiting_pong = false; - }, + wire::Message::Ping(msg) => { + if msg.ponglen < 65532 { + let resp = msgs::Pong { byteslen: msg.ponglen }; + self.enqueue_message(peer, &resp); + } + }, + wire::Message::Pong(_msg) => { + peer.awaiting_pong = false; + }, - // Channel messages: - wire::Message::OpenChannel(msg) => { - self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); - }, - wire::Message::AcceptChannel(msg) => { - self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); - }, + // Channel messages: + wire::Message::OpenChannel(msg) => { + self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); + }, + wire::Message::AcceptChannel(msg) => { + self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); + }, - wire::Message::FundingCreated(msg) => { - self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::FundingSigned(msg) => { - self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::FundingLocked(msg) => { - self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg); - }, + wire::Message::FundingCreated(msg) => { + self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::FundingSigned(msg) => { + self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::FundingLocked(msg) => { + self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg); + }, - wire::Message::Shutdown(msg) => { - self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), peer.their_features.as_ref().unwrap(), &msg); - }, - wire::Message::ClosingSigned(msg) => { - self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg); - }, + wire::Message::Shutdown(msg) => { + self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), peer.their_features.as_ref().unwrap(), &msg); + }, + wire::Message::ClosingSigned(msg) => { + self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg); + }, - // Commitment messages: - wire::Message::UpdateAddHTLC(msg) => { - self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFulfillHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFailHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFailMalformedHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg); - }, + // Commitment messages: + wire::Message::UpdateAddHTLC(msg) => { + self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFulfillHTLC(msg) => { + self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFailHTLC(msg) => { + self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFailMalformedHTLC(msg) => { + self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg); + }, - wire::Message::CommitmentSigned(msg) => { - self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::RevokeAndACK(msg) => { - self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFee(msg) => { - self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::ChannelReestablish(msg) => { - self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg); - }, + wire::Message::CommitmentSigned(msg) => { + self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::RevokeAndACK(msg) => { + self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFee(msg) => { + self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::ChannelReestablish(msg) => { + self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg); + }, - // Routing messages: - wire::Message::AnnouncementSignatures(msg) => { - self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::ChannelAnnouncement(msg) => { - if self.message_handler.route_handler.handle_channel_announcement(&msg) - .map_err(|e| -> MessageHandlingError { e.into() })? { - should_forward = Some(wire::Message::ChannelAnnouncement(msg)); - } - }, - wire::Message::NodeAnnouncement(msg) => { - if self.message_handler.route_handler.handle_node_announcement(&msg) - .map_err(|e| -> MessageHandlingError { e.into() })? { - should_forward = Some(wire::Message::NodeAnnouncement(msg)); - } - }, - wire::Message::ChannelUpdate(msg) => { - self.message_handler.chan_handler.handle_channel_update(&peer.their_node_id.unwrap(), &msg); - if self.message_handler.route_handler.handle_channel_update(&msg) - .map_err(|e| -> MessageHandlingError { e.into() })? { - should_forward = Some(wire::Message::ChannelUpdate(msg)); - } - }, - wire::Message::QueryShortChannelIds(msg) => { - self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?; - }, - wire::Message::ReplyShortChannelIdsEnd(msg) => { - self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?; - }, - wire::Message::QueryChannelRange(msg) => { - self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?; - }, - wire::Message::ReplyChannelRange(msg) => { - self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?; - }, - wire::Message::GossipTimestampFilter(_msg) => { - // TODO: handle message - }, + // Routing messages: + wire::Message::AnnouncementSignatures(msg) => { + self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::ChannelAnnouncement(msg) => { + if self.message_handler.route_handler.handle_channel_announcement(&msg) + .map_err(|e| -> MessageHandlingError { e.into() })? { + should_forward = Some(wire::Message::ChannelAnnouncement(msg)); + } + }, + wire::Message::NodeAnnouncement(msg) => { + if self.message_handler.route_handler.handle_node_announcement(&msg) + .map_err(|e| -> MessageHandlingError { e.into() })? { + should_forward = Some(wire::Message::NodeAnnouncement(msg)); + } + }, + wire::Message::ChannelUpdate(msg) => { + self.message_handler.chan_handler.handle_channel_update(&peer.their_node_id.unwrap(), &msg); + if self.message_handler.route_handler.handle_channel_update(&msg) + .map_err(|e| -> MessageHandlingError { e.into() })? { + should_forward = Some(wire::Message::ChannelUpdate(msg)); + } + }, + wire::Message::QueryShortChannelIds(msg) => { + self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::ReplyShortChannelIdsEnd(msg) => { + self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::QueryChannelRange(msg) => { + self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::ReplyChannelRange(msg) => { + self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::GossipTimestampFilter(_msg) => { + // TODO: handle message + }, - // Unknown messages: - wire::Message::Unknown(msg_type) if msg_type.is_even() => { - log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type); - // Fail the channel if message is an even, unknown type as per BOLT #1. - return Err(PeerHandleError{ no_connection_possible: true }.into()); + // Unknown messages: + wire::Message::Unknown(msg_type) if msg_type.is_even() => { + log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type); + // Fail the channel if message is an even, unknown type as per BOLT #1. + return Err(PeerHandleError{ no_connection_possible: true }.into()); + }, + wire::Message::Unknown(msg_type) => { + log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type); + }, + }; }, - wire::Message::Unknown(msg_type) => { - log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type); - } }; Ok(should_forward) } @@ -1130,27 +1181,29 @@ impl PeerManager { - { - match peers.node_id_to_descriptor.get($node_id) { - Some(descriptor) => match peers.peers.get_mut(&descriptor) { - Some(peer) => { - if peer.their_features.is_none() { - continue; - } - peer - }, - None => panic!("Inconsistent peers set state!"), - }, - None => { - continue; + + macro_rules! get_peer_for_forwarding { + ($node_id: expr) => { + { + match peers.node_id_to_descriptor.get($node_id) { + Some(descriptor) => match peers.peers.get_mut(&descriptor) { + Some(peer) => { + if peer.their_features.is_none() { + continue; + } + peer }, - } + None => panic!("Inconsistent peers set state!"), + }, + None => { + continue; + }, } } } + } + + for event in events_generated.drain(..) { match event { MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", @@ -1317,7 +1370,7 @@ impl PeerManager PeerManager(peer_count: usize, cfgs: &'a Vec) -> Vec> { + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { let mut peers = Vec::new(); for i in 0..peer_count { let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); let ephemeral_bytes = [i as u8; 32]; let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler }; - let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger); + let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringCustomMessageHandler {}); peers.push(peer); } peers } - fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let secp_ctx = Secp256k1::new(); let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs index e3ff3095219..4a4c1e4d7e3 100644 --- a/lightning/src/ln/wire.rs +++ b/lightning/src/ln/wire.rs @@ -22,6 +22,24 @@ use io; use ln::msgs; use util::ser::{Readable, Writeable, Writer}; +/// Trait to be implemented by custom message (unrelated to the channel/gossip LN layers) +/// decoders. +pub trait Read { + /// The type of the message decoded by the implementation. + type CustomMessageType; + /// Decodes a custom message to `CustomMessageType`. If the given message type is known to the implementation and + /// the message could be decoded, must return `Ok(Some(message))`. If the message type + /// is unknown to the implementation, must return `Ok(None)`. If a decoding error + /// occur, must return `Err(DecodeError::X)` where `X` details the encountered error. + fn read(&self, message_type: u16, buffer: &mut R) -> Result, msgs::DecodeError>; +} + +// Really needs a better name +pub(crate) enum MessageOrCustom { + LN(Message), + Custom(T), +} + /// A Lightning message returned by [`read()`] when decoding bytes received over the wire. Each /// variant contains a message from [`msgs`] or otherwise the message type if unknown. #[allow(missing_docs)] @@ -63,6 +81,13 @@ pub enum Message { #[derive(Clone, Copy, Debug)] pub struct MessageType(u16); +impl ::std::ops::Deref for MessageType { + type Target = u16; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + impl Message { #[allow(dead_code)] // This method is only used in tests /// Returns the type that was used to decode the message payload. @@ -120,95 +145,102 @@ impl ::core::fmt::Display for MessageType { /// # Errors /// /// Returns an error if the message payload code not be decoded as the specified type. -pub fn read(buffer: &mut R) -> Result { +pub(crate) fn read(buffer: &mut R, custom_reader: &H) -> Result, msgs::DecodeError> + where + H::Target: Read { let message_type = ::read(buffer)?; match message_type { msgs::Init::TYPE => { - Ok(Message::Init(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::Init(Readable::read(buffer)?))) }, msgs::ErrorMessage::TYPE => { - Ok(Message::Error(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::Error(Readable::read(buffer)?))) }, msgs::Ping::TYPE => { - Ok(Message::Ping(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::Ping(Readable::read(buffer)?))) }, msgs::Pong::TYPE => { - Ok(Message::Pong(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::Pong(Readable::read(buffer)?))) }, msgs::OpenChannel::TYPE => { - Ok(Message::OpenChannel(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::OpenChannel(Readable::read(buffer)?))) }, msgs::AcceptChannel::TYPE => { - Ok(Message::AcceptChannel(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::AcceptChannel(Readable::read(buffer)?))) }, msgs::FundingCreated::TYPE => { - Ok(Message::FundingCreated(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::FundingCreated(Readable::read(buffer)?))) }, msgs::FundingSigned::TYPE => { - Ok(Message::FundingSigned(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::FundingSigned(Readable::read(buffer)?))) }, msgs::FundingLocked::TYPE => { - Ok(Message::FundingLocked(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::FundingLocked(Readable::read(buffer)?))) }, msgs::Shutdown::TYPE => { - Ok(Message::Shutdown(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::Shutdown(Readable::read(buffer)?))) }, msgs::ClosingSigned::TYPE => { - Ok(Message::ClosingSigned(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::ClosingSigned(Readable::read(buffer)?))) }, msgs::UpdateAddHTLC::TYPE => { - Ok(Message::UpdateAddHTLC(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::UpdateAddHTLC(Readable::read(buffer)?))) }, msgs::UpdateFulfillHTLC::TYPE => { - Ok(Message::UpdateFulfillHTLC(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::UpdateFulfillHTLC(Readable::read(buffer)?))) }, msgs::UpdateFailHTLC::TYPE => { - Ok(Message::UpdateFailHTLC(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::UpdateFailHTLC(Readable::read(buffer)?))) }, msgs::UpdateFailMalformedHTLC::TYPE => { - Ok(Message::UpdateFailMalformedHTLC(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::UpdateFailMalformedHTLC(Readable::read(buffer)?))) }, msgs::CommitmentSigned::TYPE => { - Ok(Message::CommitmentSigned(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::CommitmentSigned(Readable::read(buffer)?))) }, msgs::RevokeAndACK::TYPE => { - Ok(Message::RevokeAndACK(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::RevokeAndACK(Readable::read(buffer)?))) }, msgs::UpdateFee::TYPE => { - Ok(Message::UpdateFee(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::UpdateFee(Readable::read(buffer)?))) }, msgs::ChannelReestablish::TYPE => { - Ok(Message::ChannelReestablish(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::ChannelReestablish(Readable::read(buffer)?))) }, msgs::AnnouncementSignatures::TYPE => { - Ok(Message::AnnouncementSignatures(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::AnnouncementSignatures(Readable::read(buffer)?))) }, msgs::ChannelAnnouncement::TYPE => { - Ok(Message::ChannelAnnouncement(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::ChannelAnnouncement(Readable::read(buffer)?))) }, msgs::NodeAnnouncement::TYPE => { - Ok(Message::NodeAnnouncement(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::NodeAnnouncement(Readable::read(buffer)?))) }, msgs::ChannelUpdate::TYPE => { - Ok(Message::ChannelUpdate(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::ChannelUpdate(Readable::read(buffer)?))) }, msgs::QueryShortChannelIds::TYPE => { - Ok(Message::QueryShortChannelIds(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::QueryShortChannelIds(Readable::read(buffer)?))) }, msgs::ReplyShortChannelIdsEnd::TYPE => { - Ok(Message::ReplyShortChannelIdsEnd(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::ReplyShortChannelIdsEnd(Readable::read(buffer)?))) }, msgs::QueryChannelRange::TYPE => { - Ok(Message::QueryChannelRange(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::QueryChannelRange(Readable::read(buffer)?))) }, msgs::ReplyChannelRange::TYPE => { - Ok(Message::ReplyChannelRange(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::ReplyChannelRange(Readable::read(buffer)?))) } msgs::GossipTimestampFilter::TYPE => { - Ok(Message::GossipTimestampFilter(Readable::read(buffer)?)) + Ok(MessageOrCustom::LN(Message::GossipTimestampFilter(Readable::read(buffer)?))) }, _ => { - Ok(Message::Unknown(MessageType(message_type))) + // if let Some(custom) = custom_reader.read(message_type, buffer)? { + if let Some(custom) = custom_reader.read(message_type, buffer)? { + Ok(MessageOrCustom::Custom(custom)) + } else { + Ok(MessageOrCustom::LN(Message::Unknown(MessageType(message_type)))) + } }, } } @@ -355,6 +387,7 @@ mod tests { use super::*; use prelude::*; use core::convert::TryInto; + use ::ln::peer_handler::IgnoringCustomMessageHandler; // Big-endian wire encoding of Pong message (type = 19, byteslen = 2). const ENCODED_PONG: [u8; 6] = [0u8, 19u8, 0u8, 2u8, 0u8, 0u8]; @@ -363,38 +396,39 @@ mod tests { fn read_empty_buffer() { let buffer = []; let mut reader = io::Cursor::new(buffer); - assert!(read(&mut reader).is_err()); + assert!(read(&mut reader, &IgnoringCustomMessageHandler{}).is_err()); } #[test] fn read_incomplete_type() { let buffer = &ENCODED_PONG[..1]; let mut reader = io::Cursor::new(buffer); - assert!(read(&mut reader).is_err()); + assert!(read(&mut reader, &IgnoringCustomMessageHandler{}).is_err()); } #[test] fn read_empty_payload() { let buffer = &ENCODED_PONG[..2]; let mut reader = io::Cursor::new(buffer); - assert!(read(&mut reader).is_err()); + assert!(read(&mut reader, &IgnoringCustomMessageHandler{}).is_err()); } #[test] fn read_invalid_message() { let buffer = &ENCODED_PONG[..4]; let mut reader = io::Cursor::new(buffer); - assert!(read(&mut reader).is_err()); + assert!(read(&mut reader, &IgnoringCustomMessageHandler{}).is_err()); } #[test] fn read_known_message() { let buffer = &ENCODED_PONG[..]; let mut reader = io::Cursor::new(buffer); - let message = read(&mut reader).unwrap(); + let message = read(&mut reader, &IgnoringCustomMessageHandler{}).unwrap(); match message { - Message::Pong(_) => (), - _ => panic!("Expected pong message; found message type: {}", message.type_id()), + MessageOrCustom::LN(Message::Pong(_)) => (), + MessageOrCustom::LN(msg) => panic!("Expected pong message; found message type: {}", msg.type_id()), + MessageOrCustom::Custom(_) => panic!("Expected pong message; found custom message"), } } @@ -402,10 +436,11 @@ mod tests { fn read_unknown_message() { let buffer = &::core::u16::MAX.to_be_bytes(); let mut reader = io::Cursor::new(buffer); - let message = read(&mut reader).unwrap(); + let message = read(&mut reader, &IgnoringCustomMessageHandler{}).unwrap(); match message { - Message::Unknown(MessageType(::core::u16::MAX)) => (), - _ => panic!("Expected message type {}; found: {}", ::core::u16::MAX, message.type_id()), + MessageOrCustom::LN(Message::Unknown(MessageType(::core::u16::MAX))) => (), + MessageOrCustom::LN(msg) => panic!("Expected message type {}; found: {}", ::core::u16::MAX, msg.type_id()), + MessageOrCustom::Custom(_) => panic!("Expected message type {}; found custom message", ::core::u16::MAX), } } @@ -428,13 +463,14 @@ mod tests { assert!(write(&message, &mut buffer).is_ok()); let mut reader = io::Cursor::new(buffer); - let decoded_message = read(&mut reader).unwrap(); + let decoded_message = read(&mut reader, &IgnoringCustomMessageHandler{}).unwrap(); match decoded_message { - Message::Pong(msgs::Pong { byteslen: 2u16 }) => (), - Message::Pong(msgs::Pong { byteslen }) => { + MessageOrCustom::LN(Message::Pong(msgs::Pong { byteslen: 2u16 })) => (), + MessageOrCustom::LN(Message::Pong(msgs::Pong { byteslen })) => { panic!("Expected byteslen {}; found: {}", message.byteslen, byteslen); }, - _ => panic!("Expected pong message; found message type: {}", decoded_message.type_id()), + MessageOrCustom::LN(msg) => panic!("Expected pong message; found message type: {}", msg.type_id()), + MessageOrCustom::Custom(_) => panic!("Expected pong message; found custom message"), } } @@ -466,9 +502,9 @@ mod tests { fn check_init_msg(buffer: Vec, expect_unknown: bool) { let mut reader = io::Cursor::new(buffer); - let decoded_msg = read(&mut reader).unwrap(); + let decoded_msg = read(&mut reader, &IgnoringCustomMessageHandler{}).unwrap(); match decoded_msg { - Message::Init(msgs::Init { features }) => { + MessageOrCustom::LN(Message::Init(msgs::Init { features })) => { assert!(features.supports_variable_length_onion()); assert!(features.supports_upfront_shutdown_script()); assert!(features.supports_gossip_queries()); @@ -476,7 +512,8 @@ mod tests { assert!(!features.requires_unknown_bits()); assert!(!features.initial_routing_sync()); }, - _ => panic!("Expected init message, found message type: {}", decoded_msg.type_id()) + MessageOrCustom::LN(msg) => panic!("Expected init message, found message type: {}", msg.type_id()), + MessageOrCustom::Custom(_) => panic!("Expected init message; found custom message"), } } @@ -485,15 +522,16 @@ mod tests { // Taken from lnd v0.9.0-beta. let buffer = vec![1, 1, 91, 164, 146, 213, 213, 165, 21, 227, 102, 33, 105, 179, 214, 21, 221, 175, 228, 93, 57, 177, 191, 127, 107, 229, 31, 50, 21, 81, 179, 71, 39, 18, 35, 2, 89, 224, 110, 123, 66, 39, 148, 246, 177, 85, 12, 19, 70, 226, 173, 132, 156, 26, 122, 146, 71, 213, 247, 48, 93, 190, 185, 177, 12, 172, 0, 3, 2, 162, 161, 94, 103, 195, 37, 2, 37, 242, 97, 140, 2, 111, 69, 85, 39, 118, 30, 221, 99, 254, 120, 49, 103, 22, 170, 227, 111, 172, 164, 160, 49, 68, 138, 116, 16, 22, 206, 107, 51, 153, 255, 97, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 1, 172, 21, 0, 2, 38, 7]; let mut reader = io::Cursor::new(buffer); - let decoded_msg = read(&mut reader).unwrap(); + let decoded_msg = read(&mut reader, &IgnoringCustomMessageHandler{}).unwrap(); match decoded_msg { - Message::NodeAnnouncement(msgs::NodeAnnouncement { contents: msgs::UnsignedNodeAnnouncement { features, ..}, ..}) => { + MessageOrCustom::LN(Message::NodeAnnouncement(msgs::NodeAnnouncement { contents: msgs::UnsignedNodeAnnouncement { features, ..}, ..})) => { assert!(features.supports_variable_length_onion()); assert!(features.supports_upfront_shutdown_script()); assert!(features.supports_gossip_queries()); assert!(!features.requires_unknown_bits()); }, - _ => panic!("Expected node announcement, found message type: {}", decoded_msg.type_id()) + MessageOrCustom::LN(msg) => panic!("Expected node announcement, found message type: {}", msg.type_id()), + MessageOrCustom::Custom(_) => panic!("Expected node announcement; found custom message"), } } @@ -502,12 +540,13 @@ mod tests { // Taken from lnd v0.9.0-beta. let buffer = vec![1, 0, 82, 238, 153, 33, 128, 87, 215, 2, 28, 241, 140, 250, 98, 255, 56, 5, 79, 240, 214, 231, 172, 35, 240, 171, 44, 9, 78, 91, 8, 193, 102, 5, 17, 178, 142, 106, 180, 183, 46, 38, 217, 212, 25, 236, 69, 47, 92, 217, 181, 221, 161, 205, 121, 201, 99, 38, 158, 216, 186, 193, 230, 86, 222, 6, 206, 67, 22, 255, 137, 212, 141, 161, 62, 134, 76, 48, 241, 54, 50, 167, 187, 247, 73, 27, 74, 1, 129, 185, 197, 153, 38, 90, 255, 138, 39, 161, 102, 172, 213, 74, 107, 88, 150, 90, 0, 49, 104, 7, 182, 184, 194, 219, 181, 172, 8, 245, 65, 226, 19, 228, 101, 145, 25, 159, 52, 31, 58, 93, 53, 59, 218, 91, 37, 84, 103, 17, 74, 133, 33, 35, 2, 203, 101, 73, 19, 94, 175, 122, 46, 224, 47, 168, 128, 128, 25, 26, 25, 214, 52, 247, 43, 241, 117, 52, 206, 94, 135, 156, 52, 164, 143, 234, 58, 185, 50, 185, 140, 198, 174, 71, 65, 18, 105, 70, 131, 172, 137, 0, 164, 51, 215, 143, 117, 119, 217, 241, 197, 177, 227, 227, 170, 199, 114, 7, 218, 12, 107, 30, 191, 236, 203, 21, 61, 242, 48, 192, 90, 233, 200, 199, 111, 162, 68, 234, 54, 219, 1, 233, 66, 5, 82, 74, 84, 211, 95, 199, 245, 202, 89, 223, 102, 124, 62, 166, 253, 253, 90, 180, 118, 21, 61, 110, 37, 5, 96, 167, 0, 0, 6, 34, 110, 70, 17, 26, 11, 89, 202, 175, 18, 96, 67, 235, 91, 191, 40, 195, 79, 58, 94, 51, 42, 31, 199, 178, 183, 60, 241, 136, 145, 15, 0, 2, 65, 0, 0, 1, 0, 0, 2, 37, 242, 97, 140, 2, 111, 69, 85, 39, 118, 30, 221, 99, 254, 120, 49, 103, 22, 170, 227, 111, 172, 164, 160, 49, 68, 138, 116, 16, 22, 206, 107, 3, 54, 61, 144, 88, 171, 247, 136, 208, 99, 9, 135, 37, 201, 178, 253, 136, 0, 185, 235, 68, 160, 106, 110, 12, 46, 21, 125, 204, 18, 75, 234, 16, 3, 42, 171, 28, 52, 224, 11, 30, 30, 253, 156, 148, 175, 203, 121, 250, 111, 122, 195, 84, 122, 77, 183, 56, 135, 101, 88, 41, 60, 191, 99, 232, 85, 2, 36, 17, 156, 11, 8, 12, 189, 177, 68, 88, 28, 15, 207, 21, 179, 151, 56, 226, 158, 148, 3, 120, 113, 177, 243, 184, 17, 173, 37, 46, 222, 16]; let mut reader = io::Cursor::new(buffer); - let decoded_msg = read(&mut reader).unwrap(); + let decoded_msg = read(&mut reader, &IgnoringCustomMessageHandler{}).unwrap(); match decoded_msg { - Message::ChannelAnnouncement(msgs::ChannelAnnouncement { contents: msgs::UnsignedChannelAnnouncement { features, ..}, ..}) => { + MessageOrCustom::LN(Message::ChannelAnnouncement(msgs::ChannelAnnouncement { contents: msgs::UnsignedChannelAnnouncement { features, ..}, ..})) => { assert!(!features.requires_unknown_bits()); }, - _ => panic!("Expected node announcement, found message type: {}", decoded_msg.type_id()) + MessageOrCustom::LN(msg) => panic!("Expected node announcement, found message type: {}", msg.type_id()), + MessageOrCustom::Custom(_) => panic!("Expected node announcement; found custom message"), } } } diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index 1780483cb8b..ac119296abd 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -513,7 +513,7 @@ pub enum MessageSendEvent { node_id: PublicKey, /// The reply_channel_range which should be sent. msg: msgs::ReplyChannelRange, - } + }, } /// A trait indicating an object may generate message send events