Skip to content

Commit

Permalink
waker: use popol::Waker type directly
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-orlovsky committed May 18, 2023
1 parent 97a379f commit a7f48fd
Showing 1 changed file with 30 additions and 57 deletions.
87 changes: 30 additions & 57 deletions src/poller/popol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
//! Poll engine provided by the [`popol`] crate.
use std::collections::VecDeque;
use std::io;
use std::io::{Error, Write};
use std::io::{self, Error};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -64,7 +62,7 @@ impl Poller {
}

impl Poll for Poller {
type Waker = UnixStream;
type Waker = PopolWaker;

fn register(&mut self, fd: &impl AsRawFd, interest: IoType) {
#[cfg(feature = "log")]
Expand Down Expand Up @@ -162,68 +160,43 @@ impl From<IoType> for popol::Interest {
}
}

impl Waker for UnixStream {
type Send = Arc<UnixStream>;
type Recv = UnixStream;
/// Wrapper type around the waker provided by `popol` crate.
#[derive(Clone)]
pub struct PopolWaker(Arc<popol::Waker>);

impl Waker for PopolWaker {
type Send = Self;
type Recv = Self;

fn pair() -> Result<(Self::Send, Self::Recv), Error> {
let (waker_writer, waker_reader) = UnixStream::pair()?;
waker_reader.set_nonblocking(true)?;
waker_writer.set_nonblocking(true)?;
Ok((Arc::new(waker_writer), waker_reader))
let waker = Arc::new(popol::Waker::new()?);
Ok((PopolWaker(waker.clone()), PopolWaker(waker)))
}
}

impl WakerRecv for UnixStream {
fn reset(&self) { reset_fd(self).expect("waker failure"); }
impl io::Read for PopolWaker {
fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
self.reset();
// Waker reads only when there is something which was sent.
// That's why we just return here.
Ok(0)
}
}

impl WakerSend for Arc<UnixStream> {
fn wake(&self) -> io::Result<()> {
loop {
let mut waker = self.as_ref();
match (&mut waker).write_all(&[0x1]) {
Ok(_) => return Ok(()),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Waker write queue got overfilled, resetting and repeating...");
reset_fd(&self.as_raw_fd())?;
}
Err(e) if e.kind() == io::ErrorKind::Interrupted => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Waker failure, repeating...");
}
Err(e) => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Waker error: {e}");

return Err(e);
}
}
}
}
impl AsRawFd for PopolWaker {
fn as_raw_fd(&self) -> RawFd { self.0.as_ref().as_raw_fd() }
}

fn reset_fd(fd: &impl AsRawFd) -> io::Result<()> {
let mut buf = [0u8; 4096];

loop {
// We use a low-level "read" here because the alternative is to create a `UnixStream`
// from the `RawFd`, which has "drop" semantics which we want to avoid.
match unsafe {
libc::read(fd.as_raw_fd(), buf.as_mut_ptr() as *mut libc::c_void, buf.len())
} {
-1 => match io::Error::last_os_error() {
e if e.kind() == io::ErrorKind::WouldBlock => return Ok(()),
e => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Unable to reset waker queue: {e}");

return Err(e);
}
},
0 => return Ok(()),
_ => continue,
impl WakerRecv for PopolWaker {
fn reset(&self) {
if let Err(e) = popol::Waker::reset(self.0.as_ref()) {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Unable to reset waker queue: {e}");
panic!("unable to reset waker queue. Details: {e}");
}
}
}

impl WakerSend for PopolWaker {
fn wake(&self) -> io::Result<()> { self.0.wake() }
}

0 comments on commit a7f48fd

Please sign in to comment.