Skip to content
Merged
26 changes: 14 additions & 12 deletions fuzz/src/onion_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use lightning::onion_message::async_payments::{
AsyncPaymentsMessageHandler, HeldHtlcAvailable, ReleaseHeldHtlc,
};
use lightning::onion_message::messenger::{
CustomOnionMessageHandler, Destination, MessageRouter, OnionMessagePath, OnionMessenger,
PendingOnionMessage, Responder, ResponseInstruction,
CustomOnionMessageHandler, Destination, MessageRouter, MessageSendInstructions,
OnionMessagePath, OnionMessenger, Responder, ResponseInstruction,
};
use lightning::onion_message::offers::{OffersMessage, OffersMessageHandler};
use lightning::onion_message::packet::OnionMessageContents;
Expand Down Expand Up @@ -109,8 +109,8 @@ impl OffersMessageHandler for TestOffersMessageHandler {
fn handle_message(
&self, _message: OffersMessage, _context: Option<OffersContext>,
_responder: Option<Responder>,
) -> ResponseInstruction<OffersMessage> {
ResponseInstruction::NoResponse
) -> Option<(OffersMessage, ResponseInstruction)> {
None
}
}

Expand All @@ -119,13 +119,15 @@ struct TestAsyncPaymentsMessageHandler {}
impl AsyncPaymentsMessageHandler for TestAsyncPaymentsMessageHandler {
fn held_htlc_available(
&self, message: HeldHtlcAvailable, responder: Option<Responder>,
) -> ResponseInstruction<ReleaseHeldHtlc> {
) -> Option<(ReleaseHeldHtlc, ResponseInstruction)> {
let responder = match responder {
Some(resp) => resp,
None => return ResponseInstruction::NoResponse,
None => return None,
};
responder
.respond(ReleaseHeldHtlc { payment_release_secret: message.payment_release_secret })
Some((
ReleaseHeldHtlc { payment_release_secret: message.payment_release_secret },
responder.respond(),
))
}
fn release_held_htlc(&self, _message: ReleaseHeldHtlc) {}
}
Expand Down Expand Up @@ -158,10 +160,10 @@ impl CustomOnionMessageHandler for TestCustomMessageHandler {
fn handle_custom_message(
&self, message: Self::CustomMessage, _context: Option<Vec<u8>>,
responder: Option<Responder>,
) -> ResponseInstruction<Self::CustomMessage> {
) -> Option<(Self::CustomMessage, ResponseInstruction)> {
match responder {
Some(responder) => responder.respond(message),
None => ResponseInstruction::NoResponse,
Some(responder) => Some((message, responder.respond())),
None => None,
}
}
fn read_custom_message<R: io::Read>(
Expand All @@ -171,7 +173,7 @@ impl CustomOnionMessageHandler for TestCustomMessageHandler {
buffer.read_to_limit(&mut buf, u64::MAX)?;
return Ok(Some(TestCustomMessage {}));
}
fn release_pending_custom_messages(&self) -> Vec<PendingOnionMessage<Self::CustomMessage>> {
fn release_pending_custom_messages(&self) -> Vec<(TestCustomMessage, MessageSendInstructions)> {
vec![]
}
}
Expand Down
102 changes: 51 additions & 51 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use crate::offers::parse::Bolt12SemanticError;
use crate::offers::refund::{Refund, RefundBuilder};
use crate::offers::signer;
use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler};
use crate::onion_message::messenger::{new_pending_onion_message, Destination, MessageRouter, PendingOnionMessage, Responder, ResponseInstruction};
use crate::onion_message::messenger::{Destination, MessageRouter, Responder, ResponseInstruction, MessageSendInstructions};
use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider};
use crate::sign::ecdsa::EcdsaChannelSigner;
Expand Down Expand Up @@ -2277,9 +2277,9 @@ where
needs_persist_flag: AtomicBool,

#[cfg(not(any(test, feature = "_test_utils")))]
pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
#[cfg(any(test, feature = "_test_utils"))]
pub(crate) pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
pub(crate) pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,

/// Tracks the message events that are to be broadcasted when we are connected to some peer.
pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
Expand Down Expand Up @@ -9068,21 +9068,21 @@ where
.flat_map(|reply_path| offer.paths().iter().map(move |path| (path, reply_path)))
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
.for_each(|(path, reply_path)| {
let message = new_pending_onion_message(
OffersMessage::InvoiceRequest(invoice_request.clone()),
Destination::BlindedPath(path.clone()),
Some(reply_path.clone()),
);
pending_offers_messages.push(message);
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::BlindedPath(path.clone()),
reply_path: reply_path.clone(),
};
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
pending_offers_messages.push((message, instructions));
});
} else if let Some(signing_pubkey) = offer.signing_pubkey() {
for reply_path in reply_paths {
let message = new_pending_onion_message(
OffersMessage::InvoiceRequest(invoice_request.clone()),
Destination::Node(signing_pubkey),
Some(reply_path),
);
pending_offers_messages.push(message);
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::Node(signing_pubkey),
reply_path,
};
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
pending_offers_messages.push((message, instructions));
}
} else {
debug_assert!(false);
Expand Down Expand Up @@ -9162,25 +9162,25 @@ where
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
if refund.paths().is_empty() {
for reply_path in reply_paths {
let message = new_pending_onion_message(
OffersMessage::Invoice(invoice.clone()),
Destination::Node(refund.payer_id()),
Some(reply_path),
);
pending_offers_messages.push(message);
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::Node(refund.payer_id()),
reply_path,
};
let message = OffersMessage::Invoice(invoice.clone());
pending_offers_messages.push((message, instructions));
}
} else {
reply_paths
.iter()
.flat_map(|reply_path| refund.paths().iter().map(move |path| (path, reply_path)))
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
.for_each(|(path, reply_path)| {
let message = new_pending_onion_message(
OffersMessage::Invoice(invoice.clone()),
Destination::BlindedPath(path.clone()),
Some(reply_path.clone()),
);
pending_offers_messages.push(message);
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::BlindedPath(path.clone()),
reply_path: reply_path.clone(),
};
let message = OffersMessage::Invoice(invoice.clone());
pending_offers_messages.push((message, instructions));
});
}

Expand Down Expand Up @@ -10749,41 +10749,41 @@ where
{
fn handle_message(
&self, message: OffersMessage, context: Option<OffersContext>, responder: Option<Responder>,
) -> ResponseInstruction<OffersMessage> {
) -> Option<(OffersMessage, ResponseInstruction)> {
let secp_ctx = &self.secp_ctx;
let expanded_key = &self.inbound_payment_key;

match message {
OffersMessage::InvoiceRequest(invoice_request) => {
let responder = match responder {
Some(responder) => responder,
None => return ResponseInstruction::NoResponse,
None => return None,
};

let nonce = match context {
None if invoice_request.metadata().is_some() => None,
Some(OffersContext::InvoiceRequest { nonce }) => Some(nonce),
_ => return ResponseInstruction::NoResponse,
_ => return None,
};

let invoice_request = match nonce {
Some(nonce) => match invoice_request.verify_using_recipient_data(
nonce, expanded_key, secp_ctx,
) {
Ok(invoice_request) => invoice_request,
Err(()) => return ResponseInstruction::NoResponse,
Err(()) => return None,
},
None => match invoice_request.verify_using_metadata(expanded_key, secp_ctx) {
Ok(invoice_request) => invoice_request,
Err(()) => return ResponseInstruction::NoResponse,
Err(()) => return None,
},
};

let amount_msats = match InvoiceBuilder::<DerivedSigningPubkey>::amount_msats(
&invoice_request.inner
) {
Ok(amount_msats) => amount_msats,
Err(error) => return responder.respond(OffersMessage::InvoiceError(error.into())),
Err(error) => return Some((OffersMessage::InvoiceError(error.into()), responder.respond())),
};

let relative_expiry = DEFAULT_RELATIVE_EXPIRY.as_secs() as u32;
Expand All @@ -10793,7 +10793,7 @@ where
Ok((payment_hash, payment_secret)) => (payment_hash, payment_secret),
Err(()) => {
let error = Bolt12SemanticError::InvalidAmount;
return responder.respond(OffersMessage::InvoiceError(error.into()));
return Some((OffersMessage::InvoiceError(error.into()), responder.respond()));
},
};

Expand All @@ -10807,7 +10807,7 @@ where
Ok(payment_paths) => payment_paths,
Err(()) => {
let error = Bolt12SemanticError::MissingPaths;
return responder.respond(OffersMessage::InvoiceError(error.into()));
return Some((OffersMessage::InvoiceError(error.into()), responder.respond()));
},
};

Expand Down Expand Up @@ -10852,14 +10852,14 @@ where
};

match response {
Ok(invoice) => responder.respond(OffersMessage::Invoice(invoice)),
Err(error) => responder.respond(OffersMessage::InvoiceError(error.into())),
Ok(invoice) => Some((OffersMessage::Invoice(invoice), responder.respond())),
Err(error) => Some((OffersMessage::InvoiceError(error.into()), responder.respond())),
}
},
OffersMessage::Invoice(invoice) => {
let payment_id = match self.verify_bolt12_invoice(&invoice, context.as_ref()) {
Ok(payment_id) => payment_id,
Err(()) => return ResponseInstruction::NoResponse,
Err(()) => return None,
};

let logger = WithContext::from(
Expand All @@ -10871,7 +10871,7 @@ where
payment_id, invoice, context, responder,
};
self.pending_events.lock().unwrap().push_back((event, None));
return ResponseInstruction::NoResponse;
return None;
}

let error = match self.send_payment_for_verified_bolt12_invoice(
Expand All @@ -10890,26 +10890,26 @@ where
},
Err(Bolt12PaymentError::UnexpectedInvoice)
| Err(Bolt12PaymentError::DuplicateInvoice)
| Ok(()) => return ResponseInstruction::NoResponse,
| Ok(()) => return None,
};

match responder {
Some(responder) => responder.respond(OffersMessage::InvoiceError(error)),
Some(responder) => Some((OffersMessage::InvoiceError(error), responder.respond())),
None => {
log_trace!(logger, "No reply path to send error: {:?}", error);
ResponseInstruction::NoResponse
None
},
}
},
#[cfg(async_payments)]
OffersMessage::StaticInvoice(_invoice) => {
match responder {
Some(responder) => {
responder.respond(OffersMessage::InvoiceError(
InvoiceError::from_string("Static invoices not yet supported".to_string())
))
return Some((OffersMessage::InvoiceError(
InvoiceError::from_string("Static invoices not yet supported".to_string())
), responder.respond()));
},
None => return ResponseInstruction::NoResponse,
None => return None,
}
},
OffersMessage::InvoiceError(invoice_error) => {
Expand All @@ -10932,12 +10932,12 @@ where
_ => {},
}

ResponseInstruction::NoResponse
None
},
}
}

fn release_pending_messages(&self) -> Vec<PendingOnionMessage<OffersMessage>> {
fn release_pending_messages(&self) -> Vec<(OffersMessage, MessageSendInstructions)> {
core::mem::take(&mut self.pending_offers_messages.lock().unwrap())
}
}
Expand All @@ -10956,13 +10956,13 @@ where
{
fn held_htlc_available(
&self, _message: HeldHtlcAvailable, _responder: Option<Responder>
) -> ResponseInstruction<ReleaseHeldHtlc> {
ResponseInstruction::NoResponse
) -> Option<(ReleaseHeldHtlc, ResponseInstruction)> {
None
}

fn release_held_htlc(&self, _message: ReleaseHeldHtlc) {}

fn release_pending_messages(&self) -> Vec<PendingOnionMessage<AsyncPaymentsMessage>> {
fn release_pending_messages(&self) -> Vec<(AsyncPaymentsMessage, MessageSendInstructions)> {
Vec::new()
}
}
Expand Down
Loading