diff --git a/include/libp2p/muxer/yamux/yamuxed_connection.hpp b/include/libp2p/muxer/yamux/yamuxed_connection.hpp index a3b25483..6ac7dd59 100644 --- a/include/libp2p/muxer/yamux/yamuxed_connection.hpp +++ b/include/libp2p/muxer/yamux/yamuxed_connection.hpp @@ -239,6 +239,8 @@ namespace libp2p::connection { uint32_t ping_counter_ = 0; + bool close_after_write_ = false; + public: LIBP2P_METRICS_INSTANCE_COUNT_IF_ENABLED( libp2p::connection::YamuxedConnection); diff --git a/src/muxer/yamux/yamuxed_connection.cpp b/src/muxer/yamux/yamuxed_connection.cpp index aadfde9d..ffe7dbab 100644 --- a/src/muxer/yamux/yamuxed_connection.cpp +++ b/src/muxer/yamux/yamuxed_connection.cpp @@ -537,12 +537,6 @@ namespace libp2p::connection { SL_DEBUG(log(), "closing connection, reason: {}", notify_streams_code); - write_queue_.clear(); - - if (reply_to_peer_code.has_value() && !connection_->isClosed()) { - enqueue(goAwayMsg(reply_to_peer_code.value())); - } - Streams streams; streams.swap(streams_); @@ -560,6 +554,14 @@ namespace libp2p::connection { if (closed_callback_) { closed_callback_(remote_peer_, shared_from_this()); } + + close_after_write_ = true; + if (reply_to_peer_code) { + enqueue(goAwayMsg(*reply_to_peer_code)); + } else { + write_queue_.clear(); + std::ignore = connection_->close(); + } } void YamuxedConnection::writeStreamData(uint32_t stream_id, BytesIn data) { @@ -634,6 +636,8 @@ namespace libp2p::connection { void YamuxedConnection::onDataWritten(outcome::result res, StreamId stream_id) { if (!res) { + write_queue_.clear(); + std::ignore = connection_->close(); // write error close(res.error(), boost::none); return; @@ -672,10 +676,12 @@ namespace libp2p::connection { is_writing_ = false; - if (started_ && !write_queue_.empty()) { + if (not write_queue_.empty()) { auto next_packet = std::move(write_queue_.front()); write_queue_.pop_front(); doWrite(std::move(next_packet)); + } else if (close_after_write_) { + std::ignore = connection_->close(); } }