Skip to content

Commit

Permalink
DeltaObjectStore fold root_uri and to_uri into LogStore
Browse files Browse the repository at this point in the history
  • Loading branch information
dispanser committed Nov 6, 2023
1 parent 37706cc commit 4493f56
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 104 deletions.
44 changes: 21 additions & 23 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use url::Url;

use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::logstore::LogStoreRef;
use crate::protocol::{self};
use crate::storage::ObjectStoreRef;
use crate::table::builder::ensure_table_uri;
Expand Down Expand Up @@ -467,7 +468,7 @@ pub struct DeltaScanConfig {
#[derive(Debug)]
pub(crate) struct DeltaScanBuilder<'a> {
snapshot: &'a DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
filter: Option<Expr>,
state: &'a SessionState,
projection: Option<&'a Vec<usize>>,
Expand All @@ -480,12 +481,12 @@ pub(crate) struct DeltaScanBuilder<'a> {
impl<'a> DeltaScanBuilder<'a> {
pub fn new(
snapshot: &'a DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
state: &'a SessionState,
) -> Self {
DeltaScanBuilder {
snapshot,
object_store,
log_store,
filter: None,
state,
files: None,
Expand Down Expand Up @@ -532,7 +533,7 @@ impl<'a> DeltaScanBuilder<'a> {
Some(schema) => schema,
None => {
self.snapshot
.physical_arrow_schema(self.object_store.clone())
.physical_arrow_schema(self.log_store.object_store())
.await?
}
};
Expand Down Expand Up @@ -632,7 +633,7 @@ impl<'a> DeltaScanBuilder<'a> {
.create_physical_plan(
self.state,
FileScanConfig {
object_store_url: self.object_store.object_store_url(),
object_store_url: self.log_store.object_store().object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: self.snapshot.datafusion_table_statistics(),
Expand All @@ -647,9 +648,7 @@ impl<'a> DeltaScanBuilder<'a> {
.await?;

Ok(DeltaScan {
table_uri: ensure_table_uri(self.object_store.root_uri())?
.as_str()
.into(),
table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
parquet_scan: scan,
config,
logical_schema,
Expand Down Expand Up @@ -689,7 +688,7 @@ impl TableProvider for DeltaTable {
register_store(self.object_store(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.state, self.object_store(), session)
let scan = DeltaScanBuilder::new(&self.state, self.log_store(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand All @@ -714,7 +713,7 @@ impl TableProvider for DeltaTable {
/// A Delta table provider that enables additonal metadata columns to be included during the scan
pub struct DeltaTableProvider {
snapshot: DeltaTableState,
store: ObjectStoreRef,
log_store: LogStoreRef,
config: DeltaScanConfig,
schema: Arc<ArrowSchema>,
}
Expand All @@ -723,13 +722,13 @@ impl DeltaTableProvider {
/// Build a DeltaTableProvider
pub fn try_new(
snapshot: DeltaTableState,
store: ObjectStoreRef,
log_store: LogStoreRef,
config: DeltaScanConfig,
) -> DeltaResult<Self> {
Ok(DeltaTableProvider {
schema: logical_schema(&snapshot, &config)?,
snapshot,
store,
log_store,
config,
})
}
Expand Down Expand Up @@ -764,10 +763,10 @@ impl TableProvider for DeltaTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
register_store(self.store.clone(), session.runtime_env().clone());
register_store(self.log_store.object_store(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.snapshot, self.store.clone(), session)
let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand Down Expand Up @@ -1462,7 +1461,7 @@ fn join_batches_with_add_actions(
/// Determine which files contain a record that statisfies the predicate
pub(crate) async fn find_files_scan<'a>(
snapshot: &DeltaTableState,
store: ObjectStoreRef,
log_store: LogStoreRef,
state: &SessionState,
expression: Expr,
) -> DeltaResult<Vec<Add>> {
Expand All @@ -1489,7 +1488,7 @@ pub(crate) async fn find_files_scan<'a>(
// Add path column
used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

let scan = DeltaScanBuilder::new(snapshot, store.clone(), state)
let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
.with_filter(Some(expression.clone()))
.with_projection(Some(&used_columns))
.with_scan_config(scan_config)
Expand Down Expand Up @@ -1580,7 +1579,7 @@ pub(crate) async fn scan_memory_table(
/// Finds files in a snapshot that match the provided predicate.
pub async fn find_files<'a>(
snapshot: &DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
state: &SessionState,
predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
Expand Down Expand Up @@ -1608,8 +1607,7 @@ pub async fn find_files<'a>(
})
} else {
let candidates =
find_files_scan(snapshot, object_store.clone(), state, predicate.to_owned())
.await?;
find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;

Ok(FindFiles {
candidates,
Expand Down Expand Up @@ -1924,8 +1922,8 @@ mod tests {
.build(&table.state)
.unwrap();

let storage = table.object_store();
let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap();
let log_store = table.log_store();
let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -1984,8 +1982,8 @@ mod tests {

let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();

let storage = table.object_store();
let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap();
let log_store = table.log_store();
let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down
20 changes: 18 additions & 2 deletions crates/deltalake-core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,26 @@ use crate::{
#[derive(Debug, Clone)]
pub struct DefaultLogStore {
pub(crate) storage: ObjectStoreRef,
location: Url,
}

impl DefaultLogStore {
/// Create a new instance of [`DefaultLogStore`]
///
/// # Arguments
///
/// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located).
/// * `location` - A url corresponding to the storage location of `storage`.
pub fn new(storage: ObjectStoreRef, location: Url) -> Self {
DefaultLogStore { storage, location }
}

/// Create log store
pub fn try_new(location: Url, options: impl Into<StorageOptions> + Clone) -> DeltaResult<Self> {
let object_store = DeltaObjectStore::try_new(location, options.clone())?;
let object_store = DeltaObjectStore::try_new(location.clone(), options.clone())?;
Ok(Self {
storage: Arc::new(object_store),
location,
})
}
}
Expand All @@ -49,10 +61,14 @@ impl LogStore for DefaultLogStore {
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
super::get_latest_version(&self.storage, current_version).await
super::get_latest_version(self, current_version).await
}

fn object_store(&self) -> ObjectStoreRef {
self.storage.clone()
}

fn to_uri(&self, location: &Path) -> String {
super::to_uri(&self.location, location)
}
}
58 changes: 53 additions & 5 deletions crates/deltalake-core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use futures::StreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use std::{cmp::max, sync::Arc};
use url::Url;

use crate::{
errors::DeltaResult,
Expand All @@ -20,6 +21,10 @@ pub mod default_logstore;
/// Sharable reference to [`LogStore`]
pub type LogStoreRef = Arc<dyn LogStore>;

lazy_static! {
static ref DELTA_LOG_PATH: Path = Path::from("_delta_log");
}

/// Trait for critical operations required to read and write commit entries in Delta logs.
///
/// The correctness is predicated on the atomicity and durability guarantees of
Expand Down Expand Up @@ -51,6 +56,19 @@ pub trait LogStore: Sync + Send {

/// Get underlying object store.
fn object_store(&self) -> ObjectStoreRef;

/// [Path] to Delta log
fn to_uri(&self, location: &Path) -> String;

/// Get fully qualified uri for table root
fn root_uri(&self) -> String {
self.to_uri(&Path::from(""))
}

/// [Path] to Delta log
fn log_path(&self) -> &Path {
&DELTA_LOG_PATH
}
}

// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
Expand All @@ -64,15 +82,45 @@ lazy_static! {
static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap();
}

fn to_uri(root: &Url, location: &Path) -> String {
match root.scheme() {
"file" => {
#[cfg(windows)]
let uri = format!(
"{}/{}",
root.as_ref().trim_end_matches('/'),
location.as_ref()
)
.replace("file:///", "");
#[cfg(unix)]
let uri = format!(
"{}/{}",
root.as_ref().trim_end_matches('/'),
location.as_ref()
)
.replace("file://", "");
uri
}
_ => {
if location.as_ref().is_empty() || location.as_ref() == "/" {
root.as_ref().to_string()
} else {
format!("{}/{}", root.as_ref(), location.as_ref())
}
}
}
}

/// Extract version from a file name in the delta log
pub fn extract_version_from_filename(name: &str) -> Option<i64> {
DELTA_LOG_REGEX
.captures(name)
.map(|captures| captures.get(1).unwrap().as_str().parse().unwrap())
}

async fn get_latest_version(storage: &ObjectStoreRef, current_version: i64) -> DeltaResult<i64> {
let version_start = match get_last_checkpoint(storage).await {
async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult<i64> {
let object_store = log_store.object_store();
let version_start = match get_last_checkpoint(&object_store).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
// no checkpoint
Expand All @@ -90,9 +138,9 @@ async fn get_latest_version(storage: &ObjectStoreRef, current_version: i64) -> D
// list files to find max version
let version = async {
let mut max_version: i64 = version_start;
let prefix = Some(storage.log_path());
let prefix = Some(log_store.log_path());
let offset_path = commit_uri_from_version(max_version);
let mut files = storage.list_with_offset(prefix, &offset_path).await?;
let mut files = object_store.list_with_offset(prefix, &offset_path).await?;

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
Expand All @@ -106,7 +154,7 @@ async fn get_latest_version(storage: &ObjectStoreRef, current_version: i64) -> D
}

if max_version < 0 {
return Err(DeltaTableError::not_a_table(storage.root_uri()));
return Err(DeltaTableError::not_a_table(log_store.root_uri()));
}

Ok::<i64, DeltaTableError>(max_version)
Expand Down
4 changes: 1 addition & 3 deletions crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,7 @@ impl CreateBuilder {

let (storage_url, table) = if let Some(log_store) = self.log_store {
(
ensure_table_uri(log_store.object_store().root_uri())?
.as_str()
.to_string(),
ensure_table_uri(log_store.root_uri())?.as_str().to_string(),
DeltaTable::new(log_store, Default::default()),
)
} else {
Expand Down
17 changes: 5 additions & 12 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use crate::kernel::{Action, Add, Remove};
use crate::operations::transaction::commit;
use crate::operations::write::write_execution_plan;
use crate::protocol::DeltaOperation;
use crate::storage::ObjectStoreRef;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

Expand Down Expand Up @@ -126,7 +125,7 @@ impl DeleteBuilder {

async fn excute_non_empty_expr(
snapshot: &DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
state: &SessionState,
expression: &Expr,
metrics: &mut DeleteMetrics,
Expand All @@ -145,7 +144,7 @@ async fn excute_non_empty_expr(
.partition_columns
.clone();

let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), state)
let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
.with_files(rewrite)
.build()
.await?;
Expand All @@ -168,7 +167,7 @@ async fn excute_non_empty_expr(
state.clone(),
filter.clone(),
table_partition_cols.clone(),
object_store.clone(),
log_store.object_store(),
Some(snapshot.table_config().target_file_size() as usize),
None,
writer_properties,
Expand Down Expand Up @@ -198,13 +197,7 @@ async fn execute(
let mut metrics = DeleteMetrics::default();

let scan_start = Instant::now();
let candidates = find_files(
snapshot,
log_store.object_store().clone(),
&state,
predicate.clone(),
)
.await?;
let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?;
metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros();

let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
Expand All @@ -215,7 +208,7 @@ async fn execute(
let write_start = Instant::now();
let add = excute_non_empty_expr(
snapshot,
log_store.object_store().clone(),
log_store.clone(),
&state,
&predicate,
&mut metrics,
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ async fn execute(
// predicates also need to be considered when pruning

let target = Arc::new(
DeltaScanBuilder::new(snapshot, log_store.object_store(), &state)
DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
.with_schema(snapshot.input_schema()?)
.build()
.await?,
Expand Down
Loading

0 comments on commit 4493f56

Please sign in to comment.