Skip to content

Commit

Permalink
DeltaObjectStore: move additional functionality into LogStore
Browse files Browse the repository at this point in the history
  • Loading branch information
dispanser committed Nov 7, 2023
1 parent 4493f56 commit 711c031
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 140 deletions.
11 changes: 5 additions & 6 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::logstore::LogStoreRef;
use crate::protocol::{self};
use crate::storage::ObjectStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options, DeltaTable};
Expand Down Expand Up @@ -358,10 +357,10 @@ impl PruningStatistics for DeltaTable {

// each delta table must register a specific object store, since paths are internally
// handled relative to the table root.
pub(crate) fn register_store(store: ObjectStoreRef, env: Arc<RuntimeEnv>) {
pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
let object_store_url = store.object_store_url();
let url: &Url = object_store_url.as_ref();
env.register_object_store(url, store);
env.register_object_store(url, store.object_store());
}

pub(crate) fn logical_schema(
Expand Down Expand Up @@ -633,7 +632,7 @@ impl<'a> DeltaScanBuilder<'a> {
.create_physical_plan(
self.state,
FileScanConfig {
object_store_url: self.log_store.object_store().object_store_url(),
object_store_url: self.log_store.object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: self.snapshot.datafusion_table_statistics(),
Expand Down Expand Up @@ -685,7 +684,7 @@ impl TableProvider for DeltaTable {
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
register_store(self.object_store(), session.runtime_env().clone());
register_store(self.log_store(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.state, self.log_store(), session)
Expand Down Expand Up @@ -763,7 +762,7 @@ impl TableProvider for DeltaTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
register_store(self.log_store.object_store(), session.runtime_env().clone());
register_store(self.log_store.clone(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
Expand Down
9 changes: 8 additions & 1 deletion crates/deltalake-core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
use std::sync::Arc;

use bytes::Bytes;
#[cfg(feature = "datafusion")]
use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::path::Path;
use url::Url;

Expand Down Expand Up @@ -47,7 +49,7 @@ impl LogStore for DefaultLogStore {
super::read_commit_entry(self.storage.as_ref(), version).await
}

/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
/// Tries to commit a prepared commit file. Returns [`TransactionError`]
/// if the given `version` already exists. The caller should handle the retry logic itself.
/// This is a low-level transaction API. If the user does not want to maintain the commit loop then
/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction`
Expand All @@ -71,4 +73,9 @@ impl LogStore for DefaultLogStore {
fn to_uri(&self, location: &Path) -> String {
// Delegate to the parent module's `to_uri`, passing this store's `location` as the root.
super::to_uri(&self.location, location)
}

#[cfg(feature = "datafusion")]
fn object_store_url(&self) -> ObjectStoreUrl {
// Build the url from this store's `location` using the helper in the parent module.
super::object_store_url(&self.location)
}
}
92 changes: 88 additions & 4 deletions crates/deltalake-core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ use crate::{
};
use bytes::Bytes;
use log::debug;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use object_store::{
path::Path, Error as ObjectStoreError, ObjectStore, Result as ObjectStoreResult,
};

#[cfg(feature = "datafusion")]
use datafusion::datasource::object_store::ObjectStoreUrl;

pub mod default_logstore;

Expand Down Expand Up @@ -44,7 +49,7 @@ pub trait LogStore: Sync + Send {
/// Write list of actions as delta commit entry for given version.
///
/// This operation can be retried with a higher version in case the write
/// fails with `TransactionError::VersionAlreadyExists`.
/// fails with [`TransactionError::VersionAlreadyExists`].
async fn write_commit_entry(
&self,
version: i64,
Expand All @@ -69,6 +74,43 @@ pub trait LogStore: Sync + Send {
fn log_path(&self) -> &Path {
// Provided default: hand out a reference to the shared `DELTA_LOG_PATH` value.
&DELTA_LOG_PATH
}

/// Check if the location is a delta table location
///
/// Lists the objects below [`LogStore::log_path`] and looks at the first listing result only:
/// an entry means `true`; an empty listing or a `NotFound` error means `false`; any other
/// error is returned to the caller.
async fn is_delta_table_location(&self) -> ObjectStoreResult<bool> {
    // TODO We should really be using HEAD here, but this fails in windows tests
    let store = self.object_store();
    let mut entries = store.list(Some(self.log_path())).await?;
    match entries.next().await {
        Some(Ok(_)) => Ok(true),
        Some(Err(ObjectStoreError::NotFound { .. })) | None => Ok(false),
        Some(Err(other)) => Err(other),
    }
}

#[cfg(feature = "datafusion")]
/// Generate a unique enough url to identify the store in datafusion.
/// The DF object store registry only cares about the scheme and the host of the url for
/// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique
/// host we convert the location of this `LogStore` to a valid name, combining the
/// original scheme, host and path with invalid characters replaced.
fn object_store_url(&self) -> ObjectStoreUrl;

/// Deletes object by `paths`.
///
/// Paths are deleted one at a time, in slice order. A `NotFound` error for a path is
/// ignored; any other error is returned immediately, so later paths are not attempted.
async fn delete_batch(&self, paths: &[Path]) -> ObjectStoreResult<()> {
    let store = self.object_store();
    for location in paths {
        if let Err(err) = store.delete(location).await {
            // An object that is already gone counts as deleted.
            if !matches!(err, ObjectStoreError::NotFound { .. }) {
                return Err(err);
            }
        }
    }
    Ok(())
}
}

// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
Expand Down Expand Up @@ -111,6 +153,21 @@ fn to_uri(root: &Url, location: &Path) -> String {
}
}

#[cfg(feature = "datafusion")]
/// Builds the `delta-rs://…` url under which the store for `location` is registered.
///
/// The url's host part is `<scheme>-<host><path>`, where `<host>` falls back to `-` when
/// `location` has no host and every path delimiter and `:` in `<path>` is replaced by `-`.
///
/// Note: only scheme, host and path are used, and the replacement is lossy, so two different
/// locations can produce the same url (e.g. a `-` in one location where another has a `/`).
///
/// # Panics
///
/// Panics if the assembled string is rejected by `ObjectStoreUrl::parse`.
fn object_store_url(location: &Url) -> ObjectStoreUrl {
    use object_store::path::DELIMITER;

    let scheme = location.scheme();
    let host = location.host_str().unwrap_or("-");
    let flattened_path = location.path().replace(DELIMITER, "-").replace(':', "-");

    // we are certain that the URL can be parsed, since this is ensured
    // when the table uri is parsed
    ObjectStoreUrl::parse(format!("delta-rs://{scheme}-{host}{flattened_path}"))
        .expect("Invalid object store url.")
}

/// Extract version from a file name in the delta log
pub fn extract_version_from_filename(name: &str) -> Option<i64> {
DELTA_LOG_REGEX
Expand All @@ -119,8 +176,7 @@ pub fn extract_version_from_filename(name: &str) -> Option<i64> {
}

async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult<i64> {
let object_store = log_store.object_store();
let version_start = match get_last_checkpoint(&object_store).await {
let version_start = match get_last_checkpoint(log_store).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
// no checkpoint
Expand All @@ -140,6 +196,8 @@ async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> D
let mut max_version: i64 = version_start;
let prefix = Some(log_store.log_path());
let offset_path = commit_uri_from_version(max_version);
// let log_store = log_store.clone();
let object_store = log_store.object_store();
let mut files = object_store.list_with_offset(prefix, &offset_path).await?;

while let Some(obj_meta) = files.next().await {
Expand Down Expand Up @@ -189,3 +247,29 @@ async fn write_commit_entry(
})?;
Ok(())
}

#[cfg(feature = "datafusion")]
#[cfg(test)]
mod tests {
    use url::Url;

    /// Table locations that differ in scheme, host or path must be mapped to
    /// different object store urls.
    ///
    /// The function under test is synchronous, so a plain `#[test]` is used.
    #[test]
    fn test_unique_object_store_url() {
        let location_pairs = [
            // Same scheme, no host, different path
            ("file:///path/to/table_1", "file:///path/to/table_2"),
            // Different scheme/host, same path
            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
            // Same scheme, different host, same path
            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
        ];

        for (location_1, location_2) in location_pairs {
            let url_1 = Url::parse(location_1).unwrap();
            let url_2 = Url::parse(location_2).unwrap();

            assert_ne!(
                super::object_store_url(&url_1).as_str(),
                super::object_store_url(&url_2).as_str(),
                "`{location_1}` and `{location_2}` must map to distinct object store urls",
            );
        }
    }
}
3 changes: 2 additions & 1 deletion crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ impl std::future::IntoFuture for CreateBuilder {
Box::pin(async move {
let mode = this.mode.clone();
let (mut table, actions, operation) = this.into_table_and_actions()?;
let table_state = if table.object_store().is_delta_table_location().await? {
let log_store = table.log_store();
let table_state = if log_store.is_delta_table_location().await? {
match mode {
SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()),
SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()),
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl std::future::IntoFuture for DeleteBuilder {
let session = SessionContext::new();

// If a user provides their own DF state, they must register the store themselves
register_store(this.log_store.object_store().clone(), session.runtime_env());
register_store(this.log_store.clone(), session.runtime_env());

session.state()
});
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ impl std::future::IntoFuture for MergeBuilder {
let session = SessionContext::new();

// If a user provides their own DF state, they must register the store themselves
register_store(this.log_store.object_store().clone(), session.runtime_env());
register_store(this.log_store.clone(), session.runtime_env());

session.state()
});
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ impl std::future::IntoFuture for UpdateBuilder {
let session = SessionContext::new();

// If a user provides their own DF state, they must register the store themselves
register_store(this.log_store.object_store().clone(), session.runtime_env());
register_store(this.log_store.clone(), session.runtime_env());

session.state()
});
Expand Down
3 changes: 1 addition & 2 deletions crates/deltalake-core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ impl WriteBuilder {
}

async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
let object_store = self.log_store.object_store();
match object_store.is_delta_table_location().await? {
match self.log_store.is_delta_table_location().await? {
true => {
let min_writer = self.snapshot.min_writer_version();
if min_writer > MAX_SUPPORTED_WRITER_VERSION {
Expand Down
33 changes: 15 additions & 18 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::kernel::{
Action, Add as AddAction, DataType, Metadata, PrimitiveType, Protocol, StructField, StructType,
Txn,
};
use crate::storage::DeltaObjectStore;
use crate::logstore::LogStore;
use crate::table::state::DeltaTableState;
use crate::table::{CheckPoint, CheckPointBuilder};
use crate::{open_table_with_version, DeltaTable};
Expand Down Expand Up @@ -70,12 +70,7 @@ pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;

/// Creates checkpoint at current table version
pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> {
create_checkpoint_for(
table.version(),
table.get_state(),
table.object_store().as_ref(),
)
.await?;
create_checkpoint_for(table.version(), table.get_state(), table.log_store.as_ref()).await?;
Ok(())
}

Expand All @@ -86,7 +81,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result<usize, ProtocolError
Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
cleanup_expired_logs_for(
table.version(),
table.object_store().as_ref(),
table.log_store.as_ref(),
log_retention_timestamp,
)
.await
Expand All @@ -103,7 +98,7 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
let table = open_table_with_version(table_uri, version)
.await
.map_err(|err| ProtocolError::Generic(err.to_string()))?;
create_checkpoint_for(version, table.get_state(), table.object_store().as_ref()).await?;
create_checkpoint_for(version, table.get_state(), table.log_store.as_ref()).await?;

let enable_expired_log_cleanup =
cleanup.unwrap_or_else(|| table.get_state().enable_expired_log_cleanup());
Expand All @@ -120,27 +115,28 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
pub async fn create_checkpoint_for(
version: i64,
state: &DeltaTableState,
storage: &DeltaObjectStore,
log_store: &dyn LogStore,
) -> Result<(), ProtocolError> {
// TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for
// an appropriate split point yet though so only writing a single part currently.
// See https://github.com/delta-io/delta-rs/issues/288
let last_checkpoint_path = storage.log_path().child("_last_checkpoint");
let last_checkpoint_path = log_store.log_path().child("_last_checkpoint");

debug!("Writing parquet bytes to checkpoint buffer.");
let (checkpoint, parquet_bytes) = parquet_bytes_from_state(state)?;

let file_name = format!("{version:020}.checkpoint.parquet");
let checkpoint_path = storage.log_path().child(file_name);
let checkpoint_path = log_store.log_path().child(file_name);

let object_store = log_store.object_store();
debug!("Writing checkpoint to {:?}.", checkpoint_path);
storage.put(&checkpoint_path, parquet_bytes).await?;
object_store.put(&checkpoint_path, parquet_bytes).await?;

let last_checkpoint_content: Value = serde_json::to_value(checkpoint)?;
let last_checkpoint_content = bytes::Bytes::from(serde_json::to_vec(&last_checkpoint_content)?);

debug!("Writing _last_checkpoint to {:?}.", last_checkpoint_path);
storage
object_store
.put(&last_checkpoint_path, last_checkpoint_content)
.await?;

Expand All @@ -151,7 +147,7 @@ pub async fn create_checkpoint_for(
/// and less than the specified version.
pub async fn cleanup_expired_logs_for(
until_version: i64,
storage: &DeltaObjectStore,
log_store: &dyn LogStore,
cutoff_timestamp: i64,
) -> Result<usize, ProtocolError> {
lazy_static! {
Expand All @@ -162,10 +158,11 @@ pub async fn cleanup_expired_logs_for(
// Feed a stream of candidate deletion files directly into the delete_stream
// function to try to improve the speed of cleanup and reduce the need for
// intermediate memory.
let deleted = storage
let object_store = log_store.object_store();
let deleted = object_store
.delete_stream(
storage
.list(Some(storage.log_path()))
object_store
.list(Some(log_store.log_path()))
.await?
// This predicate function will filter out any locations that don't
// match the given timestamp range
Expand Down
Loading

0 comments on commit 711c031

Please sign in to comment.