Skip to content

Commit

Permalink
Bugfix: wait for the reactor to confirm the modification (#3)
Browse files Browse the repository at this point in the history
* Bugfix: wait for the reactor to confirm the modification

* Report out of memory on FDs bigger than N

* Un-hardcode MAX_REGISTRATIONS on regular OSes

* Clippy

* Better method names
  • Loading branch information
ivmarkov authored Aug 9, 2024
1 parent b28e2c7 commit 73a020f
Showing 1 changed file with 81 additions and 34 deletions.
115 changes: 81 additions & 34 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use core::task::Waker;

use std::io::{self, ErrorKind};
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
use std::sync::MutexGuard;

use enumset::{EnumSet, EnumSetType};

Expand All @@ -13,9 +14,13 @@ use libc as sys;

use crate::{syscall, syscall_los, syscall_los_eagain};

// For ESP-IDF sys::FDSETSIZE is currently wrongly set to 1024 in the `libc` crate
// Therefore, use a custom value for now
#[cfg(target_os = "espidf")]
const MAX_REGISTRATIONS: usize = 20;

//const FD_SEGMENT: usize = sys::FD_SETSIZE / core::mem::size_of::<sys::fd_set>();
#[cfg(not(target_os = "espidf"))]
const MAX_REGISTRATIONS: usize = sys::FD_SETSIZE;

#[derive(EnumSetType, Debug)]
pub(crate) enum Event {
Expand Down Expand Up @@ -82,13 +87,15 @@ struct Registration {
struct Registrations<const N: usize> {
vec: heapless::Vec<Registration, N>,
event_fd: Option<OwnedFd>,
waiting: usize,
}

impl<const N: usize> Registrations<N> {
const fn new() -> Self {
Self {
vec: heapless::Vec::new(),
event_fd: None,
waiting: 0,
}
}

Expand All @@ -103,7 +110,7 @@ impl<const N: usize> Registrations<N> {
Err(ErrorKind::InvalidInput)?;
}

if fd >= sys::FD_SETSIZE as RawFd {
if fd >= sys::FD_SETSIZE as RawFd || fd >= N as RawFd {
Err(ErrorKind::OutOfMemory)?;
}

Expand All @@ -129,8 +136,6 @@ impl<const N: usize> Registrations<N> {

self.vec.swap_remove(index);

self.notify()?;

Ok(())
}

Expand All @@ -147,8 +152,6 @@ impl<const N: usize> Registrations<N> {
}
}

self.notify()?;

Ok(())
}

Expand Down Expand Up @@ -312,13 +315,15 @@ impl<const N: usize> Registrations<N> {

pub struct Reactor<const N: usize> {
registrations: std::sync::Mutex<Registrations<N>>,
condvar: std::sync::Condvar,
started: AtomicBool,
}

impl<const N: usize> Reactor<N> {
const fn new() -> Self {
Self {
registrations: std::sync::Mutex::new(Registrations::new()),
condvar: std::sync::Condvar::new(),
started: AtomicBool::new(false),
}
}
Expand All @@ -340,23 +345,23 @@ impl<const N: usize> Reactor<N> {
}

pub(crate) fn register(&self, fd: RawFd) -> io::Result<()> {
self.lock(|regs| regs.register(fd))
self.modify(|regs| regs.register(fd))
}

pub(crate) fn deregister(&self, fd: RawFd) -> io::Result<()> {
self.lock(|regs| regs.deregister(fd))
self.modify(|regs| regs.deregister(fd))
}

// pub(crate) fn set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<()> {
// self.lock(|regs| regs.set(fd, event, waker))
// }

pub(crate) fn fetch(&self, fd: RawFd, event: Event) -> io::Result<bool> {
self.lock(|regs| regs.fetch(fd, event))
self.modify(|regs| regs.fetch(fd, event))
}

pub(crate) fn fetch_or_set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<bool> {
self.lock(|regs| {
self.modify(|regs| {
if regs.fetch(fd, event)? {
Ok(true)
} else {
Expand All @@ -368,58 +373,100 @@ impl<const N: usize> Reactor<N> {
}

fn run(&self) -> io::Result<()> {
if !self.lock(Registrations::create_notification)? {
if !self.lock(|mut guard| guard.create_notification())? {
Err(ErrorKind::AlreadyExists)?;
}

debug!("Running");

let mut fds = Fds::new();
let mut update = false;

let result = loop {
let result = self.wait();
let max = self.apply(|inner| {
if !update {
update = true;
} else {
inner.update_events(&fds)?;
}

inner.set_fds(&mut fds)
});

let result = match max {
Err(err) => Err(err),
Ok(None) => unreachable!("EventFD is not there?"),
Ok(Some(max)) => {
debug!("Start select");

let result = syscall_los!(unsafe {
sys::select(
max + 1,
fds.read.assume_init_mut(),
fds.write.assume_init_mut(),
fds.except.assume_init_mut(),
core::ptr::null_mut(),
)
});

debug!("End select");

result.map(|_| ())
}
};

if result.is_err() {
break result;
}
};

if !self.lock(Registrations::destroy_notification)? {
if !self.lock(|mut guard| guard.destroy_notification())? {
Err(ErrorKind::NotFound)?;
}

result
}

fn wait(&self) -> io::Result<()> {
let mut fds = Fds::new();

if let Some(max) = self.lock(|inner| inner.set_fds(&mut fds))? {
debug!("Start select");
fn modify<F, R>(&self, f: F) -> io::Result<R>
where
F: FnOnce(&mut Registrations<N>) -> io::Result<R>,
{
self.lock(|mut guard| {
guard.waiting += 1;

syscall_los!(unsafe {
sys::select(
max + 1,
fds.read.assume_init_mut(),
fds.write.assume_init_mut(),
fds.except.assume_init_mut(),
core::ptr::null_mut(),
)
})?;
let result = f(&mut guard);

debug!("End select");
guard.notify()?;

self.lock(|inner| inner.update_events(&fds))?;
}
let _guard = self
.condvar
.wait_while(guard, |registrations| registrations.waiting > 0)
.unwrap();

Ok(())
result
})
}

fn lock<F, R>(&self, f: F) -> io::Result<R>
fn apply<F, R>(&self, f: F) -> io::Result<R>
where
F: FnOnce(&mut Registrations<N>) -> io::Result<R>,
{
let mut inner = self.registrations.lock().unwrap();
self.lock(|mut guard| {
let result = f(&mut guard);

guard.waiting = 0;

f(&mut inner)
self.condvar.notify_all();

result
})
}

fn lock<F, R>(&self, f: F) -> io::Result<R>
where
F: FnOnce(MutexGuard<Registrations<N>>) -> io::Result<R>,
{
f(self.registrations.lock().unwrap())
}
}

Expand Down

0 comments on commit 73a020f

Please sign in to comment.