Skip to content

Commit

Permalink
refactor!: Implement default LogStore trait
Browse files Browse the repository at this point in the history
Introduce a new trait, `LogStore` and a single implementation that keeps
the functionality unchanged.

`LogStore` serves as an entry point for interacting with the delta
commit log in a centralized way, allowing to read commits and
atomically create new commits.

This is in preparation for enabling S3 DynamoDb multi-cluster writes in
compatibility with the reference JVM implementation.
  • Loading branch information
dispanser committed Oct 19, 2023
1 parent 1d4d5b7 commit db65ba9
Show file tree
Hide file tree
Showing 31 changed files with 443 additions and 302 deletions.
16 changes: 8 additions & 8 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl RawDeltaTable {
retention_hours: Option<u64>,
enforce_retention_duration: bool,
) -> PyResult<Vec<String>> {
let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone())
let mut cmd = VacuumBuilder::new(self._table.log_store(), self._table.state.clone())
.with_enforce_retention_duration(enforce_retention_duration)
.with_dry_run(dry_run);
if let Some(retention_period) = retention_hours {
Expand All @@ -295,7 +295,7 @@ impl RawDeltaTable {
writer_properties: Option<HashMap<String, usize>>,
safe_cast: bool,
) -> PyResult<String> {
let mut cmd = UpdateBuilder::new(self._table.object_store(), self._table.state.clone())
let mut cmd = UpdateBuilder::new(self._table.log_store(), self._table.state.clone())
.with_safe_cast(safe_cast);

if let Some(writer_props) = writer_properties {
Expand Down Expand Up @@ -348,7 +348,7 @@ impl RawDeltaTable {
max_concurrent_tasks: Option<usize>,
min_commit_interval: Option<u64>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
Expand Down Expand Up @@ -378,7 +378,7 @@ impl RawDeltaTable {
max_spill_size: usize,
min_commit_interval: Option<u64>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
.with_max_spill_size(max_spill_size)
.with_type(OptimizeType::ZOrder(z_order_columns));
Expand Down Expand Up @@ -445,7 +445,7 @@ impl RawDeltaTable {
let source_df = ctx.read_table(table_provider).unwrap();

let mut cmd = MergeBuilder::new(
self._table.object_store(),
self._table.log_store(),
self._table.state.clone(),
predicate,
source_df,
Expand Down Expand Up @@ -587,7 +587,7 @@ impl RawDeltaTable {
ignore_missing_files: bool,
protocol_downgrade_allowed: bool,
) -> PyResult<String> {
let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone());
let mut cmd = RestoreBuilder::new(self._table.log_store(), self._table.state.clone());
if let Some(val) = target {
if let Ok(version) = val.extract::<i64>() {
cmd = cmd.with_version_to_restore(version)
Expand Down Expand Up @@ -799,7 +799,7 @@ impl RawDeltaTable {
partition_by: Some(partition_by),
predicate: None,
};
let store = self._table.object_store();
let store = self._table.log_store();

rt()?
.block_on(commit(
Expand Down Expand Up @@ -843,7 +843,7 @@ impl RawDeltaTable {
/// Run the delete command on the delta table: delete records following a predicate and return the delete metrics.
#[pyo3(signature = (predicate = None))]
pub fn delete(&mut self, predicate: Option<String>) -> PyResult<String> {
let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone());
let mut cmd = DeleteBuilder::new(self._table.log_store(), self._table.state.clone());
if let Some(predicate) = predicate {
cmd = cmd.with_predicate(predicate);
}
Expand Down
115 changes: 115 additions & 0 deletions rust/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation
use std::cmp::max;

use bytes::Bytes;
use futures::StreamExt;
use lazy_static::lazy_static;
use log::debug;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use regex::Regex;

use super::LogStore;
use crate::{
operations::transaction::TransactionError,
protocol::{get_last_checkpoint, ProtocolError},
storage::{commit_uri_from_version, ObjectStoreRef},
DeltaResult, DeltaTableError,
};

/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
pub struct DefaultLogStore {
pub(crate) storage: ObjectStoreRef,
}

#[async_trait::async_trait]
impl LogStore for DefaultLogStore {
async fn read_commit_entry(&self, version: i64) -> DeltaResult<Bytes> {
let commit_uri = commit_uri_from_version(version);
let data = self.storage.get(&commit_uri).await?.bytes().await?;
Ok(data)

// TODO: return actual actions instead
// let actions = Self::get_actions(next_version, commit_log_bytes).await;
}

/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
/// if the given `version` already exists. The caller should handle the retry logic itself.
/// This is low-level transaction API. If user does not want to maintain the commit loop then
/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction`
/// with retry logic.
async fn write_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
) -> Result<(), TransactionError> {
// move temporary commit file to delta log directory
// rely on storage to fail if the file already exists -
self.storage
.rename_if_not_exists(tmp_commit, &commit_uri_from_version(version))
.await
.map_err(|err| match err {
ObjectStoreError::AlreadyExists { .. } => {
TransactionError::VersionAlreadyExists(version)
}
_ => TransactionError::from(err),
})?;
Ok(())
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
let version_start = match get_last_checkpoint(&self.storage).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
// no checkpoint
-1
}
Err(e) => {
return Err(DeltaTableError::from(e));
}
};

debug!("latest checkpoint version: {version_start}");

let version_start = max(current_version, version_start);

lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap();
}

// list files to find max version
let version = async {
let mut max_version: i64 = version_start;
let prefix = Some(self.storage.log_path());
let offset_path = commit_uri_from_version(max_version);
let mut files = self.storage.list_with_offset(prefix, &offset_path).await?;
// let mut files = self.storage.list_with_offset(prefix, &offset_path).await?;

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) {
let log_version = captures.get(1).unwrap().as_str().parse().unwrap();
// listing may not be ordered
max_version = max(max_version, log_version);
// also cache timestamp for version, for faster time-travel
// TODO: temporarily disabled because `version_timestamp` is not available in the [`LogStore`]
// self.version_timestamp
// .insert(log_version, obj_meta.last_modified.timestamp());
}
}

if max_version < 0 {
return Err(DeltaTableError::not_a_table(self.storage.root_uri()));
}

Ok::<i64, DeltaTableError>(max_version)
}
.await?;
Ok(version)
}

fn object_store(&self) -> ObjectStoreRef {
self.storage.clone()
}
}
27 changes: 23 additions & 4 deletions rust/src/logstore/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
//! Delta log store.
// use std::io::{Cursor, BufReader, BufRead};
use std::sync::Arc;

use crate::errors::DeltaResult;
use crate::{
errors::DeltaResult, operations::transaction::TransactionError, storage::ObjectStoreRef,
};
use bytes::Bytes;
// use log::debug;
use object_store::path::Path;

pub mod default_logstore;

/// Sharable reference to [`LogStore`]
pub type LogStoreRef = Arc<dyn LogStore>;

/// Trait for critical operations required to read and write commit entries in Delta logs.
///
/// The correctness is predicated on the atomicity and durability guarantees of
Expand All @@ -27,8 +32,22 @@ pub trait LogStore: Sync + Send {
///
/// This operation can be retried with a higher version in case the write
/// fails with `TransactionError::VersionAlreadyExists`.
async fn write_commit_entry(&self, version: i64, actions: Bytes) -> DeltaResult<()>;
async fn write_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
) -> Result<(), TransactionError>;

/// Find latest version currently stored in the delta log.
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get underlying object store.
fn object_store(&self) -> ObjectStoreRef;
}

// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
impl std::fmt::Debug for dyn LogStore + '_ {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.object_store().fmt(f)
}
}
27 changes: 13 additions & 14 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala

use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use serde_json::{Map, Value};

use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::logstore::LogStoreRef;
use crate::protocol::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct};
use crate::storage::DeltaObjectStore;
use crate::table::builder::ensure_table_uri;
use crate::table::config::DeltaConfigKey;
use crate::table::DeltaTableMetaData;
Expand Down Expand Up @@ -55,7 +54,7 @@ pub struct CreateBuilder {
partition_columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
actions: Vec<Action>,
object_store: Option<Arc<DeltaObjectStore>>,
log_store: Option<LogStoreRef>,
configuration: HashMap<String, Option<String>>,
metadata: Option<Map<String, Value>>,
}
Expand All @@ -78,7 +77,7 @@ impl CreateBuilder {
partition_columns: None,
storage_options: None,
actions: Default::default(),
object_store: None,
log_store: None,
configuration: Default::default(),
metadata: Default::default(),
}
Expand Down Expand Up @@ -200,8 +199,8 @@ impl CreateBuilder {
}

/// Provide a [`DeltaObjectStore`] instance, that points at table location
pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
self.object_store = Some(object_store);
pub fn with_log_store(mut self, log_store: LogStoreRef) -> Self {
self.log_store = Some(log_store);
self
}

Expand All @@ -220,12 +219,12 @@ impl CreateBuilder {
return Err(CreateError::MissingSchema.into());
}

let (storage_url, table) = if let Some(object_store) = self.object_store {
let (storage_url, table) = if let Some(log_store) = self.log_store {
(
ensure_table_uri(object_store.root_uri())?
ensure_table_uri(log_store.object_store().root_uri())?
.as_str()
.to_string(),
DeltaTable::new(object_store, Default::default()),
DeltaTable::new(log_store, Default::default()),
)
} else {
let storage_url = ensure_table_uri(self.location.ok_or(CreateError::MissingLocation)?)?;
Expand Down Expand Up @@ -310,7 +309,7 @@ impl std::future::IntoFuture for CreateBuilder {
};

let version = commit(
table.object_store().as_ref(),
table.log_store.as_ref(),
&actions,
operation,
table_state,
Expand Down Expand Up @@ -440,19 +439,19 @@ mod tests {
assert_eq!(table.version(), 0);
let first_id = table.get_metadata().unwrap().id.clone();

let object_store = table.object_store();
let log_store = table.log_store;

// Check an error is raised when a table exists at location
let table = CreateBuilder::new()
.with_object_store(object_store.clone())
.with_log_store(log_store.clone())
.with_columns(schema.get_fields().clone())
.with_save_mode(SaveMode::ErrorIfExists)
.await;
assert!(table.is_err());

// Check current table is returned when ignore option is chosen.
let table = CreateBuilder::new()
.with_object_store(object_store.clone())
.with_log_store(log_store.clone())
.with_columns(schema.get_fields().clone())
.with_save_mode(SaveMode::Ignore)
.await
Expand All @@ -461,7 +460,7 @@ mod tests {

// Check table is overwritten
let table = CreateBuilder::new()
.with_object_store(object_store.clone())
.with_log_store(log_store.clone())
.with_columns(schema.get_fields().clone())
.with_save_mode(SaveMode::Overwrite)
.await
Expand Down
Loading

0 comments on commit db65ba9

Please sign in to comment.