Skip to content

Commit

Permalink
test to exercise & document behavior when we exhaust slots (#61)
Browse files Browse the repository at this point in the history
In #57 there
were doubts about the behavior when we're waiting for slots, and when we
return slots.

This PR adds test cases that demonstrate the current behavior.

The behavior demoed in
`test_slot_exhaustion_behavior_when_op_completes_but_future_does_not_get_polled`
might be surprising, but, is not a huge problem in Pageserver right now
because generally we don't have that pattern in the codebase.
Created an issue nonetheless, this can be improved:
#60

closes #57
  • Loading branch information
problame authored Oct 29, 2024
1 parent 2256845 commit 781989b
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tokio-epoll-uring/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ tempfile = "3.6.0"
tracing-subscriber = "*"
os_pipe = "1.1.4"
assert-panic = "1.0.1"
bytes = "*"
4 changes: 2 additions & 2 deletions tokio-epoll-uring/src/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ pub(super) mod lifecycle;
pub(crate) mod slots;
pub(super) mod submission;
#[cfg(test)]
mod test_util;
pub(crate) mod test_util;
#[cfg(test)]
mod tests;
pub(crate) mod tests;

pub(crate) const RING_SIZE: u32 = 128;
3 changes: 1 addition & 2 deletions tokio-epoll-uring/src/system/slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ impl SlotsInner {
},
}
}

match &mut self.state {
SlotsInnerState::Open { myself, waiters } => {
clear_slot(&mut self.storage[idx]);
Expand All @@ -235,7 +234,6 @@ impl SlotsInner {
}
}
}
trace!("no waiters, returning idx to unused_indices");
self.unused_indices.push(idx);
}
SlotsInnerState::Draining => {
Expand Down Expand Up @@ -313,6 +311,7 @@ impl SlotsInner {
trace!("waking up future");
waker.wake();
}
// The slot will be returned by `wait_for_completion`.
}
Slot::PendingButFutureDropped {
_resources_owned_by_kernel,
Expand Down
5 changes: 5 additions & 0 deletions tokio-epoll-uring/src/system/test_util.rs
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
use std::time::Duration;

pub(super) mod shared_system_handle;
pub(crate) mod timerfd;

pub(crate) const FOREVER: Duration = Duration::from_secs(365 * 24 * 60 * 60);
67 changes: 67 additions & 0 deletions tokio-epoll-uring/src/system/test_util/timerfd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use bytes::Buf;
use nix::sys::time::TimeSpec;
use nix::sys::timerfd;
use nix::sys::timerfd::ClockId;
use uring_common::io_fd::IoFd;

use nix::sys::timer::TimerSetTimeFlags;

use nix::sys::timerfd::Expiration;

use nix::sys::timerfd::TimerFlags;

use std::time::Duration;

use crate::SystemHandle;

/// Abstraction for creating a timerfd.
///
/// See [`oneshot`] for creating it, and [`read`] for using `tokio-epoll-uring` to read from it.
pub struct TimerFd {
pub(crate) timerfd: timerfd::TimerFd,
}

pub fn oneshot(duration: Duration) -> TimerFd {
// setup a timerfd that will block readers forever
let timerfd = timerfd::TimerFd::new(ClockId::CLOCK_MONOTONIC, TimerFlags::empty())
.expect("timerfd creation");
timerfd
.set(
Expiration::OneShot(TimeSpec::from_duration(duration)),
TimerSetTimeFlags::empty(),
)
.unwrap();
TimerFd { timerfd }
}

impl TimerFd {
pub fn set(&self, duration: Duration) {
self.timerfd
.set(
Expiration::OneShot(TimeSpec::from_duration(duration)),
TimerSetTimeFlags::empty(),
)
.unwrap()
}
}

impl IoFd for TimerFd {
// Safety: we own the timerfd, so, as per the trait definition, we're allowed to return the fd.
unsafe fn as_fd(&self) -> i32 {
use std::os::fd::AsRawFd;
self.timerfd.as_raw_fd()
}
}

pub async fn read<T>(fd: impl IoFd + Send, system: T)
where
T: AsRef<SystemHandle>,
{
let value = vec![0u8; 8];
let ((_, value), res) = system.as_ref().read(fd, 0, value).await;
let n: usize = res.unwrap();
assert_eq!(n, 8);
let mut value = bytes::Bytes::from(value);
assert_ne!(value.get_u64_ne(), 0);
assert!(value.is_empty());
}
156 changes: 155 additions & 1 deletion tokio-epoll-uring/src/system/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ use std::{
time::Duration,
};

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::task::{unconstrained, JoinSet};
use tokio_util::sync::CancellationToken;

use crate::{
metrics::GlobalMetricsStorage, system::test_util::shared_system_handle::SharedSystemHandle,
metrics::GlobalMetricsStorage,
system::{
test_util::{shared_system_handle::SharedSystemHandle, timerfd, FOREVER},
RING_SIZE,
},
System,
};

Expand Down Expand Up @@ -305,3 +311,151 @@ async fn test_write() {

drop(fd);
}

/// Scenario: More tasks than slots; each tasks `.await`s one operation at a time.
///
/// NB: In this test, we use the pattern of `select! { ..., sleep(2 seconds) }` to drive op futures
/// to the point where they are enqueued and occupy a slot. This will become flaky if that takes
/// more than 2 seconds. A more deterministic way to do it would be to use
/// #[tokio::test(start_paused=true)].
#[tokio::test]
async fn test_slot_exhaustion_behavior_when_op_future_gets_dropped() {
let system = System::launch().await.unwrap();
let system = Arc::new(system);

// rack up 3*RING_SIZE tasks that wait forever
let mut submitted_or_enqueued = Vec::new();
let cancel = tokio_util::sync::CancellationToken::new();
let mut tasks = JoinSet::new();
for _ in 0..3 * RING_SIZE {
let system = system.clone();
let fd = Arc::new(timerfd::oneshot(FOREVER));
let cancel = cancel.child_token();
let (tx, rx) = tokio::sync::oneshot::channel();
submitted_or_enqueued.push(rx);
tasks.spawn(async move {
let fut = timerfd::read(Arc::clone(&fd), &system);
let mut fut = std::pin::pin!(fut);
tokio::select! {
biased; // to ensure we poll system.read() before notifying the test task
_ = &mut fut => {
unreachable!("timerfd only fires in far future")
}
_ = tokio::time::sleep(Duration::from_secs(2)) => { }
}
tx.send(()).expect("test bug");
tokio::select! {
_ = &mut fut => {
unreachable!()
}
_ = cancel.cancelled() => { drop(fut); fd }
}
});
}

for rx in submitted_or_enqueued {
rx.await.expect("test bug");
}

// all the futures have been submitted, drop them
cancel.cancel();
let mut timerfds = Vec::new();
while let Some(res) = tasks.join_next().await {
let timerfd = res.unwrap();
timerfds.push(timerfd);
}

// the slots are still blocked on the timerfd
// TODO: assert that directly
// assert it by starting a new read and check that that read will be Pending forever
let fire_in = Duration::from_secs(1);
let fd = timerfd::oneshot(fire_in);
tokio::time::sleep(2 * fire_in).await;
let fut = timerfd::read(fd, system.clone());
let mut fut = std::pin::pin!(fut);
tokio::select! {
biased; // ensure future gets queued first
_ = &mut fut => {
panic!("future shouldn't be ready because all slots are still used")
}
_ = tokio::time::sleep(Duration::from_secs(2)) => { }
}

// unblock the tasks by firing their timerfds sooner, otherwise shutdown hangs forever
for timerfd in timerfds {
timerfd.set(Duration::from_millis(1));
}

// our read should complete because unblocking of the tasks
// frees up slots
let _: () = fut.await;

Arc::into_inner(system).unwrap().initiate_shutdown().await;
}

/// Scenario: a single tasks creates many futures that get submitted
/// and hence occupy a slot, but the future never gets polled to completion,
/// even though the io_uring-level operation has long completed.
///
/// The current behavior is that the operation waits for a slot to
/// become available, i.e., it never completes.
///
/// NB: In this test, we use the pattern of `select! { ..., sleep(2 seconds) }` to drive op futures
/// to the point where they are enqueued and occupy a slot. This will become flaky if that takes
/// more than 2 seconds. A more deterministic way to do it would be to use
/// #[tokio::test(start_paused=true)].
#[tokio::test]
async fn test_slot_exhaustion_behavior_when_op_completes_but_future_does_not_get_polled() {
let system = Arc::new(System::launch().await.unwrap());

// Use up all slots.
let mut reads = FuturesUnordered::new();
let mut timerfds = Vec::new();
for _ in 0..RING_SIZE {
let oneshot = timerfd::oneshot(FOREVER);
let oneshot = Arc::new(oneshot);
let mut fut = Box::pin(tokio::task::unconstrained(timerfd::read(
oneshot.clone(),
system.clone(),
)));
let res = futures::poll!(&mut fut);
assert!(res.is_pending());
reads.push(fut);
timerfds.push(oneshot);
}

// An additional op will now wait forever for a free slot.
let mut nop = Box::pin(unconstrained(system.nop()));
tokio::select! {
biased; // ensure future gets queued first
res = &mut nop => {
panic!("nop shouldn't be able to get a slot because all slots are already used: {res:?}")
}
_ = tokio::time::sleep(Duration::from_secs(2)) => { }
}

// make the io_uring operations complete
for timerfd in timerfds {
timerfd.set(Duration::from_millis(1));
}

// despite the completed io_uring operations, our nop future is still waiting for a slot
tokio::select! {
biased; // ensure future gets queued first
res = &mut nop => {
panic!("nop shouldn't be able to get a slot because all slots are still used: {res:?}")
}
_ = tokio::time::sleep(Duration::from_secs(2)) => { }
}

//
// Cleanup
//
while let Some(()) = reads.next().await {}

// nop can now get a slot because the read futs have been polled to completion
let ((), res) = nop.await;
res.unwrap();

Arc::into_inner(system).unwrap().initiate_shutdown().await;
}
17 changes: 16 additions & 1 deletion uring-common/src/io_fd.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::{
os::fd::{AsRawFd, OwnedFd, RawFd},
sync::Arc,
};

/// An `io-uring` compatible file descriptor, or wrapper thereof.
///
Expand Down Expand Up @@ -66,3 +69,15 @@ impl IoFdMut for std::fs::File {
self.as_raw_fd()
}
}

impl<T> IoFd for Arc<T>
where
T: IoFd,
{
unsafe fn as_fd(&self) -> RawFd
where
T: IoFd,
{
self.as_ref().as_fd()
}
}

0 comments on commit 781989b

Please sign in to comment.