Skip to content

Commit cad7208

Browse files
committed
Ensure we call send_data when we need to pause/unpause reads
In the previous commit, we moved the `send_data` `resume_read` flag to also indicate that we should pause if its unset. This should work as we mostly only set the flag when we're sending but may cause us to fail to pause if we are blocked on gossip validation but `awaiting_write_event` wasn't set as we had previously failed to fully flush a buffer (which no longer implies read-pause). Here we make this logic much more robust by ensuring we always make at least one `send_data` call in `do_attempt_write_data` if we need to pause read (or unpause read). Backport of 77cbb61
1 parent e61a843 commit cad7208

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,9 @@ struct Peer {
781781
/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
782782
gossip_broadcast_buffer: VecDeque<MessageBuf>,
783783
awaiting_write_event: bool,
784+
/// Set to true if the last call to [`SocketDescriptor::send_data`] for this peer had the
785+
/// `should_read` flag unset, indicating we've told the driver to stop reading from this peer.
786+
sent_pause_read: bool,
784787

785788
pending_read_buffer: Vec<u8>,
786789
pending_read_buffer_pos: usize,
@@ -1440,6 +1443,7 @@ where
14401443
pending_outbound_buffer_first_msg_offset: 0,
14411444
gossip_broadcast_buffer: VecDeque::new(),
14421445
awaiting_write_event: false,
1446+
sent_pause_read: false,
14431447

14441448
pending_read_buffer,
14451449
pending_read_buffer_pos: 0,
@@ -1500,6 +1504,7 @@ where
15001504
pending_outbound_buffer_first_msg_offset: 0,
15011505
gossip_broadcast_buffer: VecDeque::new(),
15021506
awaiting_write_event: false,
1507+
sent_pause_read: false,
15031508

15041509
pending_read_buffer,
15051510
pending_read_buffer_pos: 0,
@@ -1535,10 +1540,14 @@ where
15351540
}
15361541

15371542
fn do_attempt_write_data(
1538-
&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool,
1543+
&self, descriptor: &mut Descriptor, peer: &mut Peer, mut force_one_write: bool,
15391544
) {
1540-
let mut have_written = false;
1541-
while !peer.awaiting_write_event {
1545+
// If we detect that we should be reading from the peer but reads are currently paused, or
1546+
// vice versa, then we need to tell the socket driver to update their internal flag
1547+
// indicating whether or not reads are paused. Do this by forcing a write with the desired
1548+
// `continue_read` flag set, even if no outbound messages are currently queued.
1549+
force_one_write |= self.peer_should_read(peer) == peer.sent_pause_read;
1550+
while force_one_write || !peer.awaiting_write_event {
15421551
if peer.should_buffer_onion_message() {
15431552
if let Some((peer_node_id, _)) = peer.their_node_id {
15441553
let handler = &self.message_handler.onion_message_handler;
@@ -1606,20 +1615,20 @@ where
16061615
let should_read = self.peer_should_read(peer);
16071616
let next_buff = match peer.pending_outbound_buffer.front() {
16081617
None => {
1609-
if force_one_write && !have_written {
1610-
if should_read {
1611-
let data_sent = descriptor.send_data(&[], should_read);
1612-
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
1613-
}
1618+
if force_one_write {
1619+
let data_sent = descriptor.send_data(&[], should_read);
1620+
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
1621+
peer.sent_pause_read = !should_read;
16141622
}
16151623
return;
16161624
},
16171625
Some(buff) => buff,
16181626
};
1627+
force_one_write = false;
16191628

16201629
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
16211630
let data_sent = descriptor.send_data(pending, should_read);
1622-
have_written = true;
1631+
peer.sent_pause_read = !should_read;
16231632
peer.pending_outbound_buffer_first_msg_offset += data_sent;
16241633
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
16251634
peer.pending_outbound_buffer_first_msg_offset = 0;

0 commit comments

Comments
 (0)