Skip to content

Commit

Permalink
chore: simplify code
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Jan 9, 2025
1 parent c602b1d commit e3327e6
Show file tree
Hide file tree
Showing 20 changed files with 88 additions and 90 deletions.
17 changes: 0 additions & 17 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,23 +203,6 @@ pub trait LogStore: Send + Sync + AsAny {
Ok(())
}

/// Build a new object store for an URL using the existing storage options
fn build_new_store(&self, url: &Url) -> DeltaResult<ObjectStoreRef> {
// turn location into scheme
let scheme = Url::parse(&format!("{}://", url.scheme()))
.map_err(|_| DeltaTableError::InvalidTableLocation(url.clone().into()))?;

if let Some(entry) = crate::storage::factories().get(&scheme) {
debug!("Creating new storage with storage provider for {scheme} ({url})");

let (store, _prefix) = entry
.value()
.parse_url_opts(url, &self.config().options.clone())?;
return Ok(store);
}
Err(DeltaTableError::InvalidTableLocation(url.to_string()))
}

/// Read data for commit entry with the given version.
async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>>;

Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/operations/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ pub struct AddColumnBuilder {
}

impl super::Operation<()> for AddColumnBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down Expand Up @@ -122,7 +122,7 @@ impl std::future::IntoFuture for AddColumnBuilder {
let commit = CommitBuilder::from(this.commit_properties.clone())
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(this.get_custom_execute_handler().cloned())
.with_post_commit_hook_handler(this.get_custom_execute_handler())
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.await?;

Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/operations/add_feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ pub struct AddTableFeatureBuilder {
}

impl super::Operation<()> for AddTableFeatureBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down Expand Up @@ -134,7 +134,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
let commit = CommitBuilder::from(this.commit_properties.clone())
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(this.get_custom_execute_handler().cloned())
.with_post_commit_hook_handler(this.get_custom_execute_handler())
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.await?;

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ pub struct ConstraintBuilder {
}

impl super::Operation<()> for ConstraintBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down
16 changes: 8 additions & 8 deletions crates/core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ impl Default for ConvertToDeltaBuilder {
}

impl super::Operation<()> for ConvertToDeltaBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
self.log_store
.as_ref()
.expect("Log store should be available at this stage.")
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down Expand Up @@ -269,16 +269,16 @@ impl ConvertToDeltaBuilder {
self.pre_execute(operation_id).await?;

// Return an error if the location is already a Delta table location
if self.get_log_store().is_delta_table_location().await? {
if self.log_store().is_delta_table_location().await? {
return Err(Error::DeltaTableAlready);
}
debug!(
"Converting Parquet table in log store location: {:?}",
self.get_log_store().root_uri()
self.log_store().root_uri()
);

// Get all the parquet files in the location
let object_store = self.get_log_store().object_store(None);
let object_store = self.log_store().object_store(None);
let mut files = Vec::new();
object_store
.list(None)
Expand Down Expand Up @@ -421,7 +421,7 @@ impl ConvertToDeltaBuilder {

// Generate CreateBuilder with corresponding add actions, schemas and operation meta
let mut builder = CreateBuilder::new()
.with_log_store(self.get_log_store().clone())
.with_log_store(self.log_store().clone())
.with_columns(schema_fields.into_iter().cloned())
.with_partition_columns(partition_columns.into_iter())
.with_actions(actions)
Expand Down Expand Up @@ -456,7 +456,7 @@ impl std::future::IntoFuture for ConvertToDeltaBuilder {

if let Some(handler) = handler {
handler
.post_execute(builder.get_log_store(), operation_id)
.post_execute(builder.log_store(), operation_id)
.await?;
}

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ pub struct CreateBuilder {
}

impl super::Operation<()> for CreateBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
self.log_store
.as_ref()
.expect("Logstore shouldn't be none at this stage.")
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ pub struct DeleteMetrics {
}

impl super::Operation<()> for DeleteBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/operations/drop_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ pub struct DropConstraintBuilder {
}

impl super::Operation<()> for DropConstraintBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down Expand Up @@ -110,7 +110,7 @@ impl std::future::IntoFuture for DropConstraintBuilder {

let commit = CommitBuilder::from(this.commit_properties.clone())
.with_operation_id(operation_id)
.with_post_commit_hook_handler(this.get_custom_execute_handler().cloned())
.with_post_commit_hook_handler(this.get_custom_execute_handler())
.with_actions(actions)
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.await?;
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ fn is_absolute_path(path: &str) -> DeltaResult<bool> {
}

impl super::Operation<()> for FileSystemCheckBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down Expand Up @@ -160,7 +160,7 @@ impl FileSystemCheckPlan {
snapshot: &DeltaTableState,
mut commit_properties: CommitProperties,
operation_id: Uuid,
handle: Option<&Arc<dyn CustomExecuteHandler>>,
handle: Option<Arc<dyn CustomExecuteHandler>>,
) -> DeltaResult<FileSystemCheckMetrics> {
let mut actions = Vec::with_capacity(self.files_to_remove.len());
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
Expand Down Expand Up @@ -197,7 +197,7 @@ impl FileSystemCheckPlan {

CommitBuilder::from(commit_properties)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(handle.cloned())
.with_post_commit_hook_handler(handle)
.with_actions(actions)
.build(
Some(snapshot),
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ pub struct LoadBuilder {
}

impl super::Operation<()> for LoadBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
unimplemented!("Not required in loadBuilder for now.")
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ pub struct MergeBuilder {
}

impl super::Operation<()> for MergeBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down
12 changes: 4 additions & 8 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,19 @@ pub trait CustomExecuteHandler: Send + Sync {
/// The [Operation] trait defines common behaviors that all operations builders
/// should have consistent
pub(crate) trait Operation<State>: std::future::IntoFuture {
fn get_log_store(&self) -> &LogStoreRef;
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>>;
fn log_store(&self) -> &LogStoreRef;
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>>;
async fn pre_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
if let Some(handler) = self.get_custom_execute_handler() {
handler
.pre_execute(self.get_log_store(), operation_id)
.await
handler.pre_execute(self.log_store(), operation_id).await
} else {
Ok(())
}
}

async fn post_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
if let Some(handler) = self.get_custom_execute_handler() {
handler
.post_execute(self.get_log_store(), operation_id)
.await
handler.post_execute(self.log_store(), operation_id).await
} else {
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ pub struct OptimizeBuilder<'a> {
}

impl super::Operation<()> for OptimizeBuilder<'_> {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ pub struct RestoreBuilder {
}

impl super::Operation<()> for RestoreBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/set_tbl_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ pub struct SetTablePropertiesBuilder {
}

impl super::Operation<()> for SetTablePropertiesBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
cleanup_expired_logs: None,
log_store: this.log_store,
table_data: this.table_data,
customer_execute_handler: this.post_commit_hook_handler,
custom_execute_handler: this.post_commit_hook_handler,
});
}

Expand Down Expand Up @@ -668,7 +668,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
.unwrap_or_default(),
log_store: this.log_store,
table_data: this.table_data,
customer_execute_handler: this.post_commit_hook_handler,
custom_execute_handler: this.post_commit_hook_handler,
});
}
Err(TransactionError::VersionAlreadyExists(version)) => {
Expand Down Expand Up @@ -701,7 +701,7 @@ pub struct PostCommit<'a> {
cleanup_expired_logs: Option<bool>,
log_store: LogStoreRef,
table_data: Option<&'a dyn TableReference>,
customer_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

impl PostCommit<'_> {
Expand Down Expand Up @@ -729,7 +729,7 @@ impl PostCommit<'_> {
};

// Run arbitrary before_post_commit_hook code
if let Some(custom_execute_handler) = &self.customer_execute_handler {
if let Some(custom_execute_handler) = &self.custom_execute_handler {
custom_execute_handler
.before_post_commit_hook(
&self.log_store,
Expand Down Expand Up @@ -762,7 +762,7 @@ impl PostCommit<'_> {
}

// Run arbitrary after_post_commit_hook code
if let Some(custom_execute_handler) = &self.customer_execute_handler {
if let Some(custom_execute_handler) = &self.custom_execute_handler {
custom_execute_handler
.after_post_commit_hook(
&self.log_store,
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ pub struct UpdateMetrics {
}

impl super::Operation<()> for UpdateBuilder {
fn get_log_store(&self) -> &LogStoreRef {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<&Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.as_ref()
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

Expand Down
Loading

0 comments on commit e3327e6

Please sign in to comment.