From 35b765ea14cec0fe14d9417afbc40dc67155c792 Mon Sep 17 00:00:00 2001 From: Dmitry Zolotukhin Date: Wed, 4 Dec 2024 22:03:07 +0100 Subject: [PATCH] Revert "Experiment: try to use a bufferpool again." This reverts commit 099d62cc2bc2aeb9dd3c492a51ed072c1751c816. Although performance is not an issue, this uses more memory than the standard memory manager. --- src/bufferpool.rs | 117 ------------------------------------------- src/ikev2/mod.rs | 66 ++++++++++-------------- src/ikev2/session.rs | 6 +-- src/main.rs | 1 - 4 files changed, 27 insertions(+), 163 deletions(-) delete mode 100644 src/bufferpool.rs diff --git a/src/bufferpool.rs b/src/bufferpool.rs deleted file mode 100644 index 9ddf152..0000000 --- a/src/bufferpool.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::{ - borrow::{Borrow, BorrowMut}, - fmt, - ops::{Deref, DerefMut}, -}; - -use log::debug; -use tokio::{ - runtime, - sync::{mpsc, oneshot}, -}; - -pub struct BufferPool { - capacity: usize, - buffer_size: usize, - rx_return: mpsc::Receiver>, - tx_return: mpsc::Sender>, -} - -impl BufferPool { - pub fn new(capacity: usize, buffer_size: usize) -> BufferPool { - let (tx_return, rx_return) = mpsc::channel(capacity); - BufferPool { - capacity, - buffer_size, - rx_return, - tx_return, - } - } - - pub async fn borrow(&mut self) -> Option { - let buf = if self.tx_return.strong_count() < self.capacity { - Vec::with_capacity(self.buffer_size) - } else { - self.rx_return.recv().await? - }; - Some(Buffer::new(buf, self.tx_return.clone()).await) - } -} - -pub struct Buffer { - buf: Option>, - tx_return: Option>>, -} - -impl Buffer { - async fn new(buf: Vec, tx_pool_return: mpsc::Sender>) -> Buffer { - let (tx_return, rx_return) = oneshot::channel(); - let rt = runtime::Handle::current(); - rt.spawn(async move { - if let Ok(buf) = rx_return.await { - if tx_pool_return.send(buf).await.is_err() { - debug!("Failed to return used buffer: listener closed"); - } - } else { - debug!("Failed to return used buffer: oneshot receiver closed"); - } - }); - Buffer { - buf: Some(buf), - tx_return: Some(tx_return), - } - } -} - -impl Deref for Buffer { - type Target = Vec; - - fn deref(&self) -> &Self::Target { - self.buf.as_ref().unwrap() - } -} - -impl DerefMut for Buffer { - fn deref_mut(&mut self) -> &mut Self::Target { - self.buf.as_mut().unwrap() - } -} - -impl Borrow> for Buffer { - fn borrow(&self) -> &Vec { - self.buf.as_ref().unwrap() - } -} - -impl BorrowMut> for Buffer { - fn borrow_mut(&mut self) -> &mut Vec { - self.buf.as_mut().unwrap() - } -} - -impl Drop for Buffer { - fn drop(&mut self) { - let mut buf = self.buf.take().expect("Buffer dropped"); - buf.truncate(0); - let tx_return = self.tx_return.take().expect("Buffer dropped twice"); - if tx_return.send(buf).is_err() { - debug!("Failed to return dropped buffer: listener closed"); - } - } -} - -impl fmt::Display for Buffer { - fn fmt(&self, f: &mut fmt::Formatter) -> std::fmt::Result { - if let Some(ref data) = self.buf { - for (i, b) in data.iter().enumerate() { - write!(f, "{:02x}", b)?; - if i + 1 < data.len() { - write!(f, " ")?; - } - } - Ok(()) - } else { - write!(f, "[None]") - } - } -} diff --git a/src/ikev2/mod.rs b/src/ikev2/mod.rs index d7f86c7..558407c 100644 --- a/src/ikev2/mod.rs +++ b/src/ikev2/mod.rs @@ -19,7 +19,7 @@ use tokio::{ time, }; -use crate::{bufferpool, fortivpn}; +use crate::fortivpn; mod crypto; mod esp; @@ -322,10 +322,13 @@ impl Sockets { socket: Arc, is_nat_port: bool, ) { - let mut buffer_pool = bufferpool::BufferPool::new(tx.max_capacity(), MAX_DATAGRAM_SIZE); loop { - let mut buf = buffer_pool.borrow().await.unwrap(); - buf.resize(MAX_DATAGRAM_SIZE, 0); + // Theoretically the allocator should be smart enough to recycle memory. + // In the unlikely case this becomes a problem, switching to stack-allocated + // arrays would reduce memory usage, but increase number of copy operations. + // As mpsc uses a queue internally, memory will be allocated for the queue elements + // in any case. + let mut buf = vec![0u8; MAX_DATAGRAM_SIZE]; let (bytes_res, remote_addr) = match socket.recv_from(&mut buf).await { Ok(res) => res, Err(err) => { @@ -375,7 +378,9 @@ impl Sockets { } fn create_sender(&self) -> UdpSender { - UdpSender::new(self.send_tx.clone()) + UdpSender { + tx: self.send_tx.clone(), + } } } @@ -383,7 +388,7 @@ struct UdpDatagram { remote_addr: SocketAddr, local_addr: SocketAddr, is_nat_port: bool, - bytes: bufferpool::Buffer, + bytes: Vec, } impl UdpDatagram { @@ -396,37 +401,19 @@ impl UdpDatagram { } } +#[derive(Clone)] struct UdpSender { tx: mpsc::Sender, - buffer_pool: bufferpool::BufferPool, -} - -impl UdpSender { - fn new(tx: mpsc::Sender) -> UdpSender { - UdpSender { - tx, - buffer_pool: bufferpool::BufferPool::new(128, MAX_DATAGRAM_SIZE), - } - } -} - -impl Clone for UdpSender { - fn clone(&self) -> Self { - UdpSender { - tx: self.tx.clone(), - buffer_pool: bufferpool::BufferPool::new(128, MAX_DATAGRAM_SIZE), - } - } } impl UdpSender { async fn send_datagram( - &mut self, + &self, local_addr: &SocketAddr, remote_addr: &SocketAddr, data: &[u8], ) -> Result<(), SendError> { - let mut buffer = self.buffer_pool.borrow().await.unwrap(); + let mut buffer = Vec::with_capacity(MAX_DATAGRAM_SIZE); buffer.extend_from_slice(data); self.tx .send(SendUdpDatagram { @@ -442,7 +429,7 @@ impl UdpSender { struct SendUdpDatagram { remote_addr: SocketAddr, local_addr: SocketAddr, - bytes: bufferpool::Buffer, + bytes: Vec, } enum SessionMessage { @@ -955,7 +942,7 @@ impl Sessions { .await?); } trace!( - "Received ESP packet from {}\n{}", + "Received ESP packet from {}\n{:?}", datagram.remote_addr, datagram.bytes, ); @@ -1061,14 +1048,14 @@ impl Sessions { enum FortiTunnelEvent { Connected(IpAddr, Vec), - ReceivedPacket(bufferpool::Buffer, usize), + ReceivedPacket(Vec, usize), Error(fortivpn::FortiError), Disconnected, EchoFailed(fortivpn::FortiError), } enum FortiTunnelCommand { - SendPacket(bufferpool::Buffer), + SendPacket(Vec), Disconnect, } @@ -1078,19 +1065,16 @@ struct FortiService { tunnel_tx: Option>, tunnel_rx: Option>, tunnel_task: Option>>, - buffer_pool: bufferpool::BufferPool, } impl FortiService { fn new(config: fortivpn::Config) -> FortiService { - let buffer_pool = bufferpool::BufferPool::new(128, fortivpn::PPP_MTU.into()); FortiService { config, ip_configuration: None, tunnel_tx: None, tunnel_rx: None, tunnel_task: None, - buffer_pool, } } @@ -1195,7 +1179,6 @@ impl FortiService { } let mut need_flush = false; let mut is_connected = true; - let mut buffer_pool = bufferpool::BufferPool::new(tx.max_capacity(), MAX_DATAGRAM_SIZE); while is_connected { let (can_recv, command, send_echo, flush) = { let mut received_packet = pin!(tunnel.peek_recv()); @@ -1223,8 +1206,7 @@ impl FortiService { .await }; if can_recv { - let mut buffer = buffer_pool.borrow().await.unwrap(); - buffer.resize(MAX_DATAGRAM_SIZE, 0); + let mut buffer = vec![0; MAX_ESP_PACKET_SIZE]; let event = match tunnel.try_read_packet(&mut buffer, None).await { Ok(packet_bytes) => FortiTunnelEvent::ReceivedPacket(buffer, packet_bytes), Err(err) => { @@ -1287,10 +1269,14 @@ impl FortiService { self.tunnel_task = Some(rt.spawn(async move { loop { let result = Self::run_tunnel(config.clone(), &service_tx, &mut tunnel_rx).await; - service_tx + if service_tx .send(FortiTunnelEvent::Disconnected) .await - .map_err(|_| "VPN sink channel closed")?; + .is_err() + { + debug!("VPN sink channel closed"); + tunnel_rx.close(); + } if let Err(err) = result.as_ref() { warn!("VPN channel closed with error: {}", err) } else if tunnel_rx.is_closed() { @@ -1349,7 +1335,7 @@ impl FortiService { async fn send_packet(&mut self, data: &[u8]) -> Result<(), IKEv2Error> { if let Some(tx) = self.tunnel_tx.as_mut() { - let mut buffer = self.buffer_pool.borrow().await.unwrap(); + let mut buffer = Vec::with_capacity(fortivpn::PPP_MTU as usize); if data.len() > buffer.capacity() { warn!( "ESP packet ({}) exceeds PPP MTU {}", diff --git a/src/ikev2/session.rs b/src/ikev2/session.rs index 9bbb40c..9a052ad 100644 --- a/src/ikev2/session.rs +++ b/src/ikev2/session.rs @@ -1747,11 +1747,7 @@ impl IKEv2Session { } } - pub async fn send_last_response( - &mut self, - message_id: u32, - is_nat: bool, - ) -> Result<(), SendError> { + pub async fn send_last_response(&self, message_id: u32, is_nat: bool) -> Result<(), SendError> { if message_id != self.remote_message_id { return Ok(()); } diff --git a/src/main.rs b/src/main.rs index cba946c..efc0ee5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,6 @@ use log::{debug, info}; use tokio::{signal, sync::mpsc}; use tokio_rustls::rustls; -mod bufferpool; mod fortivpn; mod http; mod logger;