Skip to content

Commit

Permalink
messenger: wait for onion update before sending ir
Browse files Browse the repository at this point in the history
This fixes a race condition that occurs when we try to make a payment before
the PeerConnected event has reached the onion messenger with the updated
status. To fix this, we add a wait period in the onion messenger to wait for
this event to come in before we send the invoice request.
  • Loading branch information
orbitalturtle committed Jul 2, 2024
1 parent 2f1dd02 commit aa9caf9
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 110 deletions.
22 changes: 12 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,15 @@ pub struct LifecycleSignals {
pub listener: Listener,
}

pub struct LndkOnionMessenger {}
pub struct LndkOnionMessenger {
current_peers: Mutex<HashMap<PublicKey, bool>>,
}

impl LndkOnionMessenger {
pub fn new() -> Self {
LndkOnionMessenger {}
LndkOnionMessenger {
current_peers: Mutex::new(HashMap::new()),
}
}

pub async fn run(
Expand Down Expand Up @@ -180,6 +184,10 @@ impl LndkOnionMessenger {
let onion_support = features_support_onion_messages(&peer.features);
peer_support.insert(pubkey, onion_support);
}
{
let mut current_peers_mut = self.current_peers.lock().unwrap();
current_peers_mut.clone_from(&peer_support);
}

// Create an onion messenger that depends on LND's signer client and consume related events.
let mut node_client = client.signer().clone();
Expand All @@ -197,14 +205,8 @@ impl LndkOnionMessenger {
);

let mut peers_client = client.lightning().clone();
self.run_onion_messenger(
peer_support,
&mut peers_client,
onion_messenger,
network,
args.signals,
)
.await
self.run_onion_messenger(&mut peers_client, onion_messenger, network, args.signals)
.await
}
}

Expand Down
264 changes: 164 additions & 100 deletions src/onion_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ impl LndkOnionMessenger {
CMH: Deref,
>(
&self,
current_peers: HashMap<PublicKey, bool>,
ln_client: &mut tonic_lnd::LightningClient,
onion_messenger: OnionMessenger<ES, NS, L, MR, OMH, CMH>,
network: Network,
Expand All @@ -134,6 +133,11 @@ impl LndkOnionMessenger {
// one go (before we boot up the consumer). The number of peers that we have is also
// related to the number of events we can expect to process, so it's a sensible enough
// buffer size.
let current_peers;
{
let current_peers_mut = self.current_peers.lock().unwrap();
current_peers = current_peers_mut.clone();
}
let (sender, receiver) = channel(current_peers.len() + 1);
for (peer, onion_support) in current_peers.clone() {
sender
Expand Down Expand Up @@ -231,14 +235,15 @@ impl LndkOnionMessenger {
let mut message_sender = CustomMessenger {
client: ln_client.clone(),
};
let consume_result = consume_messenger_events(
onion_messenger,
receiver,
&mut message_sender,
rate_limiter,
network,
)
.await;
let consume_result = self
.consume_messenger_events(
onion_messenger,
receiver,
&mut message_sender,
rate_limiter,
network,
)
.await;
match consume_result {
Ok(_) => info!("Consume messenger events exited."),
Err(e) => {
Expand Down Expand Up @@ -602,81 +607,134 @@ impl fmt::Display for MessengerEvents {
}
}

/// consume_messenger_events receives a series of onion messaging related events and delivers them
/// to the OnionMessenger provided, using the RateLimiter to limit resources consumed by each peer.
async fn consume_messenger_events(
onion_messenger: impl OnionMessageHandler,
mut events: Receiver<MessengerEvents>,
message_sender: &mut impl SendCustomMessage,
rate_limiter: &mut impl RateLimiter,
network: Network,
) -> Result<(), ConsumerError> {
let network = vec![ChainHash::using_genesis_block(network)];

while let Some(onion_event) = events.recv().await {
match onion_event {
// We don't want to log SendOutgoing events, since we send out this event every 100 ms.
MessengerEvents::SendOutgoing => {}
_ => info!("Consume messenger events received: {onion_event}."),
};
impl LndkOnionMessenger {
/// consume_messenger_events receives a series of onion messaging related events and delivers
/// them to the OnionMessenger provided, using the RateLimiter to limit resources consumed by
/// each peer.
async fn consume_messenger_events(
&self,
onion_messenger: impl OnionMessageHandler,
mut events: Receiver<MessengerEvents>,
message_sender: &mut impl SendCustomMessage,
rate_limiter: &mut impl RateLimiter,
network: Network,
) -> Result<(), ConsumerError> {
let network = vec![ChainHash::using_genesis_block(network)];

while let Some(onion_event) = events.recv().await {
match onion_event {
// We don't want to log SendOutgoing events, since we send out this event every 100
// ms.
MessengerEvents::SendOutgoing => {}
_ => info!("Consume messenger events received: {onion_event}."),
};

match onion_event {
MessengerEvents::PeerConnected(pubkey, onion_support) => {
let init_features = if onion_support {
let onion_message_optional: u64 = 1 << ONION_MESSAGES_OPTIONAL;
InitFeatures::from_le_bytes(onion_message_optional.to_le_bytes().to_vec())
} else {
InitFeatures::empty()
};
match onion_event {
MessengerEvents::PeerConnected(pubkey, onion_support) => {
let init_features = if onion_support {
let onion_message_optional: u64 = 1 << ONION_MESSAGES_OPTIONAL;
InitFeatures::from_le_bytes(onion_message_optional.to_le_bytes().to_vec())
} else {
InitFeatures::empty()
};

onion_messenger
.peer_connected(
&pubkey,
&Init {
features: init_features,
remote_network_address: None,
networks: Some(network.clone()),
},
false,
)
.map_err(|_| ConsumerError::OnionMessengerFailure)?;

// In addition to keeping the onion messenger up to date with the latest peers, we
// need to keep our local version up to date so we send outgoing OMs
// all of our peers.
rate_limiter.peer_connected(pubkey);
}
MessengerEvents::PeerDisconnected(pubkey) => {
onion_messenger.peer_disconnected(&pubkey);
onion_messenger
.peer_connected(
&pubkey,
&Init {
features: init_features,
remote_network_address: None,
networks: Some(network.clone()),
},
false,
)
.map_err(|_| ConsumerError::OnionMessengerFailure)?;

// In addition to keeping the onion messenger up to date with the latest peers,
// we need to keep our local data up to date so we send
// outgoing OMs to all of our peers.
{
let mut current_peers = self.current_peers.lock().unwrap();
current_peers.insert(pubkey, onion_support);
std::mem::drop(current_peers);
}
rate_limiter.peer_connected(pubkey);
}
MessengerEvents::PeerDisconnected(pubkey) => {
onion_messenger.peer_disconnected(&pubkey);

// In addition to keeping the onion messenger up to date with the latest peers, we
// need to keep our local version up to date so we send outgoing OMs
// to our correct peers.
rate_limiter.peer_disconnected(pubkey);
}
MessengerEvents::IncomingMessage(pubkey, onion_message) => {
if !rate_limiter.query_peer(pubkey) {
info!("Peer: {pubkey} hit rate limit, dropping incoming onion message");
continue;
// In addition to keeping the onion messenger up to date with the latest peers,
// we need to keep our local version up to date so we send
// outgoing OMs to our correct peers.
rate_limiter.peer_disconnected(pubkey);
}
MessengerEvents::IncomingMessage(pubkey, onion_message) => {
if !rate_limiter.query_peer(pubkey) {
info!("Peer: {pubkey} hit rate limit, dropping incoming onion message");
continue;
}

onion_messenger.handle_onion_message(&pubkey, &onion_message)
}
MessengerEvents::SendOutgoing => {
for peer in rate_limiter.peers() {
if let Some(msg) = onion_messenger.next_onion_message_for_peer(peer) {
info!("Sending outgoing onion message to {peer}.");
relay_outgoing_msg_event(&peer, msg, message_sender).await;
onion_messenger.handle_onion_message(&pubkey, &onion_message)
}
MessengerEvents::SendOutgoing => {
for peer in rate_limiter.peers() {
let onion_support;
let current_peers;
{
let current_peers_mut = self.current_peers.lock().unwrap();
current_peers = current_peers_mut.clone();
onion_support = current_peers.get(&peer).unwrap_or(&false)
}

// If our local state says the peer we're sending a message to doesn't
// support onion messaging, we might need to give the onion messenger
// time to update its state.
let onion_support_timeout = 5;
if !onion_support {
match timeout(
Duration::from_secs(onion_support_timeout),
self.check_onion_status(&peer),
)
.await
{
Ok(_) => {}
Err(_) => {
warn!("Onion support status did not turn to true in {} seconds.", onion_support_timeout);
}
};
}

if let Some(msg) = onion_messenger.next_onion_message_for_peer(peer) {
info!("Sending outgoing onion message to {peer}.");
relay_outgoing_msg_event(&peer, msg, message_sender).await;
}
}
}
}
MessengerEvents::ProducerExit(e) => {
return Err(e);
MessengerEvents::ProducerExit(e) => {
return Err(e);
}
}
}

Ok(())
}
}

impl LndkOnionMessenger {
async fn check_onion_status(&self, pubkey: &PublicKey) {
loop {
{
let current_peers = self.current_peers.lock().unwrap();
if let Some(onion_support) = current_peers.get(pubkey) {
if *onion_support {
return;
}
};
}

Ok(())
sleep(Duration::from_millis(500)).await;
}
}
}

#[async_trait]
Expand Down Expand Up @@ -965,15 +1023,17 @@ mod tests {
.await
.unwrap();

let consume_err = consume_messenger_events(
mock,
receiver,
&mut sender_mock,
&mut rate_limiter,
Network::Regtest,
)
.await
.expect_err("consume should error");
let messenger = LndkOnionMessenger::new();
let consume_err = messenger
.consume_messenger_events(
mock,
receiver,
&mut sender_mock,
&mut rate_limiter,
Network::Regtest,
)
.await
.expect_err("consume should error");
matches!(consume_err, ConsumerError::PeerProducerExit);
}

Expand All @@ -994,15 +1054,17 @@ mod tests {

let mut sender_mock = MockSendCustomMessenger::new();

let consume_err = consume_messenger_events(
mock,
receiver,
&mut sender_mock,
&mut rate_limiter,
Network::Regtest,
)
.await
.expect_err("consume should error");
let messenger = LndkOnionMessenger::new();
let consume_err = messenger
.consume_messenger_events(
mock,
receiver,
&mut sender_mock,
&mut rate_limiter,
Network::Regtest,
)
.await
.expect_err("consume should error");
matches!(consume_err, ConsumerError::OnionMessengerFailure);
}

Expand All @@ -1015,15 +1077,17 @@ mod tests {
let mut sender_mock = MockSendCustomMessenger::new();
let mut rate_limiter = MockRateLimiter::new();

assert!(consume_messenger_events(
MockOnionHandler::new(),
receiver_done,
&mut sender_mock,
&mut rate_limiter,
Network::Regtest,
)
.await
.is_ok());
let messenger = LndkOnionMessenger::new();
assert!(messenger
.consume_messenger_events(
MockOnionHandler::new(),
receiver_done,
&mut sender_mock,
&mut rate_limiter,
Network::Regtest,
)
.await
.is_ok());
}

#[tokio::test]
Expand Down

0 comments on commit aa9caf9

Please sign in to comment.