diff --git a/futures-util/src/io/buf_reader.rs b/futures-util/src/io/buf_reader.rs
new file mode 100644
index 0000000000..4c7f598949
--- /dev/null
+++ b/futures-util/src/io/buf_reader.rs
@@ -0,0 +1,257 @@
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer, IoSliceMut, SeekFrom};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use std::io::{self, Read};
+use std::pin::Pin;
+use std::{cmp, fmt};
+use super::DEFAULT_BUF_SIZE;
+
+/// The `BufReader` struct adds buffering to any reader.
+///
+/// It can be excessively inefficient to work directly with an [`AsyncRead`]
+/// instance. A `BufReader` performs large, infrequent reads on the underlying
+/// [`AsyncRead`] and maintains an in-memory buffer of the results.
+///
+/// `BufReader` can improve the speed of programs that make *small* and
+/// *repeated* read calls to the same file or network socket. It does not
+/// help when reading very large amounts at once, or reading just one or a few
+/// times. It also provides no advantage when reading from a source that is
+/// already in memory, like a `Vec<u8>`.
+///
+/// When the `BufReader` is dropped, the contents of its buffer will be
+/// discarded. Creating multiple instances of a `BufReader` on the same
+/// stream can cause data loss.
+///
+/// [`AsyncRead`]: futures_io::AsyncRead
+///
+// TODO: Examples
+pub struct BufReader<R> {
+    inner: R,
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+}
+
+impl<R: AsyncRead> BufReader<R> {
+    unsafe_pinned!(inner: R);
+    unsafe_unpinned!(pos: usize);
+    unsafe_unpinned!(cap: usize);
+
+    /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
+    /// but may change in the future.
+    pub fn new(inner: R) -> Self {
+        Self::with_capacity(DEFAULT_BUF_SIZE, inner)
+    }
+
+    /// Creates a new `BufReader` with the specified buffer capacity.
+    pub fn with_capacity(capacity: usize, inner: R) -> Self {
+        unsafe {
+            let mut buffer = Vec::with_capacity(capacity);
+            buffer.set_len(capacity);
+            inner.initializer().initialize(&mut buffer);
+            Self {
+                inner,
+                buf: buffer.into_boxed_slice(),
+                pos: 0,
+                cap: 0,
+            }
+        }
+    }
+
+    /// Gets a reference to the underlying reader.
+    ///
+    /// It is inadvisable to directly read from the underlying reader.
+    pub fn get_ref(&self) -> &R {
+        &self.inner
+    }
+
+    /// Gets a mutable reference to the underlying reader.
+    ///
+    /// It is inadvisable to directly read from the underlying reader.
+    pub fn get_mut(&mut self) -> &mut R {
+        &mut self.inner
+    }
+
+    /// Gets a pinned mutable reference to the underlying reader.
+    ///
+    /// It is inadvisable to directly read from the underlying reader.
+    pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut R> {
+        self.inner()
+    }
+
+    /// Consumes this `BufReader`, returning the underlying reader.
+    ///
+    /// Note that any leftover data in the internal buffer is lost.
+    pub fn into_inner(self) -> R {
+        self.inner
+    }
+
+    /// Returns a reference to the internally buffered data.
+    ///
+    /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
+    pub fn buffer(&self) -> &[u8] {
+        &self.buf[self.pos..self.cap]
+    }
+
+    /// Invalidates all data in the internal buffer.
+    #[inline]
+    fn discard_buffer(mut self: Pin<&mut Self>) {
+        *self.as_mut().pos() = 0;
+        *self.cap() = 0;
+    }
+}
+
+impl<R: AsyncRead + AsyncSeek> BufReader<R> {
+    // https://github.com/rust-lang/rust/issues/31100
+    /// Seeks relative to the current position. If the new position lies within the buffer,
+    /// the buffer will not be flushed, allowing for more efficient seeks.
+    /// This method does not return the location of the underlying reader, so the caller
+    /// must track this information themselves if it is required.
+    pub fn poll_seek_relative(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        offset: i64,
+    ) -> Poll<io::Result<()>> {
+        let pos = self.pos as u64;
+        if offset < 0 {
+            if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
+                *self.as_mut().pos() = new_pos as usize;
+                return Poll::Ready(Ok(()));
+            }
+        } else if let Some(new_pos) = pos.checked_add(offset as u64) {
+            if new_pos <= self.cap as u64 {
+                *self.as_mut().pos() = new_pos as usize;
+                return Poll::Ready(Ok(()));
+            }
+        }
+        self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
+    }
+}
+
+impl<R: AsyncRead> AsyncRead for BufReader<R> {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<io::Result<usize>> {
+        // If we don't have any buffered data and we're doing a massive read
+        // (larger than our internal buffer), bypass our internal buffer
+        // entirely.
+        if self.pos == self.cap && buf.len() >= self.buf.len() {
+            let res = ready!(self.as_mut().inner().poll_read(cx, buf));
+            self.discard_buffer();
+            return Poll::Ready(res);
+        }
+        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+        let nread = rem.read(buf)?;
+        self.consume(nread);
+        Poll::Ready(Ok(nread))
+    }
+
+    fn poll_read_vectored(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        bufs: &mut [IoSliceMut<'_>],
+    ) -> Poll<io::Result<usize>> {
+        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
+        if self.pos == self.cap && total_len >= self.buf.len() {
+            let res = ready!(self.as_mut().inner().poll_read_vectored(cx, bufs));
+            self.discard_buffer();
+            return Poll::Ready(res);
+        }
+        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+        let nread = rem.read_vectored(bufs)?;
+        self.consume(nread);
+        Poll::Ready(Ok(nread))
+    }
+
+    // we can't skip unconditionally because of the large buffer case in read.
+    unsafe fn initializer(&self) -> Initializer {
+        self.inner.initializer()
+    }
+}
+
+impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
+    fn poll_fill_buf<'a>(
+        self: Pin<&'a mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<io::Result<&'a [u8]>> {
+        let Self { inner, buf, cap, pos } = unsafe { Pin::get_unchecked_mut(self) };
+        let mut inner = unsafe { Pin::new_unchecked(inner) };
+
+        // If we've reached the end of our internal buffer then we need to fetch
+        // some more data from the underlying reader.
+        // Branch using `>=` instead of the more correct `==`
+        // to tell the compiler that the pos..cap slice is always valid.
+        if *pos >= *cap {
+            debug_assert!(*pos == *cap);
+            *cap = ready!(inner.as_mut().poll_read(cx, buf))?;
+            *pos = 0;
+        }
+        Poll::Ready(Ok(&buf[*pos..*cap]))
+    }
+
+    fn consume(mut self: Pin<&mut Self>, amt: usize) {
+        *self.as_mut().pos() = cmp::min(self.pos + amt, self.cap);
+    }
+}
+
+impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt.debug_struct("BufReader")
+            .field("reader", &self.inner)
+            .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buf.len()))
+            .finish()
+    }
+}
+
+impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
+    /// Seek to an offset, in bytes, in the underlying reader.
+    ///
+    /// The position used for seeking with `SeekFrom::Current(_)` is the
+    /// position the underlying reader would be at if the `BufReader` had no
+    /// internal buffer.
+    ///
+    /// Seeking always discards the internal buffer, even if the seek position
+    /// would otherwise fall within it. This guarantees that calling
+    /// `.into_inner()` immediately after a seek yields the underlying reader
+    /// at the same position.
+    ///
+    /// To seek without discarding the internal buffer, use
+    /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
+    ///
+    /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
+    ///
+    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
+    /// where `n` minus the internal buffer length overflows an `i64`, two
+    /// seeks will be performed instead of one. If the second seek returns
+    /// `Err`, the underlying reader will be left at the same position it would
+    /// have if you called `seek` with `SeekFrom::Current(0)`.
+    fn poll_seek(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<io::Result<u64>> {
+        let result: u64;
+        if let SeekFrom::Current(n) = pos {
+            let remainder = (self.cap - self.pos) as i64;
+            // it should be safe to assume that remainder fits within an i64 as the alternative
+            // means we managed to allocate 8 exbibytes and that's absurd.
+            // But it's not out of the realm of possibility for some weird underlying reader to
+            // support seeking by i64::min_value() so we need to handle underflow when subtracting
+            // remainder.
+            if let Some(offset) = n.checked_sub(remainder) {
+                result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(offset)))?;
+            } else {
+                // seek backwards by our remainder, and then by the offset
+                ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(-remainder)))?;
+                result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)))?;
+            }
+        } else {
+            // Seeking with Start/End doesn't care about our buffer length.
+            result = ready!(self.as_mut().inner().poll_seek(cx, pos))?;
+        }
+        self.discard_buffer();
+        Poll::Ready(Ok(result))
+    }
+}
diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs
index c058378dab..c3a90a67a6 100644
--- a/futures-util/src/io/mod.rs
+++ b/futures-util/src/io/mod.rs
@@ -11,9 +11,19 @@ pub use futures_io::{
 #[cfg(feature = "io-compat")]
 use crate::compat::Compat;
 
+// used by `BufReader` and `BufWriter`
+// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
+const DEFAULT_BUF_SIZE: usize = 8 * 1024;
+
 mod allow_std;
 pub use self::allow_std::AllowStdIo;
 
+mod buf_reader;
+pub use self::buf_reader::BufReader;
+
+// mod buf_writer;
+// pub use self::buf_writer::BufWriter;
+
 mod copy_into;
 pub use self::copy_into::CopyInto;
 
diff --git a/futures/src/lib.rs b/futures/src/lib.rs
index cdc7c974ba..bb057c135c 100644
--- a/futures/src/lib.rs
+++ b/futures/src/lib.rs
@@ -280,7 +280,7 @@ pub mod io {
     };
 
     pub use futures_util::io::{
-        AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
+        AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader,
         Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil, Seek,
         Window, WriteAll, WriteHalf,
     };
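The `// TODO: Examples` marker in the `BufReader` docs above is still open; a minimal usage sketch along these lines would cover the basic buffered-read path, using only the `futures::io` re-exports added by this patch (the byte-slice source and the 4-byte capacity are arbitrary, illustrative choices):

    use futures::executor::block_on;
    use futures::io::{AsyncReadExt, BufReader};

    fn main() {
        // Any `AsyncRead` source works; a byte slice keeps the sketch self-contained.
        let source: &[u8] = b"hello world";
        let mut reader = BufReader::with_capacity(4, source);

        // `read` comes from `AsyncReadExt`; `block_on` drives the returned future.
        let mut buf = [0u8; 3];
        let n = block_on(reader.read(&mut buf)).unwrap();
        assert_eq!(&buf[..n], b"hel");

        // The first fill pulled 4 bytes into the internal buffer and handed 3 back,
        // so one byte is still pending.
        assert_eq!(reader.buffer(), b"l");
    }

Because the read was smaller than the 4-byte capacity, it goes through the internal buffer; a read at least as large as the capacity would take the bypass branch in `poll_read` instead.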
diff --git a/futures/tests/io_buf_reader.rs b/futures/tests/io_buf_reader.rs
new file mode 100644
index 0000000000..ef2f8f27a1
--- /dev/null
+++ b/futures/tests/io_buf_reader.rs
@@ -0,0 +1,321 @@
+use futures::executor::block_on;
+use futures::future::Future;
+use futures::io::{
+    AsyncSeekExt, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt,
+    AllowStdIo, BufReader, SeekFrom,
+};
+use futures::task::{Context, Poll};
+use futures_test::task::noop_context;
+use std::cmp;
+use std::io::{self, Cursor};
+use std::pin::Pin;
+
+/// A dummy reader intended for testing short-reads propagation.
+pub struct ShortReader {
+    lengths: Vec<usize>,
+}
+
+impl io::Read for ShortReader {
+    fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
+        if self.lengths.is_empty() {
+            Ok(0)
+        } else {
+            Ok(self.lengths.remove(0))
+        }
+    }
+}
+
+macro_rules! run_fill_buf {
+    ($reader:expr) => {{
+        let mut cx = noop_context();
+        loop {
+            if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
+                break x;
+            }
+        }
+    }};
+}
+
+macro_rules! run_seek_relative {
+    ($reader:expr, $offset:expr) => {{
+        let mut cx = noop_context();
+        loop {
+            if let Poll::Ready(x) = Pin::new(&mut $reader).poll_seek_relative(&mut cx, $offset) {
+                break x;
+            }
+        }
+    }};
+}
+
+#[test]
+fn test_buffered_reader() {
+    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+    let mut reader = BufReader::with_capacity(2, inner);
+
+    let mut buf = [0, 0, 0];
+    let nread = block_on(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 3);
+    assert_eq!(buf, [5, 6, 7]);
+    assert_eq!(reader.buffer(), []);
+
+    let mut buf = [0, 0];
+    let nread = block_on(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 2);
+    assert_eq!(buf, [0, 1]);
+    assert_eq!(reader.buffer(), []);
+
+    let mut buf = [0];
+    let nread = block_on(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 1);
+    assert_eq!(buf, [2]);
+    assert_eq!(reader.buffer(), [3]);
+
+    let mut buf = [0, 0, 0];
+    let nread = block_on(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 1);
+    assert_eq!(buf, [3, 0, 0]);
+    assert_eq!(reader.buffer(), []);
+
+    let nread = block_on(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 1);
+    assert_eq!(buf, [4, 0, 0]);
+    assert_eq!(reader.buffer(), []);
+
+    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+}
+
+#[test]
+fn test_buffered_reader_seek() {
+    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+    let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
+
+    assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+    assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(3));
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+    assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
+    Pin::new(&mut reader).consume(1);
+    assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
+}
+
+#[test]
+fn test_buffered_reader_seek_relative() {
+    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+    let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
+
+    assert!(run_seek_relative!(reader, 3).is_ok());
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+    assert!(run_seek_relative!(reader, 0).is_ok());
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+    assert!(run_seek_relative!(reader, 1).is_ok());
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[1][..]));
+    assert!(run_seek_relative!(reader, -1).is_ok());
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+    assert!(run_seek_relative!(reader, 2).is_ok());
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[2, 3][..]));
+}
+
+#[test]
+fn test_buffered_reader_invalidated_after_read() {
+    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+    let mut reader = BufReader::with_capacity(3, Cursor::new(inner));
+
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[5, 6, 7][..]));
+    Pin::new(&mut reader).consume(3);
+
+    let mut buffer = [0, 0, 0, 0, 0];
+    assert_eq!(block_on(reader.read(&mut buffer)).ok(), Some(5));
+    assert_eq!(buffer, [0, 1, 2, 3, 4]);
+
+    assert!(run_seek_relative!(reader, -2).is_ok());
+    let mut buffer = [0, 0];
+    assert_eq!(block_on(reader.read(&mut buffer)).ok(), Some(2));
+    assert_eq!(buffer, [3, 4]);
+}
+
+#[test]
+fn test_buffered_reader_invalidated_after_seek() {
+    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+    let mut reader = BufReader::with_capacity(3, Cursor::new(inner));
+
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[5, 6, 7][..]));
+    Pin::new(&mut reader).consume(3);
+
+    assert!(block_on(reader.seek(SeekFrom::Current(5))).is_ok());
+
+    assert!(run_seek_relative!(reader, -2).is_ok());
+    let mut buffer = [0, 0];
+    assert_eq!(block_on(reader.read(&mut buffer)).ok(), Some(2));
+    assert_eq!(buffer, [3, 4]);
+}
+
+#[test]
+fn test_buffered_reader_seek_underflow() {
+    // gimmick reader that yields its position modulo 256 for each byte
+    struct PositionReader {
+        pos: u64
+    }
+    impl io::Read for PositionReader {
+        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+            let len = buf.len();
+            for x in buf {
+                *x = self.pos as u8;
+                self.pos = self.pos.wrapping_add(1);
+            }
+            Ok(len)
+        }
+    }
+    impl io::Seek for PositionReader {
+        fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+            match pos {
+                SeekFrom::Start(n) => {
+                    self.pos = n;
+                }
+                SeekFrom::Current(n) => {
+                    self.pos = self.pos.wrapping_add(n as u64);
+                }
+                SeekFrom::End(n) => {
+                    self.pos = u64::max_value().wrapping_add(n as u64);
+                }
+            }
+            Ok(self.pos)
+        }
+    }
+
+    let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
+    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
+    assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value() - 5));
+    assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
+    // the following seek will require two underlying seeks
+    let expected = 9223372036854775802;
+    assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
+    assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
+    // seeking to 0 should empty the buffer.
+    assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
+    assert_eq!(reader.get_ref().get_ref().pos, expected);
+}
+
+#[test]
+fn test_short_reads() {
+    let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
+    let mut reader = BufReader::new(AllowStdIo::new(inner));
+    let mut buf = [0, 0];
+    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
+    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
+    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
+    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+}
+
+struct MaybePending<'a> {
+    inner: &'a [u8],
+    ready_read: bool,
+    ready_fill_buf: bool,
+}
+
+impl<'a> MaybePending<'a> {
+    fn new(inner: &'a [u8]) -> Self {
+        Self { inner, ready_read: false, ready_fill_buf: false }
+    }
+}
+
+impl AsyncRead for MaybePending<'_> {
+    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+        -> Poll<io::Result<usize>>
+    {
+        if self.ready_read {
+            self.ready_read = false;
+            Pin::new(&mut self.inner).poll_read(cx, buf)
+        } else {
+            self.ready_read = true;
+            Poll::Pending
+        }
+    }
+}
+
+impl AsyncBufRead for MaybePending<'_> {
+    fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>)
+        -> Poll<io::Result<&'a [u8]>>
+    {
+        if self.ready_fill_buf {
+            self.ready_fill_buf = false;
+            if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
+            let len = cmp::min(2, self.inner.len());
+            Poll::Ready(Ok(&self.inner[0..len]))
+        } else {
+            self.ready_fill_buf = true;
+            Poll::Pending
+        }
+    }
+
+    fn consume(mut self: Pin<&mut Self>, amt: usize) {
+        self.inner = &self.inner[amt..];
+    }
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+    let mut cx = noop_context();
+    loop {
+        if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) {
+            return x;
+        }
+    }
+}
+
+#[test]
+fn maybe_pending() {
+    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+    let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
+
+    let mut buf = [0, 0, 0];
+    let nread = run(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 3);
+    assert_eq!(buf, [5, 6, 7]);
+    assert_eq!(reader.buffer(), []);
+
+    let mut buf = [0, 0];
+    let nread = run(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 2);
+    assert_eq!(buf, [0, 1]);
+    assert_eq!(reader.buffer(), []);
+
+    let mut buf = [0];
+    let nread = run(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 1);
+    assert_eq!(buf, [2]);
+    assert_eq!(reader.buffer(), [3]);
+
+    let mut buf = [0, 0, 0];
+    let nread = run(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 1);
+    assert_eq!(buf, [3, 0, 0]);
+    assert_eq!(reader.buffer(), []);
+
+    let nread = run(reader.read(&mut buf));
+    assert_eq!(nread.unwrap(), 1);
+    assert_eq!(buf, [4, 0, 0]);
+    assert_eq!(reader.buffer(), []);
+
+    assert_eq!(run(reader.read(&mut buf)).unwrap(), 0);
+}
+
+#[test]
+fn maybe_pending_buf_read() {
+    let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
+    let mut reader = BufReader::with_capacity(2, inner);
+    let mut v = Vec::new();
+    run(reader.read_until(3, &mut v)).unwrap();
+    assert_eq!(v, [0, 1, 2, 3]);
+    v.clear();
+    run(reader.read_until(1, &mut v)).unwrap();
+    assert_eq!(v, [1]);
+    v.clear();
+    run(reader.read_until(8, &mut v)).unwrap();
+    assert_eq!(v, [0]);
+    v.clear();
+    run(reader.read_until(9, &mut v)).unwrap();
+    assert_eq!(v, []);
+}
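The `AsyncSeek` implementation documents `SeekFrom::Current(_)` as being relative to the logical, buffered position rather than wherever the inner reader happens to be. Condensing the seek tests above into a caller-side sketch (data and capacity are arbitrary, illustrative values):

    use futures::executor::block_on;
    use futures::io::{AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom};
    use std::io::Cursor;

    fn main() {
        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
        let mut reader = BufReader::with_capacity(3, Cursor::new(inner));

        // Jump to offset 3, then read two bytes through the buffer.
        assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).unwrap(), 3);
        let mut buf = [0u8; 2];
        block_on(reader.read_exact(&mut buf)).unwrap();
        assert_eq!(buf, [0, 1]);

        // The inner cursor is now at offset 6, but one buffered byte is still
        // unread, so `SeekFrom::Current(0)` reports the logical position 5:
        // the unread remainder is subtracted before the seek is forwarded.
        assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).unwrap(), 5);
    }

As documented on `poll_seek`, that seek also discards the internal buffer afterwards; `poll_seek_relative` is the escape hatch when the target still lies inside it.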