From 8826bb790e1407c544116c5d6492ce63f866ba27 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 23 Oct 2023 16:01:32 +0100 Subject: [PATCH 1/3] Improve object_store docs --- object_store/src/lib.rs | 244 ++++++++++++++++++++++++++++++++++------ 1 file changed, 208 insertions(+), 36 deletions(-) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 66964304e853..50fe973bc867 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -38,13 +38,18 @@ //! //! # Highlights //! -//! 1. A focused, easy to use, idiomatic, well documented, high -//! performance, `async` API. +//! 1. A high-performance async API focused on providing a consistent interface +//! mirroring that of object stores such as [S3] //! //! 2. Production quality, leading this crate to be used in large -//! scale production systems, such as [crates.io] and [InfluxDB IOx]. +//! scale production systems, such as [crates.io] and [InfluxDB IOx] //! -//! 3. Stable and predictable governance via the [Apache Arrow] project. +//! 3. Support for advanced functionality, including [ACID] reads +//! and writes, vectored IO, bulk deletion, and more... +//! +//! 4. Stable and predictable governance via the [Apache Arrow] project +//! +//! 5. Very low dependency footprint //! //! Originally developed for [InfluxDB IOx] and subsequently donated //! to [Apache Arrow]. @@ -52,6 +57,8 @@ //! [Apache Arrow]: https://arrow.apache.org/ //! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox/ //! [crates.io]: https://github.com/rust-lang/crates.io +//! [ACID]: https://en.wikipedia.org/wiki/ACID +//! [S3]: https://aws.amazon.com/s3/ //! //! # Available [`ObjectStore`] Implementations //! @@ -79,6 +86,22 @@ doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)" )] //! +//! # Why not a Filesystem Interface? +//! +//! 
Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] interface mirrors the APIs +//! of object stores and not filesystems. +//! +//! This provides some compelling advantages: +//! +//! * Except where explicitly stated otherwise, operations are atomic, and readers +//! cannot observe partial and/or failed writes +//! * Methods map directly to object store APIs, providing both efficiency and predictability +//! * Abstracts away filesystem and operating system specific quirks, ensuring portability +//! * Allows for functionality not native to filesystems, such as operation preconditions +//! and atomic multipart uploads +//! +//! [`BufReader`]: buffered::BufReader +//! //! # Adapters //! //! [`ObjectStore`] instances can be composed with various adapters @@ -87,8 +110,7 @@ //! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig) //! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore) //! -//! -//! # List objects: +//! # List objects //! //! Use the [`ObjectStore::list`] method to iterate over objects in //! remote storage or files in the local filesystem: @@ -111,7 +133,7 @@ //! // Recursively list all files below the 'data' path. //! // 1. On AWS S3 this would be the 'data/' prefix //! // 2. On a local filesystem, this would be the 'data' directory -//! let prefix: Path = "data".try_into().unwrap(); +//! let prefix = Path::from("data"); //! //! // Get an `async` stream of Metadata objects: //! let mut list_stream = object_store.list(Some(&prefix)); @@ -141,25 +163,34 @@ //! # use futures::TryStreamExt; //! # use object_store::local::LocalFileSystem; //! # use std::sync::Arc; -//! # use object_store::{path::Path, ObjectStore}; +//! # use bytes::Bytes; +//! # use object_store::{path::Path, ObjectStore, GetResult}; //! # fn get_object_store() -> Arc { //! # Arc::new(LocalFileSystem::new()) //! # } //! # //! # async fn example() { //! # -//! // create an ObjectStore +//! // Create an ObjectStore //! 
let object_store: Arc = get_object_store(); //! //! // Retrieve a specific file -//! let path: Path = "data/file01.parquet".try_into().unwrap(); +//! let path = Path::from("data/file01.parquet"); +//! +//! // Fetch just the file metadata +//! let meta = object_store.head(&path).await.unwrap(); +//! println!("{meta:?}"); +//! +//! // Fetch the object including metadata +//! let result: GetResult = object_store.get(&path).await.unwrap(); +//! assert_eq!(result.meta, meta); //! -//! // fetch the bytes from object store -//! let stream = object_store -//! .get(&path) -//! .await -//! .unwrap() -//! .into_stream(); +//! // Buffer the entire object in memory +//! let object: Bytes = result.bytes().await.unwrap(); +//! assert_eq!(object.len(), meta.size); +//! +//! // Alternatively stream the bytes from object storage +//! let stream = object_store.get(&path).await.unwrap().into_stream(); //! //! // Count the '0's using `try_fold` from `TryStreamExt` trait //! let num_zeros = stream @@ -171,13 +202,9 @@ //! # } //! ``` //! -//! Which will print out something like the following: +//! # Put Object //! -//! ```text -//! Num zeros in data/file01.parquet is 657 -//! ``` -//! # Put object -//! Use the [`ObjectStore::put`] method to save data in remote storage or local filesystem. +//! Use the [`ObjectStore::put`] method to atomically write data. //! //! ``` //! # use object_store::local::LocalFileSystem; @@ -190,15 +217,48 @@ //! # } //! # async fn put() { //! # -//! let object_store: Arc = get_object_store(); -//! let path: Path = "data/file1".try_into().unwrap(); -//! let bytes = Bytes::from_static(b"hello"); -//! object_store.put(&path, bytes).await.unwrap(); +//! let object_store: Arc = get_object_store(); +//! let path = Path::from("data/file1"); +//! let bytes = Bytes::from_static(b"hello"); +//! object_store.put(&path, bytes).await.unwrap(); +//! # } +//! ``` +//! +//! # Multipart Upload +//! +//! 
Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data, +//! with implementations automatically handling parallel, chunked upload where appropriate. +//! +//! ``` +//! # use object_store::local::LocalFileSystem; +//! # use object_store::ObjectStore; +//! # use std::sync::Arc; +//! # use bytes::Bytes; +//! # use tokio::io::AsyncWriteExt; +//! # use object_store::path::Path; +//! # fn get_object_store() -> Arc { +//! # Arc::new(LocalFileSystem::new()) +//! # } +//! # async fn multi_upload() { +//! # +//! let object_store: Arc = get_object_store(); +//! let path = Path::from("data/large_file"); +//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap(); +//! +//! let bytes = Bytes::from_static(b"hello"); +//! writer.write_all(&bytes).await.unwrap(); +//! writer.flush().await.unwrap(); +//! writer.shutdown().await.unwrap(); //! # } //! ``` //! -//! # Multipart put object -//! Use the [`ObjectStore::put_multipart`] method to save large amount of data in chunks. +//! # Vectored Read +//! +//! A common pattern, especially when reading structured datasets, is to need to fetch +//! multiple ranges of a particular object. +//! +//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will +//! automatically coalesce adjacent ranges into an appropriate number of parallel requests. //! //! ``` //! # use object_store::local::LocalFileSystem; @@ -212,16 +272,128 @@ //! # } //! # async fn multi_upload() { //! # -//! let object_store: Arc = get_object_store(); -//! let path: Path = "data/large_file".try_into().unwrap(); -//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap(); -//! -//! let bytes = Bytes::from_static(b"hello"); -//! writer.write_all(&bytes).await.unwrap(); -//! writer.flush().await.unwrap(); -//! writer.shutdown().await.unwrap(); +//! let object_store: Arc = get_object_store(); +//! let path = Path::from("data/large_file"); +//! 
let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap(); +//! assert_eq!(ranges.len(), 3); +//! assert_eq!(ranges[0].len(), 10); +//! # } +//! ``` +//! +//! # Conditional Fetch +//! +//! More complex object retrieval can be supported by [`ObjectStore::get_opts`]. +//! +//! For example, this can be used to efficiently refresh a cache without re-fetching the +//! entire object data if the object hasn't been modified. +//! +//! ``` +//! # use std::collections::btree_map::Entry; +//! # use std::collections::HashMap; +//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error}; +//! # use std::sync::Arc; +//! # use std::time::{Duration, Instant}; +//! # use bytes::Bytes; +//! # use tokio::io::AsyncWriteExt; +//! # use object_store::path::Path; +//! struct CacheEntry { +//! data: Bytes, +//! e_tag: String, +//! refreshed_at: Instant, +//! } +//! +//! struct Cache { +//! entries: HashMap, +//! store: Arc, +//! } +//! +//! impl Cache { +//! pub async fn get(&mut self, path: &Path) -> Result { +//! Ok(match self.entries.get_mut(path) { +//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) { +//! true => e.data.clone(), // Return cached data +//! false => { +//! let opts = GetOptions { +//! if_none_match: Some(e.e_tag.clone()), +//! ..GetOptions::default() +//! }; +//! match self.store.get_opts(&path, opts).await { +//! Ok(d) => e.data = d.bytes().await?, +//! Err(Error::NotModified { .. }) => {} // Data has not changed +//! Err(e) => return Err(e), +//! }; +//! e.refreshed_at = Instant::now(); +//! e.data.clone() +//! } +//! }, +//! None => { +//! let get = self.store.get(&path).await?; +//! let e_tag = get.meta.e_tag.clone(); +//! let data = get.bytes().await?; +//! if let Some(e_tag) = e_tag { +//! let entry = CacheEntry { +//! e_tag, +//! data: data.clone(), +//! refreshed_at: Instant::now(), +//! }; +//! self.entries.insert(path.clone(), entry); +//! } +//! data +//! } +//! }) +//! } +//! } +//! ``` +//! +//! 
# Conditional Put +//! +//! The default behaviour when writing data is to upsert any existing object at the given path. +//! More complex behaviours can be achieved using [`PutMode`], and can be used to build +//! [Optimistic Concurrency Control] based transactions. This facilitates building metadata catalogs, +//! such as [Apache Iceberg] or [Delta Lake], directly on top of object storage, without relying on +//! a separate DBMS. +//! +//! ``` +//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion}; +//! # use std::sync::Arc; +//! # use bytes::Bytes; +//! # use tokio::io::AsyncWriteExt; +//! # use object_store::memory::InMemory; +//! # use object_store::path::Path; +//! # fn get_object_store() -> Arc { +//! # Arc::new(InMemory::new()) +//! # } +//! # fn do_update(b: Bytes) -> Bytes {b} +//! # async fn conditional_put() { +//! let store = get_object_store(); +//! let path = Path::from("test"); +//! loop { +//! // Perform get request +//! let r = store.get(&path).await.unwrap(); +//! +//! // Save version information fetched +//! let version = UpdateVersion { +//! e_tag: r.meta.e_tag.clone(), +//! version: r.meta.version.clone(), +//! }; +//! +//! // Compute new version of object contents +//! let new = do_update(r.bytes().await.unwrap()); +//! +//! // Attempt to commit transaction +//! match store.put_opts(&path, new, PutMode::Update(version).into()).await { +//! Ok(_) => break, // Successfully committed +//! Err(Error::Precondition { .. }) => continue, // Transaction conflict, try again +//! Err(e) => panic!("{e}") +//! } +//! } //! # } //! ``` +//! +//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control +//! [Apache Iceberg]: https://iceberg.apache.org/ +//! [Delta Lake]: https://delta.io/ +//! 
#[cfg(all( target_arch = "wasm32", From 16e01564dee133a3566c11a2ab3ae53b4edb23d9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 Oct 2023 14:54:14 +0100 Subject: [PATCH 2/3] Document configuration system --- object_store/src/lib.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 50fe973bc867..de9b8a9eb2ff 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -110,6 +110,42 @@ //! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig) //! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore) //! +//! # Configuration System +//! +//! This crate provides a configuration system inspired by the APIs exposed by [fsspec], +//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing a [`DynObjectStore`] to be created +//! from a URL and an optional list of key-value pairs. This provides a flexible interface +//! to support a wide variety of user-defined store configurations, with minimal additional +//! application complexity. +//! +//! ```no_run +//! # use url::Url; +//! # use object_store::{parse_url, parse_url_opts}; +//! # use object_store::aws::{AmazonS3, AmazonS3Builder}; +//! # +//! # +//! // Can manually create a specific store variant using the appropriate builder +//! let store: AmazonS3 = AmazonS3Builder::from_env() +//! .with_bucket_name("my-bucket").build().unwrap(); +//! +//! // Alternatively can create an ObjectStore from an S3 URL +//! let url = Url::parse("s3://bucket/path").unwrap(); +//! let (store, path) = parse_url(&url).unwrap(); +//! assert_eq!(path.as_ref(), "path"); +//! +//! // Potentially with additional options +//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap(); +//! +//! // Or with URLs that encode the bucket name in the URL path +//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap(); +//! 
let (store, path) = parse_url(&url).unwrap(); +//! assert_eq!(path.as_ref(), "path"); +//! ``` +//! +//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri +//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem +//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration- +//! //! # List objects //! //! Use the [`ObjectStore::list`] method to iterate over objects in From 0bcc2a2f921cd4a40fb95e867226ad0db174d344 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 11:28:40 +0000 Subject: [PATCH 3/3] Review feedback --- object_store/src/lib.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index de9b8a9eb2ff..992582d2647d 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -44,12 +44,12 @@ //! 2. Production quality, leading this crate to be used in large //! scale production systems, such as [crates.io] and [InfluxDB IOx] //! -//! 3. Support for advanced functionality, including [ACID] reads +//! 3. Support for advanced functionality, including atomic, conditional reads //! and writes, vectored IO, bulk deletion, and more... //! //! 4. Stable and predictable governance via the [Apache Arrow] project //! -//! 5. Very low dependency footprint +//! 5. Small dependency footprint, depending on only a small number of common crates //! //! Originally developed for [InfluxDB IOx] and subsequently donated //! to [Apache Arrow]. @@ -89,7 +89,8 @@ //! # Why not a Filesystem Interface? //! //! Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] interface mirrors the APIs -//! of object stores and not filesystems. +//! of object stores and not filesystems, opting to provide stateless APIs instead of the cursor +//! 
based interfaces such as [`Read`] or [`Seek`] favoured by filesystems. //! //! This provides some compelling advantages: //! @@ -291,7 +292,7 @@ //! # Vectored Read //! //! A common pattern, especially when reading structured datasets, is to need to fetch -//! multiple ranges of a particular object. +//! multiple, potentially non-contiguous, ranges of a particular object. //! //! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will //! automatically coalesce adjacent ranges into an appropriate number of parallel requests. @@ -333,11 +334,15 @@ //! # use tokio::io::AsyncWriteExt; //! # use object_store::path::Path; //! struct CacheEntry { +//! /// Data returned by last request //! data: Bytes, +//! /// ETag identifying the object returned by the server //! e_tag: String, +//! /// Instant of last refresh //! refreshed_at: Instant, //! } //! +//! /// Example cache that checks entries after 10 seconds for a new version //! struct Cache { //! entries: HashMap, //! store: Arc, @@ -348,7 +353,7 @@ //! Ok(match self.entries.get_mut(path) { //! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) { //! true => e.data.clone(), // Return cached data -//! false => { +//! false => { // Check if remote version has changed //! let opts = GetOptions { //! if_none_match: Some(e.e_tag.clone()), //! ..GetOptions::default() @@ -362,7 +367,7 @@ //! e.data.clone() //! } //! }, -//! None => { +//! None => { // Not cached, fetch data //! let get = self.store.get(&path).await?; //! let e_tag = get.meta.e_tag.clone(); //! let data = get.bytes().await?; @@ -383,11 +388,11 @@ //! //! # Conditional Put //! -//! The default behaviour when writing data is to upsert any existing object at the given path. -//! More complex behaviours can be achieved using [`PutMode`], and can be used to build -//! [Optimistic Concurrency Control] based transactions. This facilitates building metadata catalogs, -//! 
such as [Apache Iceberg] or [Delta Lake], directly on top of object storage, without relying on -//! a separate DBMS. +//! The default behaviour when writing data is to upsert any existing object at the given path, +//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and +//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates +//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object +//! storage, without relying on a separate DBMS. //! //! ``` //! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion}; @@ -403,6 +408,8 @@ //! # async fn conditional_put() { //! let store = get_object_store(); //! let path = Path::from("test"); +//! +//! // Perform a conditional update on path //! loop { //! // Perform get request //! let r = store.get(&path).await.unwrap(); @@ -419,7 +426,7 @@ //! // Attempt to commit transaction //! match store.put_opts(&path, new, PutMode::Update(version).into()).await { //! Ok(_) => break, // Successfully committed -//! Err(Error::Precondition { .. }) => continue, // Transaction conflict, try again +//! Err(Error::Precondition { .. }) => continue, // Object has changed, try again //! Err(e) => panic!("{e}") //! } //! }