diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs new file mode 100644 index 00000000000..bdc3f4c772b --- /dev/null +++ b/object_store/src/buffered.rs @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for performing tokio-style buffered IO + +use crate::path::Path; +use crate::{ObjectMeta, ObjectStore}; +use bytes::Bytes; +use futures::future::{BoxFuture, FutureExt}; +use futures::ready; +use std::cmp::Ordering; +use std::io::{Error, ErrorKind, SeekFrom}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf}; + +/// The default buffer size used by [`BufReader`] +pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024; + +/// An async-buffered reader compatible with the tokio IO traits +/// +/// Internally this maintains a buffer of the requested size, and uses [`ObjectStore::get_range`] +/// to populate its internal buffer once depleted. This buffer is cleared on seek. +/// +/// Whilst simple, this interface will typically be outperformed by the native [`ObjectStore`] +/// methods that better map to the network APIs. This is because most object stores have +/// very [high first-byte latencies], on the order of 100-200ms, and so avoiding unnecessary +/// round-trips is critical to throughput. +/// +/// Systems looking to sequentially scan a file should instead consider using [`ObjectStore::get`], +/// or [`ObjectStore::get_opts`], or [`ObjectStore::get_range`] to read a particular range. +/// +/// Systems looking to read multiple ranges of a file should instead consider using +/// [`ObjectStore::get_ranges`], which will optimise the vectored IO. +/// +/// [high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html +pub struct BufReader { + /// The object store to fetch data from + store: Arc, + /// The size of the object + size: u64, + /// The path to the object + path: Path, + /// The current position in the object + cursor: u64, + /// The number of bytes to read in a single request + capacity: usize, + /// The buffered data if any + buffer: Buffer, +} + +impl std::fmt::Debug for BufReader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BufReader") + .field("path", &self.path) + .field("size", &self.size) + .field("capacity", &self.capacity) + .finish() + } +} + +enum Buffer { + Empty, + Pending(BoxFuture<'static, std::io::Result>), + Ready(Bytes), +} + +impl BufReader { + /// Create a new [`BufReader`] from the provided [`ObjectMeta`] and [`ObjectStore`] + pub fn new(store: Arc, meta: &ObjectMeta) -> Self { + Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE) + } + + /// Create a new [`BufReader`] from the provided [`ObjectMeta`], [`ObjectStore`], and `capacity` + pub fn with_capacity( + store: Arc, + meta: &ObjectMeta, + capacity: usize, + ) -> Self { + Self { + path: meta.location.clone(), + size: meta.size as _, + store, + capacity, + cursor: 0, + buffer: Buffer::Empty, + } + } + + fn poll_fill_buf_impl( + &mut self, + cx: &mut Context<'_>, + amnt: usize, + ) -> Poll> { + let buf = &mut self.buffer; + loop { + match buf { + Buffer::Empty => { + let store = Arc::clone(&self.store); + let path = self.path.clone(); + let start = self.cursor.min(self.size) as _; + let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _; + + if start == end { + return Poll::Ready(Ok(&[])); + } + + *buf = Buffer::Pending(Box::pin(async move { + Ok(store.get_range(&path, start..end).await?) + })) + } + Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) { + Ok(b) => *buf = Buffer::Ready(b), + Err(e) => return Poll::Ready(Err(e)), + }, + Buffer::Ready(r) => return Poll::Ready(Ok(r)), + } + } + } +} + +impl AsyncSeek for BufReader { + fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> { + self.cursor = match position { + SeekFrom::Start(offset) => offset, + SeekFrom::End(offset) => { + checked_add_signed(self.size,offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from end of {} byte file would result in overflow", self.size)))? + } + SeekFrom::Current(offset) => { + checked_add_signed(self.cursor, offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from current offset of {} would result in overflow", self.cursor)))? + } + }; + self.buffer = Buffer::Empty; + Ok(()) + } + + fn poll_complete( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(self.cursor)) + } +} + +impl AsyncRead for BufReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + out: &mut ReadBuf<'_>, + ) -> Poll> { + // Read the maximum of the internal buffer and `out` + let to_read = out.remaining().max(self.capacity); + let r = match ready!(self.poll_fill_buf_impl(cx, to_read)) { + Ok(buf) => { + let to_consume = out.remaining().min(buf.len()); + out.put_slice(&buf[..to_consume]); + self.consume(to_consume); + Ok(()) + } + Err(e) => Err(e), + }; + Poll::Ready(r) + } +} + +impl AsyncBufRead for BufReader { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let capacity = self.capacity; + self.get_mut().poll_fill_buf_impl(cx, capacity) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + match &mut self.buffer { + Buffer::Empty => assert_eq!(amt, 0, "cannot consume from empty buffer"), + Buffer::Ready(b) => match b.len().cmp(&amt) { + Ordering::Less => panic!("{amt} exceeds buffer sized of {}", b.len()), + Ordering::Greater => *b = b.slice(amt..), + Ordering::Equal => self.buffer = Buffer::Empty, + }, + Buffer::Pending(_) => panic!("cannot consume from pending buffer"), + } + self.cursor += amt as u64; + } +} + +/// Port of standardised function as requires Rust 1.66 +/// +/// +#[inline] +fn checked_add_signed(a: u64, rhs: i64) -> Option { + let (res, overflowed) = a.overflowing_add(rhs as _); + let overflow = overflowed ^ (rhs < 0); + (!overflow).then_some(res) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::memory::InMemory; + use crate::path::Path; + use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt}; + + #[tokio::test] + async fn test_buf_reader() { + let store = Arc::new(InMemory::new()) as Arc; + + let existent = Path::from("exists.txt"); + const BYTES: usize = 4096; + + let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect(); + store.put(&existent, data.clone()).await.unwrap(); + + let meta = store.head(&existent).await.unwrap(); + + let mut reader = BufReader::new(Arc::clone(&store), &meta); + let mut out = Vec::with_capacity(BYTES); + let read = reader.read_to_end(&mut out).await.unwrap(); + + assert_eq!(read, BYTES); + assert_eq!(&out, &data); + + let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err(); + assert_eq!(err.to_string(), "Seeking -9223372036854775808 from current offset of 4096 would result in overflow"); + + reader.rewind().await.unwrap(); + + let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err(); + assert_eq!( + err.to_string(), + "Seeking -1 from current offset of 0 would result in overflow" + ); + + // Seeking beyond the bounds of the file is permitted but should return no data + reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap(); + let buf = reader.fill_buf().await.unwrap(); + assert!(buf.is_empty()); + + let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err(); + assert_eq!(err.to_string(), "Seeking 1 from current offset of 18446744073709551615 would result in overflow"); + + for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] { + let store = Arc::clone(&store); + let mut reader = BufReader::with_capacity(store, &meta, capacity); + + let mut bytes_read = 0; + loop { + let buf = reader.fill_buf().await.unwrap(); + if buf.is_empty() { + assert_eq!(bytes_read, BYTES); + break; + } + assert!(buf.starts_with(b"12345678")); + bytes_read += 8; + reader.consume(8); + } + + let mut buf = Vec::with_capacity(76); + reader.seek(SeekFrom::Current(-76)).await.unwrap(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(&buf, &data[BYTES - 76..]); + + reader.rewind().await.unwrap(); + let buffer = reader.fill_buf().await.unwrap(); + assert_eq!(buffer, &data[..capacity.min(BYTES)]); + + reader.seek(SeekFrom::Start(325)).await.unwrap(); + let buffer = reader.fill_buf().await.unwrap(); + assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]); + + reader.seek(SeekFrom::End(0)).await.unwrap(); + let buffer = reader.fill_buf().await.unwrap(); + assert!(buffer.is_empty()); + } + } +} diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index cef10f1dd41..3fd363fd4f0 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -253,6 +253,7 @@ compile_error!("Features 'gcp', 'aws', 'azure', 'http' are not supported on wasm pub mod aws; #[cfg(feature = "azure")] pub mod azure; +pub mod buffered; #[cfg(not(target_arch = "wasm32"))] pub mod chunked; pub mod delimited;