Skip to content

Commit

Permalink
Add Stream and AsyncRead pending test adaptors
Browse files Browse the repository at this point in the history
  • Loading branch information
Nemo157 authored and cramertj committed May 3, 2019
1 parent aaa1efc commit 4025e79
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 2 deletions.
3 changes: 2 additions & 1 deletion futures-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ name = "futures_test"

[dependencies]
futures-core-preview = { version = "=0.3.0-alpha.15", path = "../futures-core", default-features = false }
futures-io-preview = { version = "=0.3.0-alpha.15", path = "../futures-io", default-features = false }
futures-util-preview = { version = "=0.3.0-alpha.15", path = "../futures-util", default-features = false }
futures-executor-preview = { version = "=0.3.0-alpha.15", path = "../futures-executor", default-features = false }
pin-utils = { version = "0.1.0-alpha.4", default-features = false }
Expand All @@ -25,4 +26,4 @@ futures-preview = { version = "=0.3.0-alpha.15", path = "../futures", default-fe

[features]
default = ["std"]
std = ["futures-core-preview/std", "futures-util-preview/std", "futures-executor-preview/std"]
std = ["futures-core-preview/std", "futures-io-preview/std", "futures-util-preview/std", "futures-executor-preview/std"]
2 changes: 1 addition & 1 deletion futures-test/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub trait FutureTestExt: Future {
where
Self: Sized,
{
pending_once::PendingOnce::new(self)
PendingOnce::new(self)
}

/// Runs this future on a dedicated executor running in a background thread.
Expand Down
80 changes: 80 additions & 0 deletions futures-test/src/io/interleave_pending.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use futures_io::{self as io, AsyncBufRead, AsyncRead};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::{
marker::Unpin,
pin::Pin,
task::{Context, Poll},
};

/// Reader for the [`interleave_pending`](super::AsyncReadTestExt::interleave_pending) method.
#[derive(Debug)]
pub struct InterleavePending<R: AsyncRead> {
reader: R,
pended: bool,
}

impl<R: AsyncRead + Unpin> Unpin for InterleavePending<R> {}

impl<R: AsyncRead> InterleavePending<R> {
unsafe_pinned!(reader: R);
unsafe_unpinned!(pended: bool);

pub(crate) fn new(reader: R) -> InterleavePending<R> {
InterleavePending {
reader,
pended: false,
}
}

fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut R>, &'a mut bool) {
unsafe {
let this = self.get_unchecked_mut();
(Pin::new_unchecked(&mut this.reader), &mut this.pended)
}
}
}

impl<R: AsyncRead> AsyncRead for InterleavePending<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let (reader, pended) = self.project();
if *pended {
let next = reader.poll_read(cx, buf);
if next.is_ready() {
*pended = false;
}
next
} else {
cx.waker().wake_by_ref();
*pended = true;
Poll::Pending
}
}
}

impl<R: AsyncBufRead> AsyncBufRead for InterleavePending<R> {
fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&'a [u8]>> {
let (reader, pended) = self.project();
if *pended {
let next = reader.poll_fill_buf(cx);
if next.is_ready() {
*pended = false;
}
next
} else {
cx.waker().wake_by_ref();
*pended = true;
Poll::Pending
}
}

fn consume(self: Pin<&mut Self>, amount: usize) {
self.reader().consume(amount)
}
}
78 changes: 78 additions & 0 deletions futures-test/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! Additional combinators for testing async IO.
use futures_io::AsyncRead;

mod interleave_pending;
pub use self::interleave_pending::InterleavePending;

/// Additional combinators for testing async readers.
pub trait AsyncReadTestExt: AsyncRead {
/// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending)
/// in between each read of the reader.
///
/// # Examples
///
/// ```
/// #![feature(async_await)]
/// use futures::task::Poll;
/// use futures::io::AsyncRead;
/// use futures_test::task::noop_context;
/// use futures_test::io::AsyncReadTestExt;
/// use pin_utils::pin_mut;
///
/// let reader = std::io::Cursor::new(&[1, 2, 3]).interleave_pending();
/// pin_mut!(reader);
///
/// let mut cx = noop_context();
///
/// let mut buf = [0, 0];
///
/// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Pending);
/// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Ready(2));
/// assert_eq!(buf, [1, 2]);
/// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Pending);
/// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Ready(1));
/// assert_eq!(buf, [3, 2]);
/// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Pending);
/// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Ready(0));
///
/// # Ok::<(), std::io::Error>(())
/// ```
///
/// ## `AsyncBufRead`
///
/// The returned reader will also implement `AsyncBufRead` if the underlying reader does.
///
/// ```
/// #![feature(async_await)]
/// use futures::task::Poll;
/// use futures::io::AsyncBufRead;
/// use futures_test::task::noop_context;
/// use futures_test::io::AsyncReadTestExt;
/// use pin_utils::pin_mut;
///
/// let reader = std::io::Cursor::new(&[1, 2, 3]).interleave_pending();
/// pin_mut!(reader);
///
/// let mut cx = noop_context();
///
/// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Pending);
/// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[1, 2, 3][..]));
/// reader.as_mut().consume(2);
/// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Pending);
/// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[3][..]));
/// reader.as_mut().consume(1);
/// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Pending);
/// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[][..]));
///
/// # Ok::<(), std::io::Error>(())
/// ```
fn interleave_pending(self) -> InterleavePending<Self>
where
Self: Sized,
{
InterleavePending::new(self)
}
}

impl<R> AsyncReadTestExt for R where R: AsyncRead {}
6 changes: 6 additions & 0 deletions futures-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ pub mod task;

#[cfg(feature = "std")]
pub mod future;

#[cfg(feature = "std")]
pub mod stream;

#[cfg(feature = "std")]
pub mod io;
49 changes: 49 additions & 0 deletions futures-test/src/stream/interleave_pending.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use futures_core::stream::Stream;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::{
marker::Unpin,
pin::Pin,
task::{Context, Poll},
};

/// Stream for the [`interleave_pending`](super::StreamTestExt::interleave_pending) method.
#[derive(Debug)]
pub struct InterleavePending<St: Stream> {
stream: St,
pended: bool,
}

impl<St: Stream + Unpin> Unpin for InterleavePending<St> {}

impl<St: Stream> InterleavePending<St> {
unsafe_pinned!(stream: St);
unsafe_unpinned!(pended: bool);

pub(crate) fn new(stream: St) -> InterleavePending<St> {
InterleavePending {
stream,
pended: false,
}
}
}

impl<St: Stream> Stream for InterleavePending<St> {
type Item = St::Item;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if *self.as_mut().pended() {
let next = self.as_mut().stream().poll_next(cx);
if next.is_ready() {
*self.pended() = false;
}
next
} else {
cx.waker().wake_by_ref();
*self.pended() = true;
Poll::Pending
}
}
}
43 changes: 43 additions & 0 deletions futures-test/src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//! Additional combinators for testing streams.
use futures_core::stream::Stream;

mod interleave_pending;
pub use self::interleave_pending::InterleavePending;

/// Additional combinators for testing streams.
pub trait StreamTestExt: Stream {
/// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending)
/// in between each item of the stream.
///
/// # Examples
///
/// ```
/// #![feature(async_await)]
/// use futures::task::Poll;
/// use futures::stream::{self, Stream};
/// use futures_test::task::noop_context;
/// use futures_test::stream::StreamTestExt;
/// use pin_utils::pin_mut;
///
/// let stream = stream::iter(vec![1, 2]).interleave_pending();
/// pin_mut!(stream);
///
/// let mut cx = noop_context();
///
/// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending);
/// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(1)));
/// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending);
/// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(2)));
/// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending);
/// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(None));
/// ```
fn interleave_pending(self) -> InterleavePending<Self>
where
Self: Sized,
{
InterleavePending::new(self)
}
}

impl<St> StreamTestExt for St where St: Stream {}

0 comments on commit 4025e79

Please sign in to comment.