Skip to content

Commit

Permalink
Improve object_store docs
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 27, 2023
1 parent e3cce56 commit e274581
Showing 1 changed file with 197 additions and 35 deletions.
232 changes: 197 additions & 35 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,26 @@
//!
//! # Highlights
//!
//! 1. A focused, easy to use, idiomatic, well documented, high
//! performance, `async` API.
//! 1. A high-performance async API focused on providing a consistent interface
//! mirroring that of the underlying object stores
//!
//! 2. Production quality, leading this crate to be used in large
//! scale production systems, such as [crates.io] and [InfluxDB IOx].
//! scale production systems, such as [crates.io] and [InfluxDB IOx]
//!
//! 3. Stable and predictable governance via the [Apache Arrow] project.
//! 3. Support for advanced functionality, including [ACID] reads
//! and writes, vectored IO, bulk deletion, and more
//!
//! 4. Stable and predictable governance via the [Apache Arrow] project
//!
//! 5. Very low dependency footprint
//!
//! Originally developed for [InfluxDB IOx] and subsequently donated
//! to [Apache Arrow].
//!
//! [Apache Arrow]: https://arrow.apache.org/
//! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox/
//! [crates.io]: https://github.com/rust-lang/crates.io
//! [ACID]: https://en.wikipedia.org/wiki/ACID
//!
//! # Available [`ObjectStore`] Implementations
//!
Expand Down Expand Up @@ -79,6 +85,22 @@
doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
)]
//!
//! # Why not a Filesystem Interface?
//!
//! Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] interface mirrors the APIs
//! of object stores and not filesystems.
//!
//! This provides some compelling advantages:
//!
//! * Except where explicitly stated otherwise, operations are atomic, and readers
//! cannot observe partial and/or failed writes
//! * Methods map directly to object store APIs, providing both efficiency and predictability
//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
//! * Allows for functionality not native to filesystems, such as operation preconditions
//! and atomic multipart uploads
//!
//! [`BufReader`]: buffered::BufReader
//!
//! # Adapters
//!
//! [`ObjectStore`] instances can be composed with various adapters
Expand All @@ -87,8 +109,7 @@
//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
//!
//!
//! # List objects:
//! # List objects
//!
//! Use the [`ObjectStore::list`] method to iterate over objects in
//! remote storage or files in the local filesystem:
Expand All @@ -111,7 +132,7 @@
//! // Recursively list all files below the 'data' path.
//! // 1. On AWS S3 this would be the 'data/' prefix
//! // 2. On a local filesystem, this would be the 'data' directory
//! let prefix: Path = "data".try_into().unwrap();
//! let prefix = Path::from("data");
//!
//! // Get an `async` stream of Metadata objects:
//! let mut list_stream = object_store.list(Some(&prefix));
Expand Down Expand Up @@ -148,18 +169,18 @@
//! #
//! # async fn example() {
//! #
//! // create an ObjectStore
//! // Create an ObjectStore
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//!
//! // Retrieve a specific file
//! let path: Path = "data/file01.parquet".try_into().unwrap();
//! let path = Path::from("data/file01.parquet");
//!
//! // fetch the bytes from object store
//! let stream = object_store
//! .get(&path)
//! .await
//! .unwrap()
//! .into_stream();
//! // Fetch the file metadata
//! let meta = object_store.head(&path).await.unwrap();
//! println!("{meta:?}");
//!
//! // Fetch the bytes from object store as a stream
//! let stream = object_store.get(&path).await.unwrap().into_stream();
//!
//! // Count the '0's using `try_fold` from `TryStreamExt` trait
//! let num_zeros = stream
Expand All @@ -171,13 +192,9 @@
//! # }
//! ```
//!
//! Which will print out something like the following:
//! # Put Object
//!
//! ```text
//! Num zeros in data/file01.parquet is 657
//! ```
//! # Put object
//! Use the [`ObjectStore::put`] method to save data in remote storage or local filesystem.
//! Use the [`ObjectStore::put`] method to atomically write data.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
Expand All @@ -190,15 +207,17 @@
//! # }
//! # async fn put() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path: Path = "data/file1".try_into().unwrap();
//! let bytes = Bytes::from_static(b"hello");
//! object_store.put(&path, bytes).await.unwrap();
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/file1");
//! let bytes = Bytes::from_static(b"hello");
//! object_store.put(&path, bytes).await.unwrap();
//! # }
//! ```
//!
//! # Multipart put object
//! Use the [`ObjectStore::put_multipart`] method to save large amount of data in chunks.
//! # Multipart Upload
//!
//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data,
//! with implementations automatically handling parallel, chunked upload where appropriate.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
Expand All @@ -212,16 +231,159 @@
//! # }
//! # async fn multi_upload() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path: Path = "data/large_file".try_into().unwrap();
//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
//!
//! let bytes = Bytes::from_static(b"hello");
//! writer.write_all(&bytes).await.unwrap();
//! writer.flush().await.unwrap();
//! writer.shutdown().await.unwrap();
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
//!
//! let bytes = Bytes::from_static(b"hello");
//! writer.write_all(&bytes).await.unwrap();
//! writer.flush().await.unwrap();
//! writer.shutdown().await.unwrap();
//! # }
//! ```
//!
//! # Vectored Read
//!
//! A common pattern, especially when reading structured datasets, is to need to fetch
//! multiple ranges of a particular object.
//!
//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use object_store::ObjectStore;
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! # async fn multi_upload() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
//! assert_eq!(ranges.len(), 3);
//! assert_eq!(ranges[0].len(), 10);
//! # }
//! ```
//!
//! # Conditional Fetch
//!
//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
//!
//! For example, efficiently refreshing a cache without re-fetching the entire object
//! data if the object hasn't been modified.
//!
//! ```
//! # use std::collections::btree_map::Entry;
//! # use std::collections::HashMap;
//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
//! # use std::sync::Arc;
//! # use std::time::{Duration, Instant};
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! struct CacheEntry {
//! data: Bytes,
//! e_tag: String,
//! refreshed_at: Instant,
//! }
//!
//! struct Cache {
//! entries: HashMap<Path, CacheEntry>,
//! store: Arc<dyn ObjectStore>,
//! }
//!
//! impl Cache {
//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
//! Ok(match self.entries.get_mut(path) {
//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
//! true => e.data.clone(), // Return cached data
//! false => {
//! let opts = GetOptions {
//! if_none_match: Some(e.e_tag.clone()),
//! ..GetOptions::default()
//! };
//! match self.store.get_opts(&path, opts).await {
//! Ok(d) => e.data = d.bytes().await?,
//! Err(Error::NotModified { .. }) => {} // Data has not changed
//! Err(e) => return Err(e),
//! };
//! e.refreshed_at = Instant::now();
//! e.data.clone()
//! }
//! },
//! None => {
//! let get = self.store.get(&path).await?;
//! let e_tag = get.meta.e_tag.clone();
//! let data = get.bytes().await?;
//! if let Some(e_tag) = e_tag {
//! let entry = CacheEntry {
//! e_tag,
//! data: data.clone(),
//! refreshed_at: Instant::now(),
//! };
//! self.entries.insert(path.clone(), entry);
//! }
//! data
//! }
//! })
//! }
//! }
//! ```
//!
//! # Conditional Put
//!
//! The default behaviour when writing data is to upsert any existing object at the given path.
//! More complex behaviours can be achieved using [`PutMode`], and can be used to build
//! [Optimistic Concurrency Control] based transactions. This facilitates building metadata catalogs,
//! such as [Apache Iceberg] or [Delta Lake], directly on top of object storage, without relying on
//! a separate DBMS.
//!
//! ```
//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::memory::InMemory;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(InMemory::new())
//! # }
//! # fn do_update(b: Bytes) -> Bytes {b}
//! # async fn conditional_put() {
//! let store = get_object_store();
//! let path = Path::from("test");
//! loop {
//! // Perform get request
//! let r = store.get(&path).await.unwrap();
//!
//! // Save version information fetched
//! let version = UpdateVersion {
//! e_tag: r.meta.e_tag.clone(),
//! version: r.meta.version.clone(),
//! };
//!
//! // Compute new version of object contents
//! let new = do_update(r.bytes().await.unwrap());
//!
//! // Attempt to commit transaction
//! match store.put_opts(&path, new, PutMode::Update(version).into()).await {
//! Ok(_) => break, // Successfully committed
//! Err(Error::Precondition { .. }) => continue, // Transaction conflict, try again
//! Err(e) => panic!("{e}")
//! }
//! }
//! # }
//! ```
//!
//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
//! [Apache Iceberg]: https://iceberg.apache.org/
//! [Delta Lake]: https://delta.io/
//!
#[cfg(all(
target_arch = "wasm32",
Expand Down

0 comments on commit e274581

Please sign in to comment.