Skip to content

Commit

Permalink
feat: Lazy iterator
Browse files Browse the repository at this point in the history
`wait` now returns an iterator that allows lazily evaluating event closures. This allows for returning an error when the 1st closure returns an error, rather than always needing to evaluate every closure.

Signed-off-by: Jonathan Woollett-Light <[email protected]>
  • Loading branch information
Jonathan Woollett-Light committed Jun 14, 2023
1 parent 9d9636c commit 283b1ad
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 52 deletions.
48 changes: 34 additions & 14 deletions benches/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ fn run_basic_subscriber(c: &mut Criterion) {
event_fd
}).collect::<Vec<_>>();

let expected = vec![();usize::try_from(no_of_subscribers).unwrap()];
let n = usize::try_from(no_of_subscribers).unwrap();
c.bench_function("process_basic", |b| {
b.iter(|| {
assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice()));
let mut iter = event_manager.wait(Some(0)).unwrap();
for _ in 0..n {
assert_eq!(iter.next(), Some(&mut ()));
}
assert_eq!(iter.next(), None);
})
});

Expand Down Expand Up @@ -96,10 +100,14 @@ fn run_arc_mutex_subscriber(c: &mut Criterion) {
(event_fd,counter)
}).collect::<Vec<_>>();

let expected = vec![();usize::try_from(no_of_subscribers).unwrap()];
let n = usize::try_from(no_of_subscribers).unwrap();
c.bench_function("process_with_arc_mutex", |b| {
b.iter(|| {
assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice()));
let mut iter = event_manager.wait(Some(0)).unwrap();
for _ in 0..n {
assert_eq!(iter.next(), Some(&mut ()));
}
assert_eq!(iter.next(), None);
})
});

Expand Down Expand Up @@ -146,10 +154,14 @@ fn run_subscriber_with_inner_mut(c: &mut Criterion) {
(event_fd,counter)
}).collect::<Vec<_>>();

let expected = vec![();usize::try_from(no_of_subscribers).unwrap()];
let n = usize::try_from(no_of_subscribers).unwrap();
c.bench_function("process_with_inner_mut", |b| {
b.iter(|| {
assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice()));
let mut iter = event_manager.wait(Some(0)).unwrap();
for _ in 0..n {
assert_eq!(iter.next(), Some(&mut ()));
}
assert_eq!(iter.next(), None);
})
});

Expand Down Expand Up @@ -225,8 +237,8 @@ fn run_multiple_subscriber_types(c: &mut Criterion) {
.add(
inner_subscribers[i].as_fd(),
EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,
Box::new(
move |_: &mut EventManager<()>, event_set: EventSet| match event_set {
Box::new(move |_: &mut EventManager<()>, event_set: EventSet| {
match event_set {
EventSet::IN => {
data_clone[i].fetch_add(1, Ordering::SeqCst);
}
Expand All @@ -237,8 +249,8 @@ fn run_multiple_subscriber_types(c: &mut Criterion) {
panic!("Cannot continue execution. Associated fd was closed.");
}
_ => {}
},
),
}
}),
)
.unwrap();
}
Expand All @@ -247,10 +259,14 @@ fn run_multiple_subscriber_types(c: &mut Criterion) {
})
.collect::<Vec<_>>();

let expected = vec![();usize::try_from(total).unwrap()];
let n = usize::try_from(total).unwrap();
c.bench_function("process_dynamic_dispatch", |b| {
b.iter(|| {
assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice()));
let mut iter = event_manager.wait(Some(0)).unwrap();
for _ in 0..n {
assert_eq!(iter.next(), Some(&mut ()));
}
assert_eq!(iter.next(), None);
})
});

Expand Down Expand Up @@ -294,10 +310,14 @@ fn run_with_few_active_events(c: &mut Criterion) {
event_fd
}).collect::<Vec<_>>();

let expected = vec![();usize::try_from(active).unwrap()];
let n = usize::try_from(active).unwrap();
c.bench_function("process_dispatch_few_events", |b| {
b.iter(|| {
assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice()));
let mut iter = event_manager.wait(Some(0)).unwrap();
for _ in 0..n {
assert_eq!(iter.next(), Some(&mut ()));
}
assert_eq!(iter.next(), None);
})
});

Expand Down
132 changes: 94 additions & 38 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,15 @@ impl<T> BufferedEventManager<T> {
///
/// When the value given in timeout does not fit within an `i32` e.g.
/// `timeout.map(|u| i32::try_from(u).unwrap())`.
pub fn wait(&mut self, timeout: Option<u32>) -> Result<&[T], i32> {
pub fn wait(&mut self, timeout: Option<u32>) -> Result<Iter<'_, T>, i32> {
// SAFETY: `EventManager::wait` initializes N element from the start of the slice and only
// accesses these, thus it will never access uninitialized memory, making this safe.
unsafe {
self.buffer.set_len(self.buffer.capacity());
self.output_buffer.set_len(self.output_buffer.capacity());
}
let n = self
.event_manager
.wait(timeout, &mut self.buffer, &mut self.output_buffer)?;
unsafe {
Ok(self
.output_buffer
.get_unchecked(..usize::try_from(n).unwrap_unchecked()))
}
self.event_manager
.wait(timeout, &mut self.buffer, &mut self.output_buffer)
}

/// Creates new event manager.
Expand Down Expand Up @@ -123,6 +117,62 @@ impl<T> std::fmt::Debug for EventManager<T> {
}
}

#[derive(Debug)]
pub struct Iter<'a, T> {
event_manager: &'a mut EventManager<T>,
buffer: &'a [libc::epoll_event],
output_buffer: &'a mut [T],
index: usize,
}
impl<'a, T> Iter<'a, T> {
/// Returns a mutable slice of all the items previously returned by [`Iter::next`].
pub fn as_mut_slice(&'a mut self) -> &'a mut [T] {
&mut self.output_buffer[..self.index]
}
/// Returns a slice of all the items previously returned by [`Iter::next`].
pub fn as_slice(&'a self) -> &'a [T] {
&self.output_buffer[..self.index]
}
}
impl<'a, T> Iterator for Iter<'a, T> {
type Item = &'a mut T;
fn next(&mut self) -> Option<Self::Item> {
debug_assert_eq!(self.buffer.len(), self.output_buffer.len());

if self.index >= self.buffer.len() {
return None;
}
unsafe {
let event = self.buffer.get_unchecked(self.index);
// For all events which can fire there exists an entry within `self.events` thus
// it is safe to unwrap here.
let f: *const dyn Fn(&mut EventManager<T>, EventSet) -> T = self
.event_manager
.events
.get(&i32::try_from(event.u64).unwrap_unchecked())
.unwrap_unchecked();
self.output_buffer[self.index] = (*f)(
self.event_manager,
EventSet::from_bits_unchecked(event.events),
);

// SAFETY: This is always safe. This is required as the current standard library trait
// doesn't support lending iteraor semantics.
let temp = Some(std::mem::transmute(&mut self.output_buffer[self.index]));

self.index += 1;

temp
}
}

/// O(1)
fn size_hint(&self) -> (usize, Option<usize>) {
let n = self.buffer.len() - self.index;
(n, Some(n))
}
}

impl<T> EventManager<T> {
/// Add an entry to the interest list of the epoll file descriptor.
///
Expand Down Expand Up @@ -186,12 +236,12 @@ impl<T> EventManager<T> {
///
/// When the value given in timeout does not fit within an `i32` e.g.
/// `timeout.map(|u| i32::try_from(u).unwrap())`.
pub fn wait(
&mut self,
pub fn wait<'a>(
&'a mut self,
timeout: Option<u32>,
buffer: &mut [libc::epoll_event],
output_buffer: &mut [T],
) -> Result<i32, i32> {
buffer: &'a mut [libc::epoll_event],
output_buffer: &'a mut [T],
) -> Result<Iter<'a, T>, i32> {
// SAFETY: Always safe.
match unsafe {
libc::epoll_wait(
Expand All @@ -204,18 +254,13 @@ impl<T> EventManager<T> {
-1 => Err(errno()),
// SAFETY: `x` elements are initialized by `libc::epoll_wait`.
n @ 0.. => unsafe {
#[allow(clippy::needless_range_loop)]
for i in 0..usize::try_from(n).unwrap_unchecked() {
let event = buffer[i];
// For all events which can fire there exists an entry within `self.events` thus
// it is safe to unwrap here.
let f: *const dyn Fn(&mut EventManager<T>, EventSet) -> T = self
.events
.get(&i32::try_from(event.u64).unwrap_unchecked())
.unwrap_unchecked();
output_buffer[i] = (*f)(self, EventSet::from_bits_unchecked(event.events));
}
Ok(n)
let n = usize::try_from(n).unwrap_unchecked();
Ok(Iter {
event_manager: self,
buffer: &mut buffer[..n],
output_buffer: &mut output_buffer[..n],
index: 0,
})
},
_ => unreachable!(),
}
Expand Down Expand Up @@ -286,17 +331,19 @@ mod tests {

// The file descriptor has been pre-armed, this will immediately call the respective
// closure.
let vec = vec![()];
assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice()));
let mut iter = manager.wait(Some(10)).unwrap();
assert_eq!(iter.next(), Some(&mut ()));
assert_eq!(iter.next(), None);

// As the closure will flip the atomic boolean we assert it has flipped correctly.
assert!(COUNT.load(Ordering::SeqCst));

// At this point we have called the closure, since the closure removes the event fd from the
// interest list of the inner epoll, calling this again should timeout as there are no event
// fd in the inner epolls interest list which could trigger.
let vec = vec![];
assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice()));
let mut iter = manager.wait(Some(10)).unwrap();
assert_eq!(iter.next(), None);

// As the `EventManager::wait` should timeout the value of the atomic boolean should not be
// flipped.
assert!(COUNT.load(Ordering::SeqCst));
Expand Down Expand Up @@ -329,15 +376,18 @@ mod tests {
assert!(!COUNT.load(Ordering::SeqCst));

// As the closure will flip the atomic boolean we assert it has flipped correctly.
let vec = vec![()];
assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice()));
let mut iter = manager.wait(Some(10)).unwrap();
assert_eq!(iter.next(), Some(&mut ()));
assert_eq!(iter.next(), None);

// As the closure will flip the atomic boolean we assert it has flipped correctly.
assert!(COUNT.load(Ordering::SeqCst));

// The file descriptor has been pre-armed, this will immediately call the respective
// closure.
let vec = vec![()];
assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice()));
let mut iter = manager.wait(Some(10)).unwrap();
assert_eq!(iter.next(), Some(&mut ()));
assert_eq!(iter.next(), None);
// As the closure will flip the atomic boolean we assert it has flipped correctly.
assert!(!COUNT.load(Ordering::SeqCst));
}
Expand Down Expand Up @@ -394,8 +444,12 @@ mod tests {
}

// Check counter are the correct values
let arr = [0; FIRING];
assert_eq!(manager.wait(None), Ok(arr.as_slice()));
let mut iter = manager.wait(None).unwrap();
for _ in 0..FIRING {
assert_eq!(iter.next(), Some(&mut 0));
}
assert_eq!(iter.next(), None);

for i in set {
assert_eq!(subscribers[i].1.load(Ordering::SeqCst), 1);
}
Expand Down Expand Up @@ -429,7 +483,9 @@ mod tests {
.add(event_fd, EventSet::IN, Box::new(|_, _| Err(())))
.unwrap();

let arr = [Ok(()), Err(())];
assert_eq!(manager.wait(None), Ok(arr.as_slice()));
let mut iter = manager.wait(None).unwrap();
assert_eq!(iter.next(), Some(&mut Ok(())));
assert_eq!(iter.next(), Some(&mut Err(())));
assert_eq!(iter.next(), None);
}
}

0 comments on commit 283b1ad

Please sign in to comment.