Improve object_store docs #4978

Merged (3 commits, Oct 30, 2023)
Changes from 1 commit
244 changes: 208 additions & 36 deletions object_store/src/lib.rs
@@ -38,20 +38,27 @@
//!
//! # Highlights
//!
//! 1. A focused, easy to use, idiomatic, well documented, high
//! performance, `async` API.
//! 1. A high-performance async API focused on providing a consistent interface
//! mirroring that of object stores such as [S3]
//!
//! 2. Production quality, leading this crate to be used in large
//! scale production systems, such as [crates.io] and [InfluxDB IOx].
//! scale production systems, such as [crates.io] and [InfluxDB IOx]
//!
//! 3. Stable and predictable governance via the [Apache Arrow] project.
//! 3. Support for advanced functionality, including [ACID] reads
Contributor:
I suggest you don't use ACID here as it is a database term and not as commonly applied to object stores in my experience.

For example, I think the claim that the D in ACID for Durable counts as "advanced" functionality for object stores / their APIs is confusing. It is also unclear to me what this crate adds for Isolation in the classic database sense of the word.

Perhaps a better term would be "atomic reads/writes" which also highlights the feature I think you are hinting at

Contributor Author:
I was trying to allude to the idea that you could build ACID transactions on top, but fair enough, will change

//! and writes, vectored IO, bulk deletion, and more...
//!
//! 4. Stable and predictable governance via the [Apache Arrow] project
//!
//! 5. Very low dependency footprint
Contributor:
Suggested change:
- //! 5. Very low dependency footprint
+ //! 5. Small dependency footprint (depends on only a small number of common crates)

//!
//! Originally developed for [InfluxDB IOx] and subsequently donated
//! to [Apache Arrow].
//!
//! [Apache Arrow]: https://arrow.apache.org/
//! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox/
//! [crates.io]: https://github.com/rust-lang/crates.io
//! [ACID]: https://en.wikipedia.org/wiki/ACID
//! [S3]: https://aws.amazon.com/s3/
//!
//! # Available [`ObjectStore`] Implementations
//!
@@ -79,6 +86,22 @@
doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
)]
//!
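Because every backend is exposed through the same `ObjectStore` trait, the choice of
implementation can be confined to construction time. A minimal sketch of this (an
editorial illustration, not part of the diff; the `make_store` helper and its flag are
hypothetical):

```rust
use std::sync::Arc;

use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

/// Pick a backend at construction time; every call site only sees
/// `Arc<dyn ObjectStore>` and so stays backend-agnostic.
fn make_store(in_memory: bool) -> Arc<dyn ObjectStore> {
    if in_memory {
        Arc::new(InMemory::new()) // e.g. for unit tests
    } else {
        Arc::new(LocalFileSystem::new()) // backed by the local filesystem
    }
}
```

Swapping in a cloud builder such as `AmazonS3Builder` would change only this function.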
//! # Why not a Filesystem Interface?
//!
//! Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] interface mirrors the APIs
//! of object stores and not filesystems.
Contributor:
I think it might help to give an example of what a filesystem API means. Perhaps something like

Suggested change:
- //! of object stores and not filesystems.
+ //! of object stores and not filesystems. For example, it purposely does not offer a `Seek` like API.

//!
//! This provides some compelling advantages:
//!
//! * Except where explicitly stated otherwise, operations are atomic, and readers
//! cannot observe partial and/or failed writes
//! * Methods map directly to object store APIs, providing both efficiency and predictability
//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
//! * Allows for functionality not native to filesystems, such as operation preconditions
//! and atomic multipart uploads
//!
//! [`BufReader`]: buffered::BufReader
//!
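To make the contrast with a `Seek`-based filesystem API concrete, the sketch below (an
editorial illustration, not part of the diff) reads the trailing bytes of an object with a
single `ObjectStore::get_range` call instead of seek-then-read; the eight-byte "footer" is
an arbitrary choice. Callers that genuinely need `AsyncSeek` can use the `BufReader`
mentioned above.

```rust
use std::sync::Arc;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore, Result};

/// Fetch a fixed-size footer without seeking: the range maps directly to
/// a single ranged request (e.g. an HTTP `Range` GET) against the store.
async fn read_footer(store: Arc<dyn ObjectStore>, path: &Path) -> Result<Bytes> {
    let meta = store.head(path).await?; // size, e_tag, last_modified, ...
    let start = meta.size.saturating_sub(8); // hypothetical 8-byte footer
    store.get_range(path, start..meta.size).await
}
```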
//! # Adapters
//!
//! [`ObjectStore`] instances can be composed with various adapters
@@ -87,8 +110,7 @@
//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
//!
//!
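Adapters are themselves `ObjectStore` implementations, so they nest. As a sketch of how
they compose (an editorial illustration, not part of the diff; the 16-request limit and
100 ms delay are arbitrary values):

```rust
use std::sync::Arc;
use std::time::Duration;

use object_store::limit::LimitStore;
use object_store::local::LocalFileSystem;
use object_store::throttle::{ThrottleConfig, ThrottledStore};
use object_store::ObjectStore;

fn build_adapted_store() -> Arc<dyn ObjectStore> {
    // Cap the number of concurrent requests to the inner store at 16
    let limited = LimitStore::new(LocalFileSystem::new(), 16);

    // Additionally delay every `get` call, e.g. to simulate a slow network
    // in tests; the other waits keep their zero defaults
    let config = ThrottleConfig {
        wait_get_per_call: Duration::from_millis(100),
        ..ThrottleConfig::default()
    };
    Arc::new(ThrottledStore::new(limited, config))
}
```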
//! # List objects:
//! # List objects
//!
//! Use the [`ObjectStore::list`] method to iterate over objects in
//! remote storage or files in the local filesystem:
@@ -111,7 +133,7 @@
//! // Recursively list all files below the 'data' path.
//! // 1. On AWS S3 this would be the 'data/' prefix
//! // 2. On a local filesystem, this would be the 'data' directory
//! let prefix: Path = "data".try_into().unwrap();
//! let prefix = Path::from("data");
//!
//! // Get an `async` stream of Metadata objects:
//! let mut list_stream = object_store.list(Some(&prefix));
@@ -141,25 +163,34 @@
//! # use futures::TryStreamExt;
//! # use object_store::local::LocalFileSystem;
//! # use std::sync::Arc;
//! # use object_store::{path::Path, ObjectStore};
//! # use bytes::Bytes;
//! # use object_store::{path::Path, ObjectStore, GetResult};
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! #
//! # async fn example() {
//! #
//! // create an ObjectStore
//! // Create an ObjectStore
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//!
//! // Retrieve a specific file
//! let path: Path = "data/file01.parquet".try_into().unwrap();
//! let path = Path::from("data/file01.parquet");
//!
//! // Fetch just the file metadata
//! let meta = object_store.head(&path).await.unwrap();
//! println!("{meta:?}");
//!
//! // Fetch the object including metadata
//! let result: GetResult = object_store.get(&path).await.unwrap();
//! assert_eq!(result.meta, meta);
//!
//! // fetch the bytes from object store
//! let stream = object_store
//!     .get(&path)
//!     .await
//!     .unwrap()
//!     .into_stream();
//! // Buffer the entire object in memory
//! let object: Bytes = result.bytes().await.unwrap();
//! assert_eq!(object.len(), meta.size);
//!
//! // Alternatively stream the bytes from object storage
//! let stream = object_store.get(&path).await.unwrap().into_stream();
//!
//! // Count the '0's using `try_fold` from `TryStreamExt` trait
//! let num_zeros = stream
@@ -171,13 +202,9 @@
//! # }
//! ```
//!
//! Which will print out something like the following:
//! # Put Object
//!
//! ```text
//! Num zeros in data/file01.parquet is 657
//! ```
//! # Put object
//! Use the [`ObjectStore::put`] method to save data in remote storage or local filesystem.
//! Use the [`ObjectStore::put`] method to atomically write data.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
@@ -190,15 +217,48 @@
//! # }
//! # async fn put() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path: Path = "data/file1".try_into().unwrap();
//! let bytes = Bytes::from_static(b"hello");
//! object_store.put(&path, bytes).await.unwrap();
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/file1");
//! let bytes = Bytes::from_static(b"hello");
//! object_store.put(&path, bytes).await.unwrap();
//! # }
//! ```
//!
//! # Multipart Upload
//!
//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data,
//! with implementations automatically handling parallel, chunked upload where appropriate.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use object_store::ObjectStore;
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! # async fn multi_upload() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
//!
//! let bytes = Bytes::from_static(b"hello");
//! writer.write_all(&bytes).await.unwrap();
//! writer.flush().await.unwrap();
//! writer.shutdown().await.unwrap();
//! # }
//! ```
//!
//! # Multipart put object
//! Use the [`ObjectStore::put_multipart`] method to save large amount of data in chunks.
//! # Vectored Read
//!
//! A common pattern, especially when reading structured datasets, is to need to fetch
//! multiple ranges of a particular object.
Contributor:
Suggested change:
- //! A common pattern, especially when reading structured datasets, is to need to fetch
- //! multiple ranges of a particular object.
+ //! A common pattern, especially when reading structured datasets, is to fetch
+ //! multiple, potentially discontiguous, ranges of a particular object.

//!
//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
@@ -212,16 +272,128 @@
//! # }
//! # async fn multi_upload() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path: Path = "data/large_file".try_into().unwrap();
//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
//!
//! let bytes = Bytes::from_static(b"hello");
//! writer.write_all(&bytes).await.unwrap();
//! writer.flush().await.unwrap();
//! writer.shutdown().await.unwrap();
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
//! assert_eq!(ranges.len(), 3);
//! assert_eq!(ranges[0].len(), 10);
//! # }
//! ```
//!
//! # Conditional Fetch
//!
//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
//!
//! For example, efficiently refreshing a cache without re-fetching the entire object
//! data if the object hasn't been modified.
//!
//! ```
//! # use std::collections::btree_map::Entry;
//! # use std::collections::HashMap;
//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
//! # use std::sync::Arc;
//! # use std::time::{Duration, Instant};
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! struct CacheEntry {
//!     data: Bytes,
//!     e_tag: String,
//!     refreshed_at: Instant,
//! }
Contributor:
Suggested change:
- //! struct CacheEntry {
- //!     data: Bytes,
- //!     e_tag: String,
- //!     refreshed_at: Instant,
- //! }
+ //! struct CacheEntry {
+ //!     /// Data returned by last request
+ //!     data: Bytes,
+ //!     /// Entity tag, provided by the server, identifying the object
+ //!     e_tag: String,
+ //!     /// Time of last refresh
+ //!     refreshed_at: Instant,
+ //! }

//!
//! struct Cache {
Contributor:
Suggested change:
- //! struct Cache {
+ //! /// Example cache that returns a cached version of data from a remote
+ //! /// ObjectStore, checking every 10 seconds for a new version
+ //! struct Cache {

//!     entries: HashMap<Path, CacheEntry>,
//!     store: Arc<dyn ObjectStore>,
//! }
//!
//! impl Cache {
//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
//!         Ok(match self.entries.get_mut(path) {
//!             Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
//!                 true => e.data.clone(), // Return cached data
//!                 false => {
Contributor:
Suggested change:
- //!                 false => {
+ //!                 false => { // check if remote version has changed

//!                     let opts = GetOptions {
//!                         if_none_match: Some(e.e_tag.clone()),
//!                         ..GetOptions::default()
//!                     };
//!                     match self.store.get_opts(&path, opts).await {
//!                         Ok(d) => e.data = d.bytes().await?,
//!                         Err(Error::NotModified { .. }) => {} // Data has not changed
//!                         Err(e) => return Err(e),
//!                     };
//!                     e.refreshed_at = Instant::now();
//!                     e.data.clone()
//!                 }
//!             },
//!             None => {
Contributor:
Suggested change:
- //!             None => {
+ //!             None => { // not cached, fetch value

//!                 let get = self.store.get(&path).await?;
//!                 let e_tag = get.meta.e_tag.clone();
//!                 let data = get.bytes().await?;
//!                 if let Some(e_tag) = e_tag {
//!                     let entry = CacheEntry {
//!                         e_tag,
//!                         data: data.clone(),
//!                         refreshed_at: Instant::now(),
//!                     };
//!                     self.entries.insert(path.clone(), entry);
//!                 }
//!                 data
//!             }
//!         })
//!     }
//! }
//! ```
//!
//! # Conditional Put
//!
//! The default behaviour when writing data is to upsert any existing object at the given path.
Contributor:
Suggested change:
- //! The default behaviour when writing data is to upsert any existing object at the given path.
+ //! The default behaviour when writing data is to upsert any existing object at the given path, overwriting any previous values.

//! More complex behaviours can be achieved using [`PutMode`], and can be used to build
//! [Optimistic Concurrency Control] based transactions. This facilitates building metadata catalogs,
//! such as [Apache Iceberg] or [Delta Lake], directly on top of object storage, without relying on
//! a separate DBMS.
//!
//! ```
//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::memory::InMemory;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(InMemory::new())
//! # }
//! # fn do_update(b: Bytes) -> Bytes {b}
//! # async fn conditional_put() {
Contributor:
It might be helpful to have this name exposed

Suggested change:
- //! # async fn conditional_put() {
+ //! async fn conditional_put() {

Contributor Author:
I opted to add a comment

//! let store = get_object_store();
//! let path = Path::from("test");
//! loop {
//!     // Perform get request
//!     let r = store.get(&path).await.unwrap();
//!
//!     // Save version information fetched
//!     let version = UpdateVersion {
//!         e_tag: r.meta.e_tag.clone(),
//!         version: r.meta.version.clone(),
//!     };
//!
//!     // Compute new version of object contents
//!     let new = do_update(r.bytes().await.unwrap());
//!
//!     // Attempt to commit transaction
//!     match store.put_opts(&path, new, PutMode::Update(version).into()).await {
//!         Ok(_) => break, // Successfully committed
//!         Err(Error::Precondition { .. }) => continue, // Transaction conflict, try again
Contributor:
Suggested change:
- //!         Err(Error::Precondition { .. }) => continue, // Transaction conflict, try again
+ //!         Err(Error::Precondition { .. }) => continue, // object has been changed, try again

//!         Err(e) => panic!("{e}")
//!     }
//! }
//! # }
//! ```
//!
//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
//! [Apache Iceberg]: https://iceberg.apache.org/
//! [Delta Lake]: https://delta.io/
//!

#[cfg(all(
target_arch = "wasm32",