diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 66964304e853..f4e07c72d063 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -38,13 +38,18 @@ //! //! # Highlights //! -//! 1. A focused, easy to use, idiomatic, well documented, high -//! performance, `async` API. +//! 1. A high-performance async API focused on providing a consistent interface +//! mirroring that of the underlying object stores //! //! 2. Production quality, leading this crate to be used in large -//! scale production systems, such as [crates.io] and [InfluxDB IOx]. +//! scale production systems, such as [crates.io] and [InfluxDB IOx] //! -//! 3. Stable and predictable governance via the [Apache Arrow] project. +//! 3. Support for advanced functionality, including [ACID] reads +//! and writes, vectored IO, bulk deletion, and more +//! +//! 4. Stable and predictable governance via the [Apache Arrow] project +//! +//! 5. Very low dependency footprint //! //! Originally developed for [InfluxDB IOx] and subsequently donated //! to [Apache Arrow]. @@ -52,6 +57,7 @@ //! [Apache Arrow]: https://arrow.apache.org/ //! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox/ //! [crates.io]: https://github.com/rust-lang/crates.io +//! [ACID]: https://en.wikipedia.org/wiki/ACID //! //! # Available [`ObjectStore`] Implementations //! @@ -79,6 +85,22 @@ doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)" )] //! +//! # Why not a Filesystem Interface? +//! +//! Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] interface mirrors the APIs +//! of object stores and not filesystems. +//! +//! This provides some compelling advantages: +//! +//! * Except where explicitly stated otherwise, operations are atomic, and readers +//! cannot observe partial and/or failed writes +//! * Methods map directly to object store APIs, providing both efficiency and predictability +//! 
* Abstracts away filesystem and operating system specific quirks, ensuring portability +//! * Allows for functionality not native to filesystems, such as operation preconditions +//! and atomic multipart uploads +//! +//! [`BufReader`]: buffered::BufReader +//! //! # Adapters //! //! [`ObjectStore`] instances can be composed with various adapters @@ -87,8 +109,7 @@ //! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig) //! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore) //! -//! -//! # List objects: +//! # List objects //! //! Use the [`ObjectStore::list`] method to iterate over objects in //! remote storage or files in the local filesystem: @@ -111,7 +132,7 @@ //! // Recursively list all files below the 'data' path. //! // 1. On AWS S3 this would be the 'data/' prefix //! // 2. On a local filesystem, this would be the 'data' directory -//! let prefix: Path = "data".try_into().unwrap(); +//! let prefix = Path::from("data"); //! //! // Get an `async` stream of Metadata objects: //! let mut list_stream = object_store.list(Some(&prefix)); @@ -148,18 +169,18 @@ //! # //! # async fn example() { //! # -//! // create an ObjectStore +//! // Create an ObjectStore //! let object_store: Arc = get_object_store(); //! //! // Retrieve a specific file -//! let path: Path = "data/file01.parquet".try_into().unwrap(); +//! let path = Path::from("data/file01.parquet"); //! -//! // fetch the bytes from object store -//! let stream = object_store -//! .get(&path) -//! .await -//! .unwrap() -//! .into_stream(); +//! // Fetch the file metadata +//! let meta = object_store.head(&path).await.unwrap(); +//! println!("{meta:?}"); +//! +//! // Fetch the bytes from object store as a stream +//! let stream = object_store.get(&path).await.unwrap().into_stream(); //! //! // Count the '0's using `try_fold` from `TryStreamExt` trait //! let num_zeros = stream @@ -171,13 +192,9 @@ //! # } //! ``` //! -//! Which will print out something like the following: +//! 
# Put Object //! -//! ```text -//! Num zeros in data/file01.parquet is 657 -//! ``` -//! # Put object -//! Use the [`ObjectStore::put`] method to save data in remote storage or local filesystem. +//! Use the [`ObjectStore::put`] method to atomically write data. //! //! ``` //! # use object_store::local::LocalFileSystem; @@ -190,15 +207,17 @@ //! # } //! # async fn put() { //! # -//! let object_store: Arc = get_object_store(); -//! let path: Path = "data/file1".try_into().unwrap(); -//! let bytes = Bytes::from_static(b"hello"); -//! object_store.put(&path, bytes).await.unwrap(); +//! let object_store: Arc = get_object_store(); +//! let path = Path::from("data/file1"); +//! let bytes = Bytes::from_static(b"hello"); +//! object_store.put(&path, bytes).await.unwrap(); //! # } //! ``` //! -//! # Multipart put object -//! Use the [`ObjectStore::put_multipart`] method to save large amount of data in chunks. +//! # Multipart Upload +//! +//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data, +//! with implementations automatically handling parallel, chunked upload where appropriate. //! //! ``` //! # use object_store::local::LocalFileSystem; @@ -212,16 +231,159 @@ //! # } //! # async fn multi_upload() { //! # -//! let object_store: Arc = get_object_store(); -//! let path: Path = "data/large_file".try_into().unwrap(); -//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap(); -//! -//! let bytes = Bytes::from_static(b"hello"); -//! writer.write_all(&bytes).await.unwrap(); -//! writer.flush().await.unwrap(); -//! writer.shutdown().await.unwrap(); +//! let object_store: Arc = get_object_store(); +//! let path = Path::from("data/large_file"); +//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap(); +//! +//! let bytes = Bytes::from_static(b"hello"); +//! writer.write_all(&bytes).await.unwrap(); +//! writer.flush().await.unwrap(); +//! writer.shutdown().await.unwrap(); //! # } //! ``` +//! +//! 
# Vectored Read
+//!
+//! A common pattern, especially when reading structured datasets, is to need to fetch
+//! multiple ranges of a particular object.
+//!
+//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
+//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
+//!
+//! ```
+//! # use object_store::local::LocalFileSystem;
+//! # use object_store::ObjectStore;
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(LocalFileSystem::new())
+//! # }
+//! # async fn multi_upload() {
+//! #
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//!     data: Bytes,
+//!     e_tag: String,
+//!     refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//!     entries: HashMap<Path, CacheEntry>,
+//!     store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//!         Ok(match self.entries.get_mut(path) {
+//!             Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
+//! 
true => e.data.clone(), // Return cached data
+//!                 false => {
+//!                     let opts = GetOptions {
+//!                         if_none_match: Some(e.e_tag.clone()),
+//!                         ..GetOptions::default()
+//!                     };
+//!                     match self.store.get_opts(&path, opts).await {
+//!                         Ok(d) => e.data = d.bytes().await?,
+//!                         Err(Error::NotModified { .. }) => {} // Data has not changed
+//!                         Err(e) => return Err(e),
+//!                     };
+//!                     e.refreshed_at = Instant::now();
+//!                     e.data.clone()
+//!                 }
+//!             },
+//!             None => {
+//!                 let get = self.store.get(&path).await?;
+//!                 let e_tag = get.meta.e_tag.clone();
+//!                 let data = get.bytes().await?;
+//!                 if let Some(e_tag) = e_tag {
+//!                     let entry = CacheEntry {
+//!                         e_tag,
+//!                         data: data.clone(),
+//!                         refreshed_at: Instant::now(),
+//!                     };
+//!                     self.entries.insert(path.clone(), entry);
+//!                 }
+//!                 data
+//!             }
+//!         })
+//!     }
+//! }
+//! ```
+//!
+//! # Conditional Put
+//!
+//! The default behaviour when writing data is to upsert any existing object at the given path.
+//! More complex behaviours can be achieved using [`PutMode`], and can be used to build
+//! [Optimistic Concurrency Control] based transactions. This facilitates building metadata catalogs,
+//! such as [Apache Iceberg] or [Delta Lake], directly on top of object storage, without relying on
+//! a separate DBMS.
+//!
+//! ```
+//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::memory::InMemory;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(InMemory::new())
+//! # }
+//! # fn do_update(b: Bytes) -> Bytes {b}
+//! # async fn conditional_put() {
+//! let store = get_object_store();
+//! let path = Path::from("test");
+//! loop {
+//!     // Perform get request
+//!     let r = store.get(&path).await.unwrap();
+//!
+//!     // Save version information fetched
+//!     let version = UpdateVersion {
+//!         e_tag: r.meta.e_tag.clone(),
+//!         version: r.meta.version.clone(),
+//! 
}; +//! +//! // Compute new version of object contents +//! let new = do_update(r.bytes().await.unwrap()); +//! +//! // Attempt to commit transaction +//! match store.put_opts(&path, new, PutMode::Update(version).into()).await { +//! Ok(_) => break, // Successfully committed +//! Err(Error::Precondition { .. }) => continue, // Transaction conflict, try again +//! Err(e) => panic!("{e}") +//! } +//! } +//! # } +//! ``` +//! +//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control +//! [Apache Iceberg]: https://iceberg.apache.org/ +//! [Delta Lake]: https://delta.io/ +//! #[cfg(all( target_arch = "wasm32",