From 4025e790d4242779a190e9012f8903a0680f9919 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Wed, 1 May 2019 14:37:57 +0200 Subject: [PATCH] Add Stream and AsyncRead pending test adaptors --- futures-test/Cargo.toml | 3 +- futures-test/src/future/mod.rs | 2 +- futures-test/src/io/interleave_pending.rs | 80 +++++++++++++++++++ futures-test/src/io/mod.rs | 78 ++++++++++++++++++ futures-test/src/lib.rs | 6 ++ futures-test/src/stream/interleave_pending.rs | 49 ++++++++++++ futures-test/src/stream/mod.rs | 43 ++++++++++ 7 files changed, 259 insertions(+), 2 deletions(-) create mode 100644 futures-test/src/io/interleave_pending.rs create mode 100644 futures-test/src/io/mod.rs create mode 100644 futures-test/src/stream/interleave_pending.rs create mode 100644 futures-test/src/stream/mod.rs diff --git a/futures-test/Cargo.toml b/futures-test/Cargo.toml index 3c53be9ddb..5b5e4d87b5 100644 --- a/futures-test/Cargo.toml +++ b/futures-test/Cargo.toml @@ -16,6 +16,7 @@ name = "futures_test" [dependencies] futures-core-preview = { version = "=0.3.0-alpha.15", path = "../futures-core", default-features = false } +futures-io-preview = { version = "=0.3.0-alpha.15", path = "../futures-io", default-features = false } futures-util-preview = { version = "=0.3.0-alpha.15", path = "../futures-util", default-features = false } futures-executor-preview = { version = "=0.3.0-alpha.15", path = "../futures-executor", default-features = false } pin-utils = { version = "0.1.0-alpha.4", default-features = false } @@ -25,4 +26,4 @@ futures-preview = { version = "=0.3.0-alpha.15", path = "../futures", default-fe [features] default = ["std"] -std = ["futures-core-preview/std", "futures-util-preview/std", "futures-executor-preview/std"] +std = ["futures-core-preview/std", "futures-io-preview/std", "futures-util-preview/std", "futures-executor-preview/std"] diff --git a/futures-test/src/future/mod.rs b/futures-test/src/future/mod.rs index a3e56c7906..73b2326e5e 100644 --- a/futures-test/src/future/mod.rs +++ b/futures-test/src/future/mod.rs @@ -52,7 +52,7 @@ pub trait FutureTestExt: Future { where Self: Sized, { - pending_once::PendingOnce::new(self) + PendingOnce::new(self) } /// Runs this future on a dedicated executor running in a background thread. diff --git a/futures-test/src/io/interleave_pending.rs b/futures-test/src/io/interleave_pending.rs new file mode 100644 index 0000000000..0819f08f52 --- /dev/null +++ b/futures-test/src/io/interleave_pending.rs @@ -0,0 +1,80 @@ +use futures_io::{self as io, AsyncBufRead, AsyncRead}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::{ + marker::Unpin, + pin::Pin, + task::{Context, Poll}, +}; + +/// Reader for the [`interleave_pending`](super::AsyncReadTestExt::interleave_pending) method. +#[derive(Debug)] +pub struct InterleavePending { + reader: R, + pended: bool, +} + +impl Unpin for InterleavePending {} + +impl InterleavePending { + unsafe_pinned!(reader: R); + unsafe_unpinned!(pended: bool); + + pub(crate) fn new(reader: R) -> InterleavePending { + InterleavePending { + reader, + pended: false, + } + } + + fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut R>, &'a mut bool) { + unsafe { + let this = self.get_unchecked_mut(); + (Pin::new_unchecked(&mut this.reader), &mut this.pended) + } + } +} + +impl AsyncRead for InterleavePending { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let (reader, pended) = self.project(); + if *pended { + let next = reader.poll_read(cx, buf); + if next.is_ready() { + *pended = false; + } + next + } else { + cx.waker().wake_by_ref(); + *pended = true; + Poll::Pending + } + } +} + +impl AsyncBufRead for InterleavePending { + fn poll_fill_buf<'a>( + self: Pin<&'a mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let (reader, pended) = self.project(); + if *pended { + let next = reader.poll_fill_buf(cx); + if next.is_ready() { + *pended = false; + } + next + } else { + cx.waker().wake_by_ref(); + *pended = true; + Poll::Pending + } + } + + fn consume(self: Pin<&mut Self>, amount: usize) { + self.reader().consume(amount) + } +} diff --git a/futures-test/src/io/mod.rs b/futures-test/src/io/mod.rs new file mode 100644 index 0000000000..9a975d5972 --- /dev/null +++ b/futures-test/src/io/mod.rs @@ -0,0 +1,78 @@ +//! Additional combinators for testing async IO. + +use futures_io::AsyncRead; + +mod interleave_pending; +pub use self::interleave_pending::InterleavePending; + +/// Additional combinators for testing async readers. +pub trait AsyncReadTestExt: AsyncRead { + /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending) + /// in between each read of the reader. + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await)] + /// use futures::task::Poll; + /// use futures::io::AsyncRead; + /// use futures_test::task::noop_context; + /// use futures_test::io::AsyncReadTestExt; + /// use pin_utils::pin_mut; + /// + /// let reader = std::io::Cursor::new(&[1, 2, 3]).interleave_pending(); + /// pin_mut!(reader); + /// + /// let mut cx = noop_context(); + /// + /// let mut buf = [0, 0]; + /// + /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Pending); + /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Ready(2)); + /// assert_eq!(buf, [1, 2]); + /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Pending); + /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Ready(1)); + /// assert_eq!(buf, [3, 2]); + /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Pending); + /// assert_eq!(reader.as_mut().poll_read(&mut cx, &mut buf[..])?, Poll::Ready(0)); + /// + /// # Ok::<(), std::io::Error>(()) + /// ``` + /// + /// ## `AsyncBufRead` + /// + /// The returned reader will also implement `AsyncBufRead` if the underlying reader does. + /// + /// ``` + /// #![feature(async_await)] + /// use futures::task::Poll; + /// use futures::io::AsyncBufRead; + /// use futures_test::task::noop_context; + /// use futures_test::io::AsyncReadTestExt; + /// use pin_utils::pin_mut; + /// + /// let reader = std::io::Cursor::new(&[1, 2, 3]).interleave_pending(); + /// pin_mut!(reader); + /// + /// let mut cx = noop_context(); + /// + /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Pending); + /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[1, 2, 3][..])); + /// reader.as_mut().consume(2); + /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Pending); + /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[3][..])); + /// reader.as_mut().consume(1); + /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Pending); + /// assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[][..])); + /// + /// # Ok::<(), std::io::Error>(()) + /// ``` + fn interleave_pending(self) -> InterleavePending + where + Self: Sized, + { + InterleavePending::new(self) + } +} + +impl AsyncReadTestExt for R where R: AsyncRead {} diff --git a/futures-test/src/lib.rs b/futures-test/src/lib.rs index 12ef551ff1..cb798b4473 100644 --- a/futures-test/src/lib.rs +++ b/futures-test/src/lib.rs @@ -26,3 +26,9 @@ pub mod task; #[cfg(feature = "std")] pub mod future; + +#[cfg(feature = "std")] +pub mod stream; + +#[cfg(feature = "std")] +pub mod io; diff --git a/futures-test/src/stream/interleave_pending.rs b/futures-test/src/stream/interleave_pending.rs new file mode 100644 index 0000000000..f215f087fa --- /dev/null +++ b/futures-test/src/stream/interleave_pending.rs @@ -0,0 +1,49 @@ +use futures_core::stream::Stream; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::{ + marker::Unpin, + pin::Pin, + task::{Context, Poll}, +}; + +/// Stream for the [`interleave_pending`](super::StreamTestExt::interleave_pending) method. +#[derive(Debug)] +pub struct InterleavePending { + stream: St, + pended: bool, +} + +impl Unpin for InterleavePending {} + +impl InterleavePending { + unsafe_pinned!(stream: St); + unsafe_unpinned!(pended: bool); + + pub(crate) fn new(stream: St) -> InterleavePending { + InterleavePending { + stream, + pended: false, + } + } +} + +impl Stream for InterleavePending { + type Item = St::Item; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if *self.as_mut().pended() { + let next = self.as_mut().stream().poll_next(cx); + if next.is_ready() { + *self.pended() = false; + } + next + } else { + cx.waker().wake_by_ref(); + *self.pended() = true; + Poll::Pending + } + } +} diff --git a/futures-test/src/stream/mod.rs b/futures-test/src/stream/mod.rs new file mode 100644 index 0000000000..30717c8fce --- /dev/null +++ b/futures-test/src/stream/mod.rs @@ -0,0 +1,43 @@ +//! Additional combinators for testing streams. + +use futures_core::stream::Stream; + +mod interleave_pending; +pub use self::interleave_pending::InterleavePending; + +/// Additional combinators for testing streams. +pub trait StreamTestExt: Stream { + /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending) + /// in between each item of the stream. + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await)] + /// use futures::task::Poll; + /// use futures::stream::{self, Stream}; + /// use futures_test::task::noop_context; + /// use futures_test::stream::StreamTestExt; + /// use pin_utils::pin_mut; + /// + /// let stream = stream::iter(vec![1, 2]).interleave_pending(); + /// pin_mut!(stream); + /// + /// let mut cx = noop_context(); + /// + /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending); + /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(1))); + /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending); + /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(Some(2))); + /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Pending); + /// assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(None)); + /// ``` + fn interleave_pending(self) -> InterleavePending + where + Self: Sized, + { + InterleavePending::new(self) + } +} + +impl StreamTestExt for St where St: Stream {}