Skip to content

Commit

Permalink
Improve object_store docs (apache#4978)
Browse files Browse the repository at this point in the history
* Improve object_store docs

* Document configuration system

* Review feedback
  • Loading branch information
tustvold authored Oct 30, 2023
1 parent d9aaa43 commit cc23cac
Showing 1 changed file with 250 additions and 35 deletions.
285 changes: 250 additions & 35 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,27 @@
//!
//! # Highlights
//!
//! 1. A focused, easy to use, idiomatic, well documented, high
//! performance, `async` API.
//! 1. A high-performance async API focused on providing a consistent interface
//! mirroring that of object stores such as [S3]
//!
//! 2. Production quality, leading this crate to be used in large
//! scale production systems, such as [crates.io] and [InfluxDB IOx].
//! scale production systems, such as [crates.io] and [InfluxDB IOx]
//!
//! 3. Stable and predictable governance via the [Apache Arrow] project.
//! 3. Support for advanced functionality, including atomic, conditional reads
//! and writes, vectored IO, bulk deletion, and more...
//!
//! 4. Stable and predictable governance via the [Apache Arrow] project
//!
//! 5. Small dependency footprint, depending on only a small number of common crates
//!
//! Originally developed for [InfluxDB IOx] and subsequently donated
//! to [Apache Arrow].
//!
//! [Apache Arrow]: https://arrow.apache.org/
//! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox/
//! [crates.io]: https://github.com/rust-lang/crates.io
//! [ACID]: https://en.wikipedia.org/wiki/ACID
//! [S3]: https://aws.amazon.com/s3/
//!
//! # Available [`ObjectStore`] Implementations
//!
Expand Down Expand Up @@ -79,6 +86,23 @@
doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
)]
//!
//! # Why not a Filesystem Interface?
//!
//! Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] interface mirrors the APIs
//! of object stores and not filesystems, opting to provide stateless APIs instead of the cursor
//! based interfaces such as [`Read`] or [`Seek`] favoured by filesystems.
//!
//! This provides some compelling advantages:
//!
//! * Except where explicitly stated otherwise, operations are atomic, and readers
//! cannot observe partial and/or failed writes
//! * Methods map directly to object store APIs, providing both efficiency and predictability
//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
//! * Allows for functionality not native to filesystems, such as operation preconditions
//! and atomic multipart uploads
//!
//! [`BufReader`]: buffered::BufReader
//!
//! # Adapters
//!
//! [`ObjectStore`] instances can be composed with various adapters
Expand All @@ -87,8 +111,43 @@
//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
//!
//! # Configuration System
//!
//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing creating a [`DynObjectStore`]
//! from a URL and an optional list of key value pairs. This provides a flexible interface
//! to support a wide variety of user-defined store configurations, with minimal additional
//! application complexity.
//!
//! ```no_run
//! # use url::Url;
//! # use object_store::{parse_url, parse_url_opts};
//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
//! #
//! #
//! // Can manually create a specific store variant using the appropriate builder
//! let store: AmazonS3 = AmazonS3Builder::from_env()
//! .with_bucket_name("my-bucket").build().unwrap();
//!
//! # List objects:
//! // Alternatively can create an ObjectStore from an S3 URL
//! let url = Url::parse("s3://bucket/path").unwrap();
//! let (store, path) = parse_url(&url).unwrap();
//! assert_eq!(path.as_ref(), "path");
//!
//! // Potentially with additional options
//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
//!
//! // Or with URLs that encode the bucket name in the URL path
//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
//! let (store, path) = parse_url(&url).unwrap();
//! assert_eq!(path.as_ref(), "path");
//! ```
//!
//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration-
//!
//! # List objects
//!
//! Use the [`ObjectStore::list`] method to iterate over objects in
//! remote storage or files in the local filesystem:
Expand All @@ -111,7 +170,7 @@
//! // Recursively list all files below the 'data' path.
//! // 1. On AWS S3 this would be the 'data/' prefix
//! // 2. On a local filesystem, this would be the 'data' directory
//! let prefix: Path = "data".try_into().unwrap();
//! let prefix = Path::from("data");
//!
//! // Get an `async` stream of Metadata objects:
//! let mut list_stream = object_store.list(Some(&prefix));
Expand Down Expand Up @@ -141,25 +200,34 @@
//! # use futures::TryStreamExt;
//! # use object_store::local::LocalFileSystem;
//! # use std::sync::Arc;
//! # use object_store::{path::Path, ObjectStore};
//! # use bytes::Bytes;
//! # use object_store::{path::Path, ObjectStore, GetResult};
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! #
//! # async fn example() {
//! #
//! // create an ObjectStore
//! // Create an ObjectStore
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//!
//! // Retrieve a specific file
//! let path: Path = "data/file01.parquet".try_into().unwrap();
//! let path = Path::from("data/file01.parquet");
//!
//! // Fetch just the file metadata
//! let meta = object_store.head(&path).await.unwrap();
//! println!("{meta:?}");
//!
//! // Fetch the object including metadata
//! let result: GetResult = object_store.get(&path).await.unwrap();
//! assert_eq!(result.meta, meta);
//!
//! // Buffer the entire object in memory
//! let object: Bytes = result.bytes().await.unwrap();
//! assert_eq!(object.len(), meta.size);
//!
//! // fetch the bytes from object store
//! let stream = object_store
//! .get(&path)
//! .await
//! .unwrap()
//! .into_stream();
//! // Alternatively stream the bytes from object storage
//! let stream = object_store.get(&path).await.unwrap().into_stream();
//!
//! // Count the '0's using `try_fold` from `TryStreamExt` trait
//! let num_zeros = stream
Expand All @@ -171,13 +239,9 @@
//! # }
//! ```
//!
//! Which will print out something like the following:
//! # Put Object
//!
//! ```text
//! Num zeros in data/file01.parquet is 657
//! ```
//! # Put object
//! Use the [`ObjectStore::put`] method to save data in remote storage or local filesystem.
//! Use the [`ObjectStore::put`] method to atomically write data.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
Expand All @@ -190,15 +254,17 @@
//! # }
//! # async fn put() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path: Path = "data/file1".try_into().unwrap();
//! let bytes = Bytes::from_static(b"hello");
//! object_store.put(&path, bytes).await.unwrap();
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/file1");
//! let bytes = Bytes::from_static(b"hello");
//! object_store.put(&path, bytes).await.unwrap();
//! # }
//! ```
//!
//! # Multipart put object
//! Use the [`ObjectStore::put_multipart`] method to save large amount of data in chunks.
//! # Multipart Upload
//!
//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data,
//! with implementations automatically handling parallel, chunked upload where appropriate.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
Expand All @@ -212,16 +278,165 @@
//! # }
//! # async fn multi_upload() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path: Path = "data/large_file".try_into().unwrap();
//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
//!
//! let bytes = Bytes::from_static(b"hello");
//! writer.write_all(&bytes).await.unwrap();
//! writer.flush().await.unwrap();
//! writer.shutdown().await.unwrap();
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
//!
//! let bytes = Bytes::from_static(b"hello");
//! writer.write_all(&bytes).await.unwrap();
//! writer.flush().await.unwrap();
//! writer.shutdown().await.unwrap();
//! # }
//! ```
//!
//! # Vectored Read
//!
//! A common pattern, especially when reading structured datasets, is to need to fetch
//! multiple, potentially non-contiguous, ranges of a particular object.
//!
//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use object_store::ObjectStore;
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! # async fn multi_upload() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
//! assert_eq!(ranges.len(), 3);
//! assert_eq!(ranges[0].len(), 10);
//! # }
//! ```
//!
//! # Conditional Fetch
//!
//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
//!
//! For example, efficiently refreshing a cache without re-fetching the entire object
//! data if the object hasn't been modified.
//!
//! ```
//! # use std::collections::btree_map::Entry;
//! # use std::collections::HashMap;
//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
//! # use std::sync::Arc;
//! # use std::time::{Duration, Instant};
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! struct CacheEntry {
//! /// Data returned by last request
//! data: Bytes,
//! /// ETag identifying the object returned by the server
//! e_tag: String,
//! /// Instant of last refresh
//! refreshed_at: Instant,
//! }
//!
//! /// Example cache that checks entries after 10 seconds for a new version
//! struct Cache {
//! entries: HashMap<Path, CacheEntry>,
//! store: Arc<dyn ObjectStore>,
//! }
//!
//! impl Cache {
//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
//! Ok(match self.entries.get_mut(path) {
//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
//! true => e.data.clone(), // Return cached data
//! false => { // Check if remote version has changed
//! let opts = GetOptions {
//! if_none_match: Some(e.e_tag.clone()),
//! ..GetOptions::default()
//! };
//! match self.store.get_opts(&path, opts).await {
//! Ok(d) => e.data = d.bytes().await?,
//! Err(Error::NotModified { .. }) => {} // Data has not changed
//! Err(e) => return Err(e),
//! };
//! e.refreshed_at = Instant::now();
//! e.data.clone()
//! }
//! },
//! None => { // Not cached, fetch data
//! let get = self.store.get(&path).await?;
//! let e_tag = get.meta.e_tag.clone();
//! let data = get.bytes().await?;
//! if let Some(e_tag) = e_tag {
//! let entry = CacheEntry {
//! e_tag,
//! data: data.clone(),
//! refreshed_at: Instant::now(),
//! };
//! self.entries.insert(path.clone(), entry);
//! }
//! data
//! }
//! })
//! }
//! }
//! ```
//!
//! # Conditional Put
//!
//! The default behaviour when writing data is to upsert any existing object at the given path,
//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and
//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
//! storage, without relying on a separate DBMS.
//!
//! ```
//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::memory::InMemory;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(InMemory::new())
//! # }
//! # fn do_update(b: Bytes) -> Bytes {b}
//! # async fn conditional_put() {
//! let store = get_object_store();
//! let path = Path::from("test");
//!
//! // Perform a conditional update on path
//! loop {
//! // Perform get request
//! let r = store.get(&path).await.unwrap();
//!
//! // Save version information fetched
//! let version = UpdateVersion {
//! e_tag: r.meta.e_tag.clone(),
//! version: r.meta.version.clone(),
//! };
//!
//! // Compute new version of object contents
//! let new = do_update(r.bytes().await.unwrap());
//!
//! // Attempt to commit transaction
//! match store.put_opts(&path, new, PutMode::Update(version).into()).await {
//! Ok(_) => break, // Successfully committed
//! Err(Error::Precondition { .. }) => continue, // Object has changed, try again
//! Err(e) => panic!("{e}")
//! }
//! }
//! # }
//! ```
//!
//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
//! [Apache Iceberg]: https://iceberg.apache.org/
//! [Delta Lake]: https://delta.io/
//!
#[cfg(all(
target_arch = "wasm32",
Expand Down

0 comments on commit cc23cac

Please sign in to comment.