Skip to content

Commit

Permalink
Remove DeltaObjectStore, in favor of LogStore + ObjectStore
Browse files Browse the repository at this point in the history
  • Loading branch information
dispanser committed Nov 7, 2023
1 parent 711c031 commit 5583247
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 301 deletions.
34 changes: 21 additions & 13 deletions crates/deltalake-core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,24 @@ use std::sync::Arc;
use bytes::Bytes;
#[cfg(feature = "datafusion")]
use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::path::Path;
use object_store::{path::Path, ObjectStore};
use url::Url;

use super::LogStore;
use super::{LogStore, LogStoreConfig};
use crate::{
operations::transaction::TransactionError,
storage::{config::StorageOptions, DeltaObjectStore, ObjectStoreRef},
storage::{
config::{self, StorageOptions},
ObjectStoreRef,
},
DeltaResult,
};

/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
pub struct DefaultLogStore {
pub(crate) storage: ObjectStoreRef,
location: Url,
pub(crate) storage: Arc<dyn ObjectStore>,
config: LogStoreConfig,
}

impl DefaultLogStore {
Expand All @@ -29,16 +32,17 @@ impl DefaultLogStore {
///
/// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located).
/// * `location` - A url corresponding to the storage location of `storage`.
pub fn new(storage: ObjectStoreRef, location: Url) -> Self {
DefaultLogStore { storage, location }
pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self {
DefaultLogStore { storage, config }
}

/// Create log store
pub fn try_new(location: Url, options: impl Into<StorageOptions> + Clone) -> DeltaResult<Self> {
let object_store = DeltaObjectStore::try_new(location.clone(), options.clone())?;
let mut options = options.into();
let storage = config::configure_store(&location, &mut options)?;
Ok(Self {
storage: Arc::new(object_store),
location,
storage: Arc::new(storage),
config: LogStoreConfig { location, options },
})
}
}
Expand Down Expand Up @@ -66,16 +70,20 @@ impl LogStore for DefaultLogStore {
super::get_latest_version(self, current_version).await
}

fn object_store(&self) -> ObjectStoreRef {
fn object_store(&self) -> Arc<dyn ObjectStore> {
self.storage.clone()
}

fn to_uri(&self, location: &Path) -> String {
super::to_uri(&self.location, location)
super::to_uri(&self.config.location, location)
}

#[cfg(feature = "datafusion")]
fn object_store_url(&self) -> ObjectStoreUrl {
super::object_store_url(&self.location)
super::object_store_url(&self.config.location)
}

fn config(&self) -> &LogStoreConfig {
&self.config
}
}
75 changes: 70 additions & 5 deletions crates/deltalake-core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
use futures::StreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use std::{cmp::max, sync::Arc};
use serde::{
de::{Error, SeqAccess, Visitor},
ser::SerializeSeq,
Deserialize, Serialize,
};
use std::{cmp::max, collections::HashMap, sync::Arc};
use url::Url;

use crate::{
errors::DeltaResult,
operations::transaction::TransactionError,
protocol::{get_last_checkpoint, ProtocolError},
storage::{commit_uri_from_version, DeltaObjectStore, ObjectStoreRef},
storage::{commit_uri_from_version, config::StorageOptions},
DeltaTableError,
};
use bytes::Bytes;
Expand All @@ -30,6 +35,15 @@ lazy_static! {
static ref DELTA_LOG_PATH: Path = Path::from("_delta_log");
}

/// Configuration parameters for a log store
#[derive(Debug, Clone)]
pub struct LogStoreConfig {
/// url corresponding to the storage location.
pub location: Url,
/// Options used for configuring backend storage
pub options: StorageOptions,
}

/// Trait for critical operations required to read and write commit entries in Delta logs.
///
/// The correctness is predicated on the atomicity and durability guarantees of
Expand Down Expand Up @@ -60,7 +74,7 @@ pub trait LogStore: Sync + Send {
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get underlying object store.
fn object_store(&self) -> ObjectStoreRef;
fn object_store(&self) -> Arc<dyn ObjectStore>;

/// [Path] to Delta log
fn to_uri(&self, location: &Path) -> String;
Expand Down Expand Up @@ -95,7 +109,7 @@ pub trait LogStore: Sync + Send {
/// Generate a unique enough url to identify the store in datafusion.
/// The DF object store registry only cares about the scheme and the host of the url for
/// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique
/// host we convert the location from this `DeltaObjectStore` to a valid name, combining the
/// host we convert the location from this `LogStore` to a valid name, combining the
/// original scheme, host and path with invalid characters replaced.
fn object_store_url(&self) -> ObjectStoreUrl;

Expand All @@ -111,6 +125,9 @@ pub trait LogStore: Sync + Send {
}
Ok(())
}

/// Get configuration representing configured log store.
fn config(&self) -> &LogStoreConfig;
}

// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
Expand All @@ -120,6 +137,54 @@ impl std::fmt::Debug for dyn LogStore + '_ {
}
}

impl Serialize for LogStoreConfig {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut seq = serializer.serialize_seq(None)?;
seq.serialize_element(&self.location.to_string())?;
seq.serialize_element(&self.options.0)?;
seq.end()
}
}

impl<'de> Deserialize<'de> for LogStoreConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct LogStoreConfigVisitor {}

impl<'de> Visitor<'de> for LogStoreConfigVisitor {
type Value = LogStoreConfig;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("struct LogStoreConfig")
}

fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let location_str: String = seq
.next_element()?
.ok_or_else(|| A::Error::invalid_length(0, &self))?;
let options: HashMap<String, String> = seq
.next_element()?
.ok_or_else(|| A::Error::invalid_length(0, &self))?;
let location = Url::parse(&location_str).unwrap();
Ok(LogStoreConfig {
location,
options: options.into(),
})
}
}

deserializer.deserialize_seq(LogStoreConfigVisitor {})
}
}

lazy_static! {
static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap();
}
Expand Down Expand Up @@ -228,7 +293,7 @@ async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResu
}

async fn write_commit_entry(
storage: &DeltaObjectStore,
storage: &dyn ObjectStore,
version: i64,
tmp_commit: &Path,
) -> Result<(), TransactionError> {
Expand Down
12 changes: 8 additions & 4 deletions crates/deltalake-core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,7 @@ mod tests {
use self::test_utils::{create_remove_action, init_table_actions};
use super::*;
use crate::{
logstore::default_logstore::DefaultLogStore,
storage::{commit_uri_from_version, DeltaObjectStore},
logstore::default_logstore::DefaultLogStore, storage::commit_uri_from_version,
DeltaConfigKey,
};
use object_store::memory::InMemory;
Expand Down Expand Up @@ -294,8 +293,13 @@ mod tests {
async fn test_try_commit_transaction() {
let store = Arc::new(InMemory::new());
let url = Url::parse("mem://what/is/this").unwrap();
let delta_store = DeltaObjectStore::new(store.clone(), url.clone());
let log_store = DefaultLogStore::new(Arc::new(delta_store), url);
let log_store = DefaultLogStore::new(
store.clone(),
crate::logstore::LogStoreConfig {
location: url,
options: HashMap::new().into(),
},
);
let tmp_path = Path::from("_delta_log/tmp");
let version_path = Path::from("_delta_log/00000000000000000000.json");
store.put(&tmp_path, bytes::Bytes::new()).await.unwrap();
Expand Down
6 changes: 4 additions & 2 deletions crates/deltalake-core/src/storage/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ impl From<HashMap<String, String>> for StorageOptions {
}
}

pub(crate) fn configure_log_store(
/// Configure a [`LogStoreRef`] for the given url and configuration
pub fn configure_log_store(
url: Url,
options: impl Into<StorageOptions> + Clone,
) -> DeltaResult<LogStoreRef> {
Expand All @@ -236,7 +237,8 @@ pub(crate) fn configure_log_store(
Ok(Arc::new(DefaultLogStore::try_new(url, options)?))
}

pub(crate) fn configure_store(
/// Configure an instance of an [`ObjectStore`] for the given url and configuration
pub fn configure_store(
url: &Url,
options: &mut StorageOptions,
) -> DeltaResult<Arc<DynObjectStore>> {
Expand Down
Loading

0 comments on commit 5583247

Please sign in to comment.