forked from delta-io/delta-rs
feat: default logstore implementation (delta-io#1742)
# Description

Introduce a `LogStore` abstraction to channel all log store reads and writes through a single place. This is intended to allow implementations with more sophisticated locking mechanisms that do not rely on atomic rename semantics in the underlying object store. This does not change any functionality: it reorganizes read operations and commits on the delta commit log so that they are funneled through the respective methods of `LogStore`.

## Rationale

The goal is to align the implementation of multi-cluster writes for Delta Lake on S3 with the one provided by the original `delta` library, enabling multi-cluster writes with some writers using the Spark / Delta library and other writers using `delta-rs`. For an overview of how this is done in delta, please see:

1. Delta [blog post](https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/) (high-level concept)
2. Associated Databricks [design doc](https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h) (detailed read)
3. [S3DynamoDbLogStore.java](https://github.com/delta-io/delta/blob/master/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java) (content warning: Java code behind this link)

This approach requires readers of a delta table to "recover" unfinished commits from writers; as a result, reading and writing are combined in a single interface, which in this PR is modeled after [LogStore.java](https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/LogStore.java). Currently in `delta-rs`, the read path for commits is implemented directly in `DeltaTable`, and there is no mechanism to implement storage-specific behavior such as interacting with DynamoDB.

---------

Co-authored-by: Robert Pack <[email protected]>
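For orientation, the sketch below shows roughly what the `LogStore` interface referenced above looks like, inferred from the `DefaultLogStore` implementation further down in this diff. The exact trait in the crate may declare additional methods, default implementations, or different bounds, so treat the signatures as an approximation rather than the definitive definition.

// Sketch of the trait as implied by the implementation below
// (uses the same imports as the file shown in this diff).
#[async_trait::async_trait]
pub trait LogStore: Send + Sync {
    /// Read the raw bytes of the commit file for `version` from `_delta_log`.
    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Bytes>;

    /// Atomically promote a prepared temporary commit file to `version`;
    /// fails with a [`TransactionError`] if that version already exists.
    async fn write_commit_entry(
        &self,
        version: i64,
        tmp_commit: &Path,
    ) -> Result<(), TransactionError>;

    /// Determine the latest available table version, starting the search at `current_version`.
    async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64>;

    /// Access the underlying [`ObjectStore`] for table data and log files.
    fn object_store(&self) -> Arc<dyn ObjectStore>;

    /// Render a fully qualified URI for a path relative to the table root.
    fn to_uri(&self, location: &Path) -> String;

    /// Object store URL used when registering the store with DataFusion.
    #[cfg(feature = "datafusion")]
    fn object_store_url(&self) -> ObjectStoreUrl;

    /// The location and storage options this log store was configured with.
    fn config(&self) -> &LogStoreConfig;
}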
1 parent 90b7741, commit 809f645
Showing 45 changed files with 826 additions and 747 deletions.
@@ -0,0 +1,89 @@
//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation
use std::sync::Arc;

use bytes::Bytes;
#[cfg(feature = "datafusion")]
use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::{path::Path, ObjectStore};
use url::Url;

use super::{LogStore, LogStoreConfig};
use crate::{
    operations::transaction::TransactionError,
    storage::{
        config::{self, StorageOptions},
        ObjectStoreRef,
    },
    DeltaResult,
};

/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
pub struct DefaultLogStore {
    pub(crate) storage: Arc<dyn ObjectStore>,
    config: LogStoreConfig,
}

impl DefaultLogStore {
    /// Create a new instance of [`DefaultLogStore`]
    ///
    /// # Arguments
    ///
    /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at
    ///   the delta table root (i.e. where `_delta_log` is located).
    /// * `config` - Configuration of the log store, including the storage location and options.
    pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self {
        Self { storage, config }
    }

    /// Create a log store from a storage location and options
    pub fn try_new(location: Url, options: impl Into<StorageOptions> + Clone) -> DeltaResult<Self> {
        let mut options = options.into();
        let storage = config::configure_store(&location, &mut options)?;
        Ok(Self {
            storage: Arc::new(storage),
            config: LogStoreConfig { location, options },
        })
    }
}

#[async_trait::async_trait]
impl LogStore for DefaultLogStore {
    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Bytes> {
        super::read_commit_entry(self.storage.as_ref(), version).await
    }

    /// Tries to commit a prepared commit file. Returns a [`TransactionError`]
    /// if the given `version` already exists. The caller is responsible for retry logic.
    /// This is a low-level transaction API; callers that do not want to maintain the commit
    /// loop themselves should use `DeltaTransaction.commit`, which wraps
    /// `try_commit_transaction` with retry logic.
    async fn write_commit_entry(
        &self,
        version: i64,
        tmp_commit: &Path,
    ) -> Result<(), TransactionError> {
        super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await
    }

    async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
        super::get_latest_version(self, current_version).await
    }

    fn object_store(&self) -> Arc<dyn ObjectStore> {
        self.storage.clone()
    }

    fn to_uri(&self, location: &Path) -> String {
        super::to_uri(&self.config.location, location)
    }

    #[cfg(feature = "datafusion")]
    fn object_store_url(&self) -> ObjectStoreUrl {
        super::object_store_url(&self.config.location)
    }

    fn config(&self) -> &LogStoreConfig {
        &self.config
    }
}
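A minimal usage sketch of the store defined above. The table URL is hypothetical, the empty options map assumes `StorageOptions` can be built from a plain `HashMap<String, String>` (as it is elsewhere in the crate), and the `LogStore` trait must be in scope for the method calls to resolve.

use std::collections::HashMap;
use url::Url;

async fn print_latest_commit() -> DeltaResult<()> {
    // Hypothetical table location; any URL understood by the storage
    // configuration (s3://, az://, file://, ...) is handled the same way.
    let location = Url::parse("s3://my-bucket/tables/events").expect("valid table URL");
    let options: HashMap<String, String> = HashMap::new();

    // Build the default log store for this location.
    let log_store = DefaultLogStore::try_new(location, options)?;

    // Find the newest commit version (searching forward from version 0)
    // and fetch the raw bytes of that commit file from `_delta_log`.
    let version = log_store.get_latest_version(0).await?;
    let commit = log_store.read_commit_entry(version).await?;
    println!("commit {} is {} bytes", version, commit.len());
    Ok(())
}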