Skip to content

Commit

Permalink
Revert "Experiment: try to use a bufferpool again."
Browse files Browse the repository at this point in the history
This reverts commit 099d62c.

Although performance is not an issue, this uses more memory than the
standard memory manager.
  • Loading branch information
zlogic committed Dec 4, 2024
1 parent 099d62c commit 35b765e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 163 deletions.
117 changes: 0 additions & 117 deletions src/bufferpool.rs

This file was deleted.

66 changes: 26 additions & 40 deletions src/ikev2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::{
time,
};

use crate::{bufferpool, fortivpn};
use crate::fortivpn;

mod crypto;
mod esp;
Expand Down Expand Up @@ -322,10 +322,13 @@ impl Sockets {
socket: Arc<UdpSocket>,
is_nat_port: bool,
) {
let mut buffer_pool = bufferpool::BufferPool::new(tx.max_capacity(), MAX_DATAGRAM_SIZE);
loop {
let mut buf = buffer_pool.borrow().await.unwrap();
buf.resize(MAX_DATAGRAM_SIZE, 0);
// Theoretically the allocator should be smart enough to recycle memory.
// In the unlikely case this becomes a problem, switching to stack-allocated
// arrays would reduce memory usage, but increase number of copy operations.
// As mpsc uses a queue internally, memory will be allocated for the queue elements
// in any case.
let mut buf = vec![0u8; MAX_DATAGRAM_SIZE];
let (bytes_res, remote_addr) = match socket.recv_from(&mut buf).await {
Ok(res) => res,
Err(err) => {
Expand Down Expand Up @@ -375,15 +378,17 @@ impl Sockets {
}

fn create_sender(&self) -> UdpSender {
UdpSender::new(self.send_tx.clone())
UdpSender {
tx: self.send_tx.clone(),
}
}
}

struct UdpDatagram {
remote_addr: SocketAddr,
local_addr: SocketAddr,
is_nat_port: bool,
bytes: bufferpool::Buffer,
bytes: Vec<u8>,
}

impl UdpDatagram {
Expand All @@ -396,37 +401,19 @@ impl UdpDatagram {
}
}

#[derive(Clone)]
struct UdpSender {
tx: mpsc::Sender<SendUdpDatagram>,
buffer_pool: bufferpool::BufferPool,
}

impl UdpSender {
fn new(tx: mpsc::Sender<SendUdpDatagram>) -> UdpSender {
UdpSender {
tx,
buffer_pool: bufferpool::BufferPool::new(128, MAX_DATAGRAM_SIZE),
}
}
}

impl Clone for UdpSender {
fn clone(&self) -> Self {
UdpSender {
tx: self.tx.clone(),
buffer_pool: bufferpool::BufferPool::new(128, MAX_DATAGRAM_SIZE),
}
}
}

impl UdpSender {
async fn send_datagram(
&mut self,
&self,
local_addr: &SocketAddr,
remote_addr: &SocketAddr,
data: &[u8],
) -> Result<(), SendError> {
let mut buffer = self.buffer_pool.borrow().await.unwrap();
let mut buffer = Vec::with_capacity(MAX_DATAGRAM_SIZE);
buffer.extend_from_slice(data);
self.tx
.send(SendUdpDatagram {
Expand All @@ -442,7 +429,7 @@ impl UdpSender {
struct SendUdpDatagram {
remote_addr: SocketAddr,
local_addr: SocketAddr,
bytes: bufferpool::Buffer,
bytes: Vec<u8>,
}

enum SessionMessage {
Expand Down Expand Up @@ -955,7 +942,7 @@ impl Sessions {
.await?);
}
trace!(
"Received ESP packet from {}\n{}",
"Received ESP packet from {}\n{:?}",
datagram.remote_addr,
datagram.bytes,
);
Expand Down Expand Up @@ -1061,14 +1048,14 @@ impl Sessions {

enum FortiTunnelEvent {
Connected(IpAddr, Vec<IpAddr>),
ReceivedPacket(bufferpool::Buffer, usize),
ReceivedPacket(Vec<u8>, usize),
Error(fortivpn::FortiError),
Disconnected,
EchoFailed(fortivpn::FortiError),
}

enum FortiTunnelCommand {
SendPacket(bufferpool::Buffer),
SendPacket(Vec<u8>),
Disconnect,
}

Expand All @@ -1078,19 +1065,16 @@ struct FortiService {
tunnel_tx: Option<mpsc::Sender<FortiTunnelCommand>>,
tunnel_rx: Option<mpsc::Receiver<FortiTunnelEvent>>,
tunnel_task: Option<JoinHandle<Result<(), IKEv2Error>>>,
buffer_pool: bufferpool::BufferPool,
}

impl FortiService {
fn new(config: fortivpn::Config) -> FortiService {
let buffer_pool = bufferpool::BufferPool::new(128, fortivpn::PPP_MTU.into());
FortiService {
config,
ip_configuration: None,
tunnel_tx: None,
tunnel_rx: None,
tunnel_task: None,
buffer_pool,
}
}

Expand Down Expand Up @@ -1195,7 +1179,6 @@ impl FortiService {
}
let mut need_flush = false;
let mut is_connected = true;
let mut buffer_pool = bufferpool::BufferPool::new(tx.max_capacity(), MAX_DATAGRAM_SIZE);
while is_connected {
let (can_recv, command, send_echo, flush) = {
let mut received_packet = pin!(tunnel.peek_recv());
Expand Down Expand Up @@ -1223,8 +1206,7 @@ impl FortiService {
.await
};
if can_recv {
let mut buffer = buffer_pool.borrow().await.unwrap();
buffer.resize(MAX_DATAGRAM_SIZE, 0);
let mut buffer = vec![0; MAX_ESP_PACKET_SIZE];
let event = match tunnel.try_read_packet(&mut buffer, None).await {
Ok(packet_bytes) => FortiTunnelEvent::ReceivedPacket(buffer, packet_bytes),
Err(err) => {
Expand Down Expand Up @@ -1287,10 +1269,14 @@ impl FortiService {
self.tunnel_task = Some(rt.spawn(async move {
loop {
let result = Self::run_tunnel(config.clone(), &service_tx, &mut tunnel_rx).await;
service_tx
if service_tx
.send(FortiTunnelEvent::Disconnected)
.await
.map_err(|_| "VPN sink channel closed")?;
.is_err()
{
debug!("VPN sink channel closed");
tunnel_rx.close();
}
if let Err(err) = result.as_ref() {
warn!("VPN channel closed with error: {}", err)
} else if tunnel_rx.is_closed() {
Expand Down Expand Up @@ -1349,7 +1335,7 @@ impl FortiService {

async fn send_packet(&mut self, data: &[u8]) -> Result<(), IKEv2Error> {
if let Some(tx) = self.tunnel_tx.as_mut() {
let mut buffer = self.buffer_pool.borrow().await.unwrap();
let mut buffer = Vec::with_capacity(fortivpn::PPP_MTU as usize);
if data.len() > buffer.capacity() {
warn!(
"ESP packet ({}) exceeds PPP MTU {}",
Expand Down
6 changes: 1 addition & 5 deletions src/ikev2/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1747,11 +1747,7 @@ impl IKEv2Session {
}
}

pub async fn send_last_response(
&mut self,
message_id: u32,
is_nat: bool,
) -> Result<(), SendError> {
pub async fn send_last_response(&self, message_id: u32, is_nat: bool) -> Result<(), SendError> {
if message_id != self.remote_message_id {
return Ok(());
}
Expand Down
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use log::{debug, info};
use tokio::{signal, sync::mpsc};
use tokio_rustls::rustls;

mod bufferpool;
mod fortivpn;
mod http;
mod logger;
Expand Down

0 comments on commit 35b765e

Please sign in to comment.