From e3327e67db0ddc44d4bd62fb25366e7a4d9dfb8b Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 9 Jan 2025 16:59:56 +0100 Subject: [PATCH] chore: simplify code --- crates/core/src/logstore/mod.rs | 17 ----------------- crates/core/src/operations/add_column.rs | 8 ++++---- crates/core/src/operations/add_feature.rs | 8 ++++---- crates/core/src/operations/constraints.rs | 6 +++--- .../core/src/operations/convert_to_delta.rs | 16 ++++++++-------- crates/core/src/operations/create.rs | 6 +++--- crates/core/src/operations/delete.rs | 6 +++--- .../core/src/operations/drop_constraints.rs | 8 ++++---- .../core/src/operations/filesystem_check.rs | 10 +++++----- crates/core/src/operations/load.rs | 4 ++-- crates/core/src/operations/merge/mod.rs | 6 +++--- crates/core/src/operations/mod.rs | 12 ++++-------- crates/core/src/operations/optimize.rs | 6 +++--- crates/core/src/operations/restore.rs | 6 +++--- .../core/src/operations/set_tbl_properties.rs | 6 +++--- crates/core/src/operations/transaction/mod.rs | 10 +++++----- crates/core/src/operations/update.rs | 6 +++--- crates/core/src/operations/vacuum.rs | 12 ++++++------ crates/core/src/operations/write.rs | 6 +++--- crates/lakefs/src/logstore.rs | 19 +++++++++++++++++++ 20 files changed, 88 insertions(+), 90 deletions(-) diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 130405a835..74b9c56bc4 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -203,23 +203,6 @@ pub trait LogStore: Send + Sync + AsAny { Ok(()) } - /// Build a new object store for an URL using the existing storage options - fn build_new_store(&self, url: &Url) -> DeltaResult { - // turn location into scheme - let scheme = Url::parse(&format!("{}://", url.scheme())) - .map_err(|_| DeltaTableError::InvalidTableLocation(url.clone().into()))?; - - if let Some(entry) = crate::storage::factories().get(&scheme) { - debug!("Creating new storage with storage provider for {scheme} ({url})"); - - let (store, _prefix) = entry - .value() - .parse_url_opts(url, &self.config().options.clone())?; - return Ok(store); - } - Err(DeltaTableError::InvalidTableLocation(url.to_string())) - } - /// Read data for commit entry with the given version. async fn read_commit_entry(&self, version: i64) -> DeltaResult>; diff --git a/crates/core/src/operations/add_column.rs b/crates/core/src/operations/add_column.rs index 049c10d36b..2b6d9de7df 100644 --- a/crates/core/src/operations/add_column.rs +++ b/crates/core/src/operations/add_column.rs @@ -29,11 +29,11 @@ pub struct AddColumnBuilder { } impl super::Operation<()> for AddColumnBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } @@ -122,7 +122,7 @@ impl std::future::IntoFuture for AddColumnBuilder { let commit = CommitBuilder::from(this.commit_properties.clone()) .with_actions(actions) .with_operation_id(operation_id) - .with_post_commit_hook_handler(this.get_custom_execute_handler().cloned()) + .with_post_commit_hook_handler(this.get_custom_execute_handler()) .build(Some(&this.snapshot), this.log_store.clone(), operation) .await?; diff --git a/crates/core/src/operations/add_feature.rs b/crates/core/src/operations/add_feature.rs index e5d1cb386e..40ab7fcf42 100644 --- a/crates/core/src/operations/add_feature.rs +++ b/crates/core/src/operations/add_feature.rs @@ -30,11 +30,11 @@ pub struct AddTableFeatureBuilder { } impl super::Operation<()> for AddTableFeatureBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } @@ -134,7 +134,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder { let commit = CommitBuilder::from(this.commit_properties.clone()) .with_actions(actions) .with_operation_id(operation_id) - .with_post_commit_hook_handler(this.get_custom_execute_handler().cloned()) + .with_post_commit_hook_handler(this.get_custom_execute_handler()) .build(Some(&this.snapshot), this.log_store.clone(), operation) .await?; diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 71503f83b9..479055ab40 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -44,11 +44,11 @@ pub struct ConstraintBuilder { } impl super::Operation<()> for ConstraintBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs index 0487e7dbe6..915ced6402 100644 --- a/crates/core/src/operations/convert_to_delta.rs +++ b/crates/core/src/operations/convert_to_delta.rs @@ -120,13 +120,13 @@ impl Default for ConvertToDeltaBuilder { } impl super::Operation<()> for ConvertToDeltaBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { self.log_store .as_ref() .expect("Log store should be available at this stage.") } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } @@ -269,16 +269,16 @@ impl ConvertToDeltaBuilder { self.pre_execute(operation_id).await?; // Return an error if the location is already a Delta table location - if self.get_log_store().is_delta_table_location().await? { + if self.log_store().is_delta_table_location().await? { return Err(Error::DeltaTableAlready); } debug!( "Converting Parquet table in log store location: {:?}", - self.get_log_store().root_uri() + self.log_store().root_uri() ); // Get all the parquet files in the location - let object_store = self.get_log_store().object_store(None); + let object_store = self.log_store().object_store(None); let mut files = Vec::new(); object_store .list(None) @@ -421,7 +421,7 @@ impl ConvertToDeltaBuilder { // Generate CreateBuilder with corresponding add actions, schemas and operation meta let mut builder = CreateBuilder::new() - .with_log_store(self.get_log_store().clone()) + .with_log_store(self.log_store().clone()) .with_columns(schema_fields.into_iter().cloned()) .with_partition_columns(partition_columns.into_iter()) .with_actions(actions) @@ -456,7 +456,7 @@ impl std::future::IntoFuture for ConvertToDeltaBuilder { if let Some(handler) = handler { handler - .post_execute(builder.get_log_store(), operation_id) + .post_execute(builder.log_store(), operation_id) .await?; } diff --git a/crates/core/src/operations/create.rs b/crates/core/src/operations/create.rs index 3f93335e56..571ab7dc7c 100644 --- a/crates/core/src/operations/create.rs +++ b/crates/core/src/operations/create.rs @@ -68,13 +68,13 @@ pub struct CreateBuilder { } impl super::Operation<()> for CreateBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { self.log_store .as_ref() .expect("Logstore shouldn't be none at this stage.") } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index d7b38309c0..c00d20a064 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -100,11 +100,11 @@ pub struct DeleteMetrics { } impl super::Operation<()> for DeleteBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs index 5a2e9915d6..56792dad86 100644 --- a/crates/core/src/operations/drop_constraints.rs +++ b/crates/core/src/operations/drop_constraints.rs @@ -29,11 +29,11 @@ pub struct DropConstraintBuilder { } impl super::Operation<()> for DropConstraintBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } @@ -110,7 +110,7 @@ impl std::future::IntoFuture for DropConstraintBuilder { let commit = CommitBuilder::from(this.commit_properties.clone()) .with_operation_id(operation_id) - .with_post_commit_hook_handler(this.get_custom_execute_handler().cloned()) + .with_post_commit_hook_handler(this.get_custom_execute_handler()) .with_actions(actions) .build(Some(&this.snapshot), this.log_store.clone(), operation) .await?; diff --git a/crates/core/src/operations/filesystem_check.rs b/crates/core/src/operations/filesystem_check.rs index 91a25ff767..1e5f9d736e 100644 --- a/crates/core/src/operations/filesystem_check.rs +++ b/crates/core/src/operations/filesystem_check.rs @@ -78,11 +78,11 @@ fn is_absolute_path(path: &str) -> DeltaResult { } impl super::Operation<()> for FileSystemCheckBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } @@ -160,7 +160,7 @@ impl FileSystemCheckPlan { snapshot: &DeltaTableState, mut commit_properties: CommitProperties, operation_id: Uuid, - handle: Option<&Arc>, + handle: Option>, ) -> DeltaResult { let mut actions = Vec::with_capacity(self.files_to_remove.len()); let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len()); @@ -197,7 +197,7 @@ impl FileSystemCheckPlan { CommitBuilder::from(commit_properties) .with_operation_id(operation_id) - .with_post_commit_hook_handler(handle.cloned()) + .with_post_commit_hook_handler(handle) .with_actions(actions) .build( Some(snapshot), diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs index 30f60a9b28..cf7f0c93ca 100644 --- a/crates/core/src/operations/load.rs +++ b/crates/core/src/operations/load.rs @@ -25,10 +25,10 @@ pub struct LoadBuilder { } impl super::Operation<()> for LoadBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { + fn get_custom_execute_handler(&self) -> Option> { unimplemented!("Not required in loadBuilder for now.") } } diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index cc104cd8ff..8a4640b9a3 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -141,11 +141,11 @@ pub struct MergeBuilder { } impl super::Operation<()> for MergeBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 34f7536106..948bb9447d 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -95,13 +95,11 @@ pub trait CustomExecuteHandler: Send + Sync { /// The [Operation] trait defines common behaviors that all operations builders /// should have consistent pub(crate) trait Operation: std::future::IntoFuture { - fn get_log_store(&self) -> &LogStoreRef; - fn get_custom_execute_handler(&self) -> Option<&Arc>; + fn log_store(&self) -> &LogStoreRef; + fn get_custom_execute_handler(&self) -> Option>; async fn pre_execute(&self, operation_id: Uuid) -> DeltaResult<()> { if let Some(handler) = self.get_custom_execute_handler() { - handler - .pre_execute(self.get_log_store(), operation_id) - .await + handler.pre_execute(self.log_store(), operation_id).await } else { Ok(()) } @@ -109,9 +107,7 @@ pub(crate) trait Operation: std::future::IntoFuture { async fn post_execute(&self, operation_id: Uuid) -> DeltaResult<()> { if let Some(handler) = self.get_custom_execute_handler() { - handler - .post_execute(self.get_log_store(), operation_id) - .await + handler.post_execute(self.log_store(), operation_id).await } else { Ok(()) } diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index bab442f894..5947aea906 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -216,11 +216,11 @@ pub struct OptimizeBuilder<'a> { } impl super::Operation<()> for OptimizeBuilder<'_> { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } diff --git a/crates/core/src/operations/restore.rs b/crates/core/src/operations/restore.rs index 734209baf8..af90851985 100644 --- a/crates/core/src/operations/restore.rs +++ b/crates/core/src/operations/restore.rs @@ -94,11 +94,11 @@ pub struct RestoreBuilder { } impl super::Operation<()> for RestoreBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } diff --git a/crates/core/src/operations/set_tbl_properties.rs b/crates/core/src/operations/set_tbl_properties.rs index 4ff01e371f..3e17f8c07c 100644 --- a/crates/core/src/operations/set_tbl_properties.rs +++ b/crates/core/src/operations/set_tbl_properties.rs @@ -30,11 +30,11 @@ pub struct SetTablePropertiesBuilder { } impl super::Operation<()> for SetTablePropertiesBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 04e0357959..d98b707e43 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -599,7 +599,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { cleanup_expired_logs: None, log_store: this.log_store, table_data: this.table_data, - customer_execute_handler: this.post_commit_hook_handler, + custom_execute_handler: this.post_commit_hook_handler, }); } @@ -668,7 +668,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { .unwrap_or_default(), log_store: this.log_store, table_data: this.table_data, - customer_execute_handler: this.post_commit_hook_handler, + custom_execute_handler: this.post_commit_hook_handler, }); } Err(TransactionError::VersionAlreadyExists(version)) => { @@ -701,7 +701,7 @@ pub struct PostCommit<'a> { cleanup_expired_logs: Option, log_store: LogStoreRef, table_data: Option<&'a dyn TableReference>, - customer_execute_handler: Option>, + custom_execute_handler: Option>, } impl PostCommit<'_> { @@ -729,7 +729,7 @@ impl PostCommit<'_> { }; // Run arbitrary before_post_commit_hook code - if let Some(custom_execute_handler) = &self.customer_execute_handler { + if let Some(custom_execute_handler) = &self.custom_execute_handler { custom_execute_handler .before_post_commit_hook( &self.log_store, @@ -762,7 +762,7 @@ impl PostCommit<'_> { } // Run arbitrary after_post_commit_hook code - if let Some(custom_execute_handler) = &self.customer_execute_handler { + if let Some(custom_execute_handler) = &self.custom_execute_handler { custom_execute_handler .after_post_commit_hook( &self.log_store, diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index ba4dbdefae..61623c8215 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -120,11 +120,11 @@ pub struct UpdateMetrics { } impl super::Operation<()> for UpdateBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs index e5471b6512..08caf10873 100644 --- a/crates/core/src/operations/vacuum.rs +++ b/crates/core/src/operations/vacuum.rs @@ -97,11 +97,11 @@ pub struct VacuumBuilder { } impl super::Operation<()> for VacuumBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } @@ -314,7 +314,7 @@ impl VacuumPlan { snapshot: &DeltaTableState, mut commit_properties: CommitProperties, operation_id: uuid::Uuid, - handle: Option<&Arc>, + handle: Option>, ) -> Result { if self.files_to_delete.is_empty() { return Ok(VacuumMetrics { @@ -348,7 +348,7 @@ impl VacuumPlan { CommitBuilder::from(start_props) .with_operation_id(operation_id) - .with_post_commit_hook_handler(handle.cloned()) + .with_post_commit_hook_handler(handle.clone()) .build(Some(snapshot), store.clone(), start_operation) .await?; // Finish VACUUM START COMMIT @@ -381,7 +381,7 @@ impl VacuumPlan { ); CommitBuilder::from(commit_properties) .with_operation_id(operation_id) - .with_post_commit_hook_handler(handle.cloned()) + .with_post_commit_hook_handler(handle) .build(Some(snapshot), store.clone(), end_operation) .await?; // Finish VACUUM END COMMIT diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 9bcc976cfb..4d73b0d6e5 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -183,11 +183,11 @@ pub struct WriteMetrics { } impl super::Operation<()> for WriteBuilder { - fn get_log_store(&self) -> &LogStoreRef { + fn log_store(&self) -> &LogStoreRef { &self.log_store } - fn get_custom_execute_handler(&self) -> Option<&Arc> { - self.custom_execute_handler.as_ref() + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() } } diff --git a/crates/lakefs/src/logstore.rs b/crates/lakefs/src/logstore.rs index 313c69a450..5de17441bf 100644 --- a/crates/lakefs/src/logstore.rs +++ b/crates/lakefs/src/logstore.rs @@ -18,6 +18,7 @@ use deltalake_core::{ DeltaResult, }; use object_store::{Attributes, Error as ObjectStoreError, ObjectStore, PutOptions, TagSet}; +use tracing::debug; use url::Url; use uuid::Uuid; @@ -79,6 +80,24 @@ impl LakeFSLogStore { } } + /// Build a new object store for an URL using the existing storage options. After + /// branch creation a new object store needs to be created for the branch uri + fn build_new_store(&self, url: &Url) -> DeltaResult { + // turn location into scheme + let scheme = Url::parse(&format!("{}://", url.scheme())) + .map_err(|_| DeltaTableError::InvalidTableLocation(url.clone().into()))?; + + if let Some(entry) = deltalake_core::storage::factories().get(&scheme) { + debug!("Creating new storage with storage provider for {scheme} ({url})"); + + let (store, _prefix) = entry + .value() + .parse_url_opts(url, &self.config().options.clone())?; + return Ok(store); + } + Err(DeltaTableError::InvalidTableLocation(url.to_string())) + } + fn register_object_store(&self, url: &Url, store: ObjectStoreRef) { self.storage.register_store(url, store); }