From 73a020f5e57f5d3bfe65cf1be461cc3b0d1e955f Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Fri, 9 Aug 2024 10:11:37 +0300 Subject: [PATCH] Bugfix: wait for the reactor to confirm the modification (#3) * Bugfix: wait for the reactor to confirm the modification * Report out of memory on FDs bigger than N * Un-hardcode MAX_REGISTRATIONS on regular OSes * Clippy * Better method names --- src/reactor.rs | 115 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 81 insertions(+), 34 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index ec604f7..6ccbd97 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -4,6 +4,7 @@ use core::task::Waker; use std::io::{self, ErrorKind}; use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}; +use std::sync::MutexGuard; use enumset::{EnumSet, EnumSetType}; @@ -13,9 +14,13 @@ use libc as sys; use crate::{syscall, syscall_los, syscall_los_eagain}; +// For ESP-IDF sys::FDSETSIZE is currently wrongly set to 1024 in the `libc` crate +// Therefore, use a custom value for now +#[cfg(target_os = "espidf")] const MAX_REGISTRATIONS: usize = 20; -//const FD_SEGMENT: usize = sys::FD_SETSIZE / core::mem::size_of::(); +#[cfg(not(target_os = "espidf"))] +const MAX_REGISTRATIONS: usize = sys::FD_SETSIZE; #[derive(EnumSetType, Debug)] pub(crate) enum Event { @@ -82,6 +87,7 @@ struct Registration { struct Registrations { vec: heapless::Vec, event_fd: Option, + waiting: usize, } impl Registrations { @@ -89,6 +95,7 @@ impl Registrations { Self { vec: heapless::Vec::new(), event_fd: None, + waiting: 0, } } @@ -103,7 +110,7 @@ impl Registrations { Err(ErrorKind::InvalidInput)?; } - if fd >= sys::FD_SETSIZE as RawFd { + if fd >= sys::FD_SETSIZE as RawFd || fd >= N as RawFd { Err(ErrorKind::OutOfMemory)?; } @@ -129,8 +136,6 @@ impl Registrations { self.vec.swap_remove(index); - self.notify()?; - Ok(()) } @@ -147,8 +152,6 @@ impl Registrations { } } - self.notify()?; - Ok(()) } @@ -312,6 +315,7 @@ impl Registrations { pub struct Reactor { registrations: std::sync::Mutex>, + condvar: std::sync::Condvar, started: AtomicBool, } @@ -319,6 +323,7 @@ impl Reactor { const fn new() -> Self { Self { registrations: std::sync::Mutex::new(Registrations::new()), + condvar: std::sync::Condvar::new(), started: AtomicBool::new(false), } } @@ -340,11 +345,11 @@ impl Reactor { } pub(crate) fn register(&self, fd: RawFd) -> io::Result<()> { - self.lock(|regs| regs.register(fd)) + self.modify(|regs| regs.register(fd)) } pub(crate) fn deregister(&self, fd: RawFd) -> io::Result<()> { - self.lock(|regs| regs.deregister(fd)) + self.modify(|regs| regs.deregister(fd)) } // pub(crate) fn set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<()> { @@ -352,11 +357,11 @@ impl Reactor { // } pub(crate) fn fetch(&self, fd: RawFd, event: Event) -> io::Result { - self.lock(|regs| regs.fetch(fd, event)) + self.modify(|regs| regs.fetch(fd, event)) } pub(crate) fn fetch_or_set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result { - self.lock(|regs| { + self.modify(|regs| { if regs.fetch(fd, event)? { Ok(true) } else { @@ -368,58 +373,100 @@ impl Reactor { } fn run(&self) -> io::Result<()> { - if !self.lock(Registrations::create_notification)? { + if !self.lock(|mut guard| guard.create_notification())? { Err(ErrorKind::AlreadyExists)?; } debug!("Running"); + let mut fds = Fds::new(); + let mut update = false; + let result = loop { - let result = self.wait(); + let max = self.apply(|inner| { + if !update { + update = true; + } else { + inner.update_events(&fds)?; + } + + inner.set_fds(&mut fds) + }); + + let result = match max { + Err(err) => Err(err), + Ok(None) => unreachable!("EventFD is not there?"), + Ok(Some(max)) => { + debug!("Start select"); + + let result = syscall_los!(unsafe { + sys::select( + max + 1, + fds.read.assume_init_mut(), + fds.write.assume_init_mut(), + fds.except.assume_init_mut(), + core::ptr::null_mut(), + ) + }); + + debug!("End select"); + + result.map(|_| ()) + } + }; if result.is_err() { break result; } }; - if !self.lock(Registrations::destroy_notification)? { + if !self.lock(|mut guard| guard.destroy_notification())? { Err(ErrorKind::NotFound)?; } result } - fn wait(&self) -> io::Result<()> { - let mut fds = Fds::new(); - - if let Some(max) = self.lock(|inner| inner.set_fds(&mut fds))? { - debug!("Start select"); + fn modify(&self, f: F) -> io::Result + where + F: FnOnce(&mut Registrations) -> io::Result, + { + self.lock(|mut guard| { + guard.waiting += 1; - syscall_los!(unsafe { - sys::select( - max + 1, - fds.read.assume_init_mut(), - fds.write.assume_init_mut(), - fds.except.assume_init_mut(), - core::ptr::null_mut(), - ) - })?; + let result = f(&mut guard); - debug!("End select"); + guard.notify()?; - self.lock(|inner| inner.update_events(&fds))?; - } + let _guard = self + .condvar + .wait_while(guard, |registrations| registrations.waiting > 0) + .unwrap(); - Ok(()) + result + }) } - fn lock(&self, f: F) -> io::Result + fn apply(&self, f: F) -> io::Result where F: FnOnce(&mut Registrations) -> io::Result, { - let mut inner = self.registrations.lock().unwrap(); + self.lock(|mut guard| { + let result = f(&mut guard); + + guard.waiting = 0; - f(&mut inner) + self.condvar.notify_all(); + + result + }) + } + + fn lock(&self, f: F) -> io::Result + where + F: FnOnce(MutexGuard>) -> io::Result, + { + f(self.registrations.lock().unwrap()) } }