Skip to content

Commit

Permalink
use sendto.rs in blocking_client
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 4, 2024
1 parent 4eada10 commit 3109167
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 91 deletions.
1 change: 1 addition & 0 deletions blocking_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod client;
pub mod configure_client;
pub mod quiche_client_loop;
mod sendto;
102 changes: 11 additions & 91 deletions blocking_client/src/quiche_client_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant},
};
use std::net::{IpAddr, Ipv4Addr, SocketAddrV4};

use log::{debug, error, info, trace};
use quic_geyser_common::{defaults::MAX_DATAGRAM_SIZE, message::Message};
Expand All @@ -16,7 +15,9 @@ use quic_geyser_quiche_utils::{

use anyhow::{bail, Context};
use ring::rand::{SecureRandom, SystemRandom};
use quic_geyser_common::net::parse_host_port;
use crate::sendto::{detect_gso, send_to};

const ENABLE_PACING: bool = true;

pub fn client_loop(
mut config: quiche::Config,
Expand Down Expand Up @@ -235,21 +236,14 @@ pub fn client_loop(
}
};

let send_result = if enable_gso {
send_linux_optimized(
&socket,
&out[..write],
&send_info,
enable_gso,
MAX_DATAGRAM_SIZE as u16,
)
} else {
send_generic(
&socket,
&out[..write],
&send_info
)
};
let send_result = send_to(
&socket,
&out[..write],
&send_info,
MAX_DATAGRAM_SIZE,
ENABLE_PACING,
enable_gso,
);

if let Err(e) = send_result {
if e.kind() == std::io::ErrorKind::WouldBlock {
Expand All @@ -272,80 +266,6 @@ pub fn client_loop(
Ok(())
}

#[cfg(target_os = "linux")]
fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool {
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::UdpGsoSegment;
use std::os::unix::io::AsRawFd;

// mio::net::UdpSocket doesn't implement AsFd (yet?).
let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) };

setsockopt(&fd, UdpGsoSegment, &(segment_size as i32)).is_ok()
}

#[cfg(not(target_os = "linux"))]
fn detect_gso(_: &mio::net::UdpSocket, _: usize) -> bool {
false
}




#[cfg(target_os = "linux")]
fn send_linux_optimized(
socket: &mio::net::UdpSocket,
buf: &[u8],
send_info: &quiche::SendInfo,
enable_gso: bool,
segment_size: u16,
) -> std::io::Result<usize> {
use nix::sys::socket::sendmsg;
use nix::sys::socket::ControlMessage;
use nix::sys::socket::MsgFlags;
use nix::sys::socket::SockaddrStorage;
use std::io::IoSlice;
use std::os::unix::io::AsRawFd;

let mut cmgs = Vec::with_capacity(2);

if enable_gso {
cmgs.push(ControlMessage::UdpGsoSegments(&segment_size));
};

let iov = [IoSlice::new(buf)];
let dst = SockaddrStorage::from(send_info.to);
let sockfd = socket.as_raw_fd();

match sendmsg(sockfd, &iov, &cmgs, MsgFlags::empty(), Some(&dst)) {
Ok(v) => Ok(v),
Err(e) => Err(e.into()),
}
}

#[cfg(not(target_os = "linux"))]
fn send_linux_optimized(
socket: &mio::net::UdpSocket,
buf: &[u8],
send_info: &quiche::SendInfo,
_enable_gso: bool,
_segment_size: u16,
) -> std::io::Result<usize> {
// note: this will implicitly be determined from set_txtime_sockopt
panic!("send_with_pacing is not supported on this platform");
}

// send without any gso etc.
fn send_generic(
socket: &mio::net::UdpSocket,
buf: &[u8],
send_info: &quiche::SendInfo,
) -> std::io::Result<usize> {
socket.send_to(&buf, send_info.to)
}



#[cfg(test)]
mod tests {
use std::{
Expand Down
149 changes: 149 additions & 0 deletions blocking_client/src/sendto.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (C) 2021, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::cmp;

use std::io;

/// For Linux, try to detect GSO is available.
#[cfg(target_os = "linux")]
pub fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool {
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::UdpGsoSegment;
use std::os::unix::io::AsRawFd;

// mio::net::UdpSocket doesn't implement AsFd (yet?).
let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) };

setsockopt(&fd, UdpGsoSegment, &(segment_size as i32)).is_ok()
}

/// For non-Linux, there is no GSO support.
#[cfg(not(target_os = "linux"))]
pub fn detect_gso(_socket: &mio::net::UdpSocket, _segment_size: usize) -> bool {
false
}

/// Send packets using sendmsg() with GSO.
#[cfg(target_os = "linux")]
fn send_to_gso_pacing(
socket: &mio::net::UdpSocket, buf: &[u8], send_info: &quiche::SendInfo,
segment_size: usize,
) -> io::Result<usize> {
use nix::sys::socket::sendmsg;
use nix::sys::socket::ControlMessage;
use nix::sys::socket::MsgFlags;
use nix::sys::socket::SockaddrStorage;
use std::io::IoSlice;
use std::os::unix::io::AsRawFd;

let iov = [IoSlice::new(buf)];
let segment_size = segment_size as u16;
let dst = SockaddrStorage::from(send_info.to);
let sockfd = socket.as_raw_fd();

// GSO option.
let cmsg_gso = ControlMessage::UdpGsoSegments(&segment_size);

// Pacing option.
let send_time = std_time_to_u64(&send_info.at);
let cmsg_txtime = ControlMessage::TxTime(&send_time);

match sendmsg(
sockfd,
&iov,
&[cmsg_gso, cmsg_txtime],
MsgFlags::empty(),
Some(&dst),
) {
Ok(v) => Ok(v),
Err(e) => Err(e.into()),
}
}

/// For non-Linux platforms.
#[cfg(not(target_os = "linux"))]
fn send_to_gso_pacing(
_socket: &mio::net::UdpSocket, _buf: &[u8], _send_info: &quiche::SendInfo,
_segment_size: usize,
) -> io::Result<usize> {
panic!("send_to_gso() should not be called on non-linux platforms");
}

/// A wrapper function of send_to().
///
/// When GSO and SO_TXTIME are enabled, send packets using send_to_gso().
/// Otherwise, send packets using socket.send_to().
pub fn send_to(
socket: &mio::net::UdpSocket, buf: &[u8], send_info: &quiche::SendInfo,
segment_size: usize, pacing: bool, enable_gso: bool,
) -> io::Result<usize> {
if pacing && enable_gso {
match send_to_gso_pacing(socket, buf, send_info, segment_size) {
Ok(v) => {
return Ok(v);
},
Err(e) => {
return Err(e);
},
}
}

let mut off = 0;
let mut left = buf.len();
let mut written = 0;

while left > 0 {
let pkt_len = cmp::min(left, segment_size);

match socket.send_to(&buf[off..off + pkt_len], send_info.to) {
Ok(v) => {
written += v;
},
Err(e) => return Err(e),
}

off += pkt_len;
left -= pkt_len;
}

Ok(written)
}

#[cfg(target_os = "linux")]
fn std_time_to_u64(time: &std::time::Instant) -> u64 {
const NANOS_PER_SEC: u64 = 1_000_000_000;

const INSTANT_ZERO: std::time::Instant =
unsafe { std::mem::transmute(std::time::UNIX_EPOCH) };

let raw_time = time.duration_since(INSTANT_ZERO);

let sec = raw_time.as_secs();
let nsec = raw_time.subsec_nanos();

sec * NANOS_PER_SEC + nsec as u64
}

0 comments on commit 3109167

Please sign in to comment.