Skip to content

Commit

Permalink
use the new error interest in tokio to detect the timestamp being ava…
Browse files Browse the repository at this point in the history
…ilable
  • Loading branch information
Folkert de Vries authored and rnijveld committed Nov 17, 2023
1 parent b84ab24 commit 34c982a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 94 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ rand = "0.8.0"
arbitrary = { version = "1.0" }
thiserror = "1.0.10"
libc = "0.2.145"
tokio = "1.28"
tokio = "1.32"
toml = ">=0.5.0,<0.8.0"
async-trait = "0.1.22"

Expand Down
60 changes: 0 additions & 60 deletions ntp-udp/src/raw_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,66 +528,6 @@ pub(crate) mod timestamping_config {
}
}

#[cfg(target_os = "linux")]
pub(crate) mod err_queue_waiter {

use std::os::unix::prelude::{AsRawFd, RawFd};

use tokio::io::{unix::AsyncFd, Interest};

use crate::raw_socket::cerr;

pub struct ErrQueueWaiter {
epoll_fd: AsyncFd<RawFd>,
}

fn create_error(inner: std::io::Error) -> std::io::Error {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("could not create error queue waiter epoll socket: {inner:?}"),
)
}

impl ErrQueueWaiter {
pub fn new(source: &impl AsRawFd) -> std::io::Result<Self> {
// Safety: safe to call with
let epoll = cerr(unsafe { libc::epoll_create(1) }).map_err(create_error)?;

let mut ev = libc::epoll_event {
events: libc::EPOLLERR as _,
u64: 0,
};

cerr(unsafe {
libc::epoll_ctl(
epoll,
libc::EPOLL_CTL_ADD,
source.as_raw_fd(),
&mut ev as *mut _,
)
})
.map_err(create_error)?;

Ok(Self {
epoll_fd: AsyncFd::new(epoll)?,
})
}

pub async fn wait(&self) -> std::io::Result<()> {
self.epoll_fd
.async_io(Interest::READABLE, |fd| {
let mut ev = libc::epoll_event { events: 0, u64: 0 };

match unsafe { libc::epoll_wait(*fd, &mut ev as *mut _, 1, 0) } {
0 => Err(std::io::ErrorKind::WouldBlock.into()),
_ => Ok(()),
}
})
.await
}
}
}

pub(crate) mod interface_iterator {
use crate::interface::{sockaddr_to_socket_addr, InterfaceData, InterfaceName};
use std::str::FromStr;
Expand Down
42 changes: 9 additions & 33 deletions ntp-udp/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@ use crate::{
EnableTimestamps,
};

#[cfg(target_os = "linux")]
use crate::raw_socket::err_queue_waiter::ErrQueueWaiter;

pub struct UdpSocket {
io: AsyncFd<std::net::UdpSocket>,
#[cfg(target_os = "linux")]
err_queue_waiter: ErrQueueWaiter,
send_counter: u32,
timestamping: EnableTimestamps,
}
Expand Down Expand Up @@ -92,8 +87,6 @@ impl UdpSocket {
set_timestamping_options(&socket, method, timestamping)?;

Ok(UdpSocket {
#[cfg(target_os = "linux")]
err_queue_waiter: ErrQueueWaiter::new(&socket)?,
io: AsyncFd::new(socket)?,
send_counter: 0,
timestamping,
Expand Down Expand Up @@ -132,8 +125,6 @@ impl UdpSocket {
set_timestamping_options(&socket, DEFAULT_TIMESTAMP_METHOD, timestamping)?;

Ok(UdpSocket {
#[cfg(target_os = "linux")]
err_queue_waiter: ErrQueueWaiter::new(&socket)?,
io: AsyncFd::new(socket)?,
send_counter: 0,
timestamping,
Expand Down Expand Up @@ -202,31 +193,16 @@ impl UdpSocket {
let msg = "waiting for timestamp socket to become readable to fetch a send timestamp";
tracing::trace!(msg);

// Send timestamps are sent to the udp socket's error queue. Sadly, tokio does not
// currently support awaiting whether there is something in the error queue
// see https://github.com/tokio-rs/tokio/issues/4885.
//
// Therefore, we manually configure an extra file descriptor to listen for POLLPRI on
// the main udp socket. This `exceptional_condition` file descriptor becomes readable
// when there is something in the error queue.
let try_read = |udp_socket: &std::net::UdpSocket| {
fetch_send_timestamp_help(udp_socket, expected_counter)
};

loop {
// Send timestamps are sent to the udp socket's error queue. Sadly, tokio does not
// currently support awaiting whether there is something in the error queue
// see https://github.com/tokio-rs/tokio/issues/4885.
self.err_queue_waiter.wait().await?;

match fetch_send_timestamp_help(self.io.get_ref(), expected_counter) {
Ok(Some(send_timestamp)) => {
return Ok(send_timestamp);
}
Ok(None) => {
continue;
}
Err(e) => {
tracing::warn!(error = ?&e, "Error fetching timestamp");
return Err(e);
}
}
// the timestamp being available triggers the error interest
match self.io.async_io(Interest::ERROR, try_read).await? {
Some(timestamp) => return Ok(timestamp),
None => continue,
};
}
}

Expand Down

0 comments on commit 34c982a

Please sign in to comment.