From 34c982ae5208aff5efd9ece0248743dbca84eec9 Mon Sep 17 00:00:00 2001 From: Folkert de Vries Date: Fri, 17 Nov 2023 15:28:34 +0100 Subject: [PATCH] use the new error interest in tokio to detect the timestamp being available --- Cargo.toml | 2 +- ntp-udp/src/raw_socket.rs | 60 --------------------------------------- ntp-udp/src/socket.rs | 42 ++++++--------------------- 3 files changed, 10 insertions(+), 94 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e1e1f1e45..8b39a1fb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ rand = "0.8.0" arbitrary = { version = "1.0" } thiserror = "1.0.10" libc = "0.2.145" -tokio = "1.28" +tokio = "1.32" toml = ">=0.5.0,<0.8.0" async-trait = "0.1.22" diff --git a/ntp-udp/src/raw_socket.rs b/ntp-udp/src/raw_socket.rs index 02be6ba54..5732c03ec 100644 --- a/ntp-udp/src/raw_socket.rs +++ b/ntp-udp/src/raw_socket.rs @@ -528,66 +528,6 @@ pub(crate) mod timestamping_config { } } -#[cfg(target_os = "linux")] -pub(crate) mod err_queue_waiter { - - use std::os::unix::prelude::{AsRawFd, RawFd}; - - use tokio::io::{unix::AsyncFd, Interest}; - - use crate::raw_socket::cerr; - - pub struct ErrQueueWaiter { - epoll_fd: AsyncFd, - } - - fn create_error(inner: std::io::Error) -> std::io::Error { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("could not create error queue waiter epoll socket: {inner:?}"), - ) - } - - impl ErrQueueWaiter { - pub fn new(source: &impl AsRawFd) -> std::io::Result { - // Safety: safe to call with - let epoll = cerr(unsafe { libc::epoll_create(1) }).map_err(create_error)?; - - let mut ev = libc::epoll_event { - events: libc::EPOLLERR as _, - u64: 0, - }; - - cerr(unsafe { - libc::epoll_ctl( - epoll, - libc::EPOLL_CTL_ADD, - source.as_raw_fd(), - &mut ev as *mut _, - ) - }) - .map_err(create_error)?; - - Ok(Self { - epoll_fd: AsyncFd::new(epoll)?, - }) - } - - pub async fn wait(&self) -> std::io::Result<()> { - self.epoll_fd - .async_io(Interest::READABLE, |fd| { - let mut ev = libc::epoll_event { events: 0, u64: 0 }; - - match unsafe { libc::epoll_wait(*fd, &mut ev as *mut _, 1, 0) } { - 0 => Err(std::io::ErrorKind::WouldBlock.into()), - _ => Ok(()), - } - }) - .await - } - } -} - pub(crate) mod interface_iterator { use crate::interface::{sockaddr_to_socket_addr, InterfaceData, InterfaceName}; use std::str::FromStr; diff --git a/ntp-udp/src/socket.rs b/ntp-udp/src/socket.rs index 02d34ee03..63d9b2ec8 100644 --- a/ntp-udp/src/socket.rs +++ b/ntp-udp/src/socket.rs @@ -15,13 +15,8 @@ use crate::{ EnableTimestamps, }; -#[cfg(target_os = "linux")] -use crate::raw_socket::err_queue_waiter::ErrQueueWaiter; - pub struct UdpSocket { io: AsyncFd, - #[cfg(target_os = "linux")] - err_queue_waiter: ErrQueueWaiter, send_counter: u32, timestamping: EnableTimestamps, } @@ -92,8 +87,6 @@ impl UdpSocket { set_timestamping_options(&socket, method, timestamping)?; Ok(UdpSocket { - #[cfg(target_os = "linux")] - err_queue_waiter: ErrQueueWaiter::new(&socket)?, io: AsyncFd::new(socket)?, send_counter: 0, timestamping, @@ -132,8 +125,6 @@ impl UdpSocket { set_timestamping_options(&socket, DEFAULT_TIMESTAMP_METHOD, timestamping)?; Ok(UdpSocket { - #[cfg(target_os = "linux")] - err_queue_waiter: ErrQueueWaiter::new(&socket)?, io: AsyncFd::new(socket)?, send_counter: 0, timestamping, @@ -202,31 +193,16 @@ impl UdpSocket { let msg = "waiting for timestamp socket to become readable to fetch a send timestamp"; tracing::trace!(msg); - // Send timestamps are sent to the udp socket's error queue. Sadly, tokio does not - // currently support awaiting whether there is something in the error queue - // see https://github.com/tokio-rs/tokio/issues/4885. - // - // Therefore, we manually configure an extra file descriptor to listen for POLLPRI on - // the main udp socket. This `exceptional_condition` file descriptor becomes readable - // when there is something in the error queue. + let try_read = |udp_socket: &std::net::UdpSocket| { + fetch_send_timestamp_help(udp_socket, expected_counter) + }; + loop { - // Send timestamps are sent to the udp socket's error queue. Sadly, tokio does not - // currently support awaiting whether there is something in the error queue - // see https://github.com/tokio-rs/tokio/issues/4885. - self.err_queue_waiter.wait().await?; - - match fetch_send_timestamp_help(self.io.get_ref(), expected_counter) { - Ok(Some(send_timestamp)) => { - return Ok(send_timestamp); - } - Ok(None) => { - continue; - } - Err(e) => { - tracing::warn!(error = ?&e, "Error fetching timestamp"); - return Err(e); - } - } + // the timestamp being available triggers the error interest + match self.io.async_io(Interest::ERROR, try_read).await? { + Some(timestamp) => return Ok(timestamp), + None => continue, + }; } }