
chore: cargo fmt + clippy
ion-elgreco committed Jan 8, 2025
1 parent d5458fe commit 29ba0a0
Showing 12 changed files with 30 additions and 49 deletions.
17 changes: 4 additions & 13 deletions crates/core/src/logstore/default_logstore.rs
@@ -4,15 +4,12 @@ use std::sync::{Arc, OnceLock};

use bytes::Bytes;
use object_store::{Attributes, Error as ObjectStoreError, ObjectStore, PutOptions, TagSet};
use url::Url;
use uuid::Uuid;

use super::{CommitOrBytes, LogStore, LogStoreConfig};
use crate::{
operations::transaction::TransactionError,
storage::{
commit_uri_from_version, DefaultObjectStoreRegistry, ObjectStoreRef, ObjectStoreRegistry,
},
storage::{commit_uri_from_version, ObjectStoreRef},
DeltaResult,
};

@@ -28,7 +25,7 @@ fn put_options() -> &'static PutOptions {
/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
pub struct DefaultLogStore {
pub(crate) storage: DefaultObjectStoreRegistry,
pub(crate) storage: ObjectStoreRef,
config: LogStoreConfig,
}

@@ -40,12 +37,7 @@ impl DefaultLogStore {
/// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located).
/// * `location` - A url corresponding to the storage location of `storage`.
pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self {
let registry = DefaultObjectStoreRegistry::new();
registry.register_store(&config.location, storage);
Self {
storage: registry,
config,
}
Self { storage, config }
}
}

@@ -70,7 +62,6 @@ impl LogStore for DefaultLogStore {
commit_or_bytes: CommitOrBytes,
_: Uuid,
) -> Result<(), TransactionError> {
// ADD LAKEFS COMMIT + MERGE HERE, should only
match commit_or_bytes {
CommitOrBytes::LogBytes(log_bytes) => self
.object_store(None)
@@ -114,7 +105,7 @@ impl LogStore for DefaultLogStore {
}

fn object_store(&self, _: Option<Uuid>) -> Arc<dyn ObjectStore> {
self.storage.get_store(&self.config.location).unwrap()
self.storage.clone()
}

fn reading_object_store(&self) -> Arc<dyn ObjectStore> {
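For context: the ObjectStoreRef field is a shared, reference-counted handle (object_store returns an Arc<dyn ObjectStore> from it via clone()), so the simplified self.storage.clone() is a pointer bump rather than a copy of the store. A minimal standalone sketch of that behaviour, using Arc<Vec<u8>> as a stand-in for the store type:

use std::sync::Arc;

fn main() {
    // Cloning an Arc only bumps the reference count; the underlying value is shared.
    let store: Arc<Vec<u8>> = Arc::new(vec![1, 2, 3]);
    let handle = store.clone();
    assert_eq!(Arc::strong_count(&store), 2);
    assert_eq!(handle.as_slice(), &[1, 2, 3]);
}
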
2 changes: 1 addition & 1 deletion crates/core/src/logstore/mod.rs
@@ -214,7 +214,7 @@ pub trait LogStore: Send + Sync + AsAny {

let (store, _prefix) = entry
.value()
.parse_url_opts(&url, &self.config().options.clone().into())?;
.parse_url_opts(url, &self.config().options.clone())?;
return Ok(store);
}
Err(DeltaTableError::InvalidTableLocation(url.to_string()))
3 changes: 1 addition & 2 deletions crates/core/src/operations/convert_to_delta.rs
@@ -121,8 +121,7 @@ impl Default for ConvertToDeltaBuilder {

impl super::Operation<()> for ConvertToDeltaBuilder {
fn get_log_store(&self) -> &LogStoreRef {
&self
.log_store
self.log_store
.as_ref()
.expect("Log store should be available at this stage.")
}
3 changes: 1 addition & 2 deletions crates/core/src/operations/create.rs
@@ -69,8 +69,7 @@ pub struct CreateBuilder {

impl super::Operation<()> for CreateBuilder {
fn get_log_store(&self) -> &LogStoreRef {
&self
.log_store
self.log_store
.as_ref()
.expect("Logstore shouldn't be none at this stage.")
}
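Both builders above get what looks like the clippy::needless_borrow fix: Option::as_ref() followed by expect() already yields a reference, so prefixing the whole expression with & is redundant. An illustrative sketch with a stand-in type (Builder and its String field are hypothetical, not the delta-rs types):

struct Builder {
    log_store: Option<String>, // stand-in for Option<LogStoreRef>
}

impl Builder {
    fn get_log_store(&self) -> &String {
        // as_ref(): Option<String> -> Option<&String>; expect() unwraps to &String.
        // Wrapping this expression in another & (as the old code did) trips
        // clippy::needless_borrow.
        self.log_store
            .as_ref()
            .expect("Log store should be available at this stage.")
    }
}

fn main() {
    let b = Builder { log_store: Some("memory://".to_string()) };
    assert_eq!(b.get_log_store(), "memory://");
}
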
1 change: 1 addition & 0 deletions crates/core/src/operations/delete.rs
@@ -304,6 +304,7 @@ async fn excute_non_empty_expr(
Ok(actions)
}

#[allow(clippy::too_many_arguments)]
async fn execute(
predicate: Option<Expr>,
log_store: LogStoreRef,
1 change: 1 addition & 0 deletions crates/core/src/operations/optimize.rs
@@ -690,6 +690,7 @@ impl MergePlan {
}

/// Perform the operations outlined in the plan.
#[allow(clippy::too_many_arguments)]
pub async fn execute(
mut self,
log_store: LogStoreRef,
1 change: 1 addition & 0 deletions crates/core/src/operations/restore.rs
@@ -155,6 +155,7 @@ impl RestoreBuilder {
}
}

#[allow(clippy::too_many_arguments)]
async fn execute(
log_store: LogStoreRef,
snapshot: DeltaTableState,
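delete.rs, optimize.rs, and restore.rs each gain the same attribute above. Clippy's too_many_arguments lint fires once a function signature exceeds the threshold (seven parameters by default), and the commit opts out per-function rather than restructuring the signatures. An illustrative sketch (the function and parameters are made up):

#[allow(clippy::too_many_arguments)]
fn sum_all(a: u32, b: u32, c: u32, d: u32, e: u32, f: u32, g: u32, h: u32) -> u32 {
    // Eight parameters would normally trigger clippy::too_many_arguments;
    // the allow attribute silences the lint for this function only.
    a + b + c + d + e + f + g + h
}

fn main() {
    assert_eq!(sum_all(1, 2, 3, 4, 5, 6, 7, 8), 36);
}
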
4 changes: 2 additions & 2 deletions crates/core/src/operations/transaction/mod.rs
@@ -531,9 +531,9 @@ impl<'a> PreCommit<'a> {
}
let log_entry = this.data.get_bytes()?;

// With the DefaultLogStore, we just pass the bytes around, since we use conditionalPuts
// With the DefaultLogStore & LakeFSLogstore, we just pass the bytes around, since we use conditionalPuts
// Other stores will use tmp_commits
let commit_or_bytes = if vec!["LakeFSLogStore", "DefaultLogStore"]
let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
.contains(&this.log_store.name().as_str())
{
CommitOrBytes::LogBytes(log_entry)
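The change above is the clippy::useless_vec fix: a fixed set of string literals does not need a heap-allocated Vec just to call contains(). A minimal sketch of the before/after shape:

fn main() {
    let store_name = "DefaultLogStore";

    // Old shape (allocates a Vec on every call):
    // let uses_log_bytes = vec!["LakeFSLogStore", "DefaultLogStore"].contains(&store_name);

    // New shape: a plain array supports contains() with no allocation.
    let uses_log_bytes = ["LakeFSLogStore", "DefaultLogStore"].contains(&store_name);
    assert!(uses_log_bytes);
}
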
4 changes: 2 additions & 2 deletions crates/lakefs/src/client.rs
@@ -1,6 +1,6 @@
use dashmap::DashMap;
use deltalake_core::operations::transaction::TransactionError;
use deltalake_core::{DeltaResult, DeltaTableError};
use deltalake_core::DeltaResult;
use reqwest::Client;
use reqwest::StatusCode;
use serde::Deserialize;
@@ -119,7 +119,7 @@ impl LakeFSClient {
debug!("Deleting LakeFS Branch.");
// Handle the response
match response.status() {
StatusCode::NO_CONTENT => return Ok(()),
StatusCode::NO_CONTENT => Ok(()),
StatusCode::UNAUTHORIZED => Err(LakeFSOperationError::UnauthorizedAction.into()),
_ => {
let error: LakeFSErrorResponse =
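The client.rs change above drops an explicit return inside a match arm (clippy::needless_return): when the match is the last expression in the function, every arm's value is already the return value. A small sketch with made-up status handling:

fn status_to_result(status: u16) -> Result<(), String> {
    match status {
        204 => Ok(()), // was the shape `return Ok(())` in the old code
        401 => Err("unauthorized".to_string()),
        other => Err(format!("unexpected status: {other}")),
    }
}

fn main() {
    assert!(status_to_result(204).is_ok());
    assert!(status_to_result(401).is_err());
}
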
9 changes: 4 additions & 5 deletions crates/lakefs/src/lib.rs
@@ -42,9 +42,8 @@ impl LogStoreFactory for LakeFSLogStoreFactory {
pub fn register_handlers(_additional_prefixes: Option<Url>) {
let object_stores = Arc::new(LakeFSObjectStoreFactory::default());
let log_stores = Arc::new(LakeFSLogStoreFactory::default());
for scheme in ["lakefs"].iter() {
let url = Url::parse(&format!("{}://", scheme)).unwrap();
factories().insert(url.clone(), object_stores.clone());
logstores().insert(url.clone(), log_stores.clone());
}
let scheme = "lakefs";
let url = Url::parse(&format!("{}://", scheme)).unwrap();
factories().insert(url.clone(), object_stores.clone());
logstores().insert(url.clone(), log_stores.clone());
}
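register_handlers previously looped over a one-element array; clippy::single_element_loop suggests inlining the single value, which is what the rewrite above does. A trimmed-down sketch of the shape (println! stands in for the registry calls):

fn main() {
    // Old shape:
    // for scheme in ["lakefs"].iter() {
    //     println!("registering {}://", scheme);
    // }

    // New shape: no loop needed for a single element.
    let scheme = "lakefs";
    println!("registering {}://", scheme);
}
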
6 changes: 2 additions & 4 deletions crates/lakefs/src/logstore.rs
@@ -258,7 +258,7 @@ impl LogStore for LakeFSLogStore {
store
.delete(&commit_uri_from_version(version))
.await
.map_err(|err| TransactionError::from(err))?;
.map_err(TransactionError::from)?;
return Err(TransactionError::VersionAlreadyExists(version));
}
Err(err) => Err(err),
@@ -303,9 +303,7 @@ impl LogStore for LakeFSLogStore {
fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
match operation_id {
Some(id) => {
let (_, store) = self.get_transaction_objectstore(id).expect(
&format!("The object_store registry inside LakeFSLogstore didn't have a store for operation_id {} Something went wrong.", id)
);
let (_, store) = self.get_transaction_objectstore(id).unwrap_or_else(|_| panic!("The object_store registry inside LakeFSLogstore didn't have a store for operation_id {} Something went wrong.", id));
store
}
_ => self.reading_object_store(),
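Two lints are addressed in logstore.rs above: clippy::redundant_closure (a closure that only forwards its argument to TransactionError::from can be replaced by the function itself) and clippy::expect_fun_call (expect(&format!(...)) builds the message string even on the success path, while unwrap_or_else with panic! only formats it when the failure actually happens). A self-contained sketch with stand-in types:

#[derive(Debug)]
struct MyError(String);

impl From<std::num::ParseIntError> for MyError {
    fn from(e: std::num::ParseIntError) -> Self {
        MyError(e.to_string())
    }
}

fn parse_port(raw: &str) -> Result<u16, MyError> {
    // was the shape: .map_err(|err| MyError::from(err))  -- clippy::redundant_closure
    raw.parse::<u16>().map_err(MyError::from)
}

fn lookup(id: u32) -> u16 {
    let table = [(1u32, 8080u16)];
    // was the shape: .expect(&format!("no entry for operation id {}", id))
    table
        .iter()
        .find(|(key, _)| *key == id)
        .map(|(_, port)| *port)
        .unwrap_or_else(|| panic!("no entry for operation id {}", id))
}

fn main() {
    assert_eq!(parse_port("8080").unwrap(), 8080);
    assert_eq!(lookup(1), 8080);
}
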
28 changes: 10 additions & 18 deletions python/src/lib.rs
@@ -310,7 +310,7 @@ impl RawDeltaTable {
/// This will acquire the internal lock since it is a mutating operation!
pub fn load_version(&self, py: Python, version: i64) -> PyResult<()> {
py.allow_threads(|| {
Ok(rt().block_on(async {
rt().block_on(async {
let mut table = self
._table
.lock()
@@ -320,14 +320,14 @@
.await
.map_err(PythonError::from)
.map_err(PyErr::from)
})?)
})
})
}

/// Retrieve the latest version from the internally loaded table state
pub fn get_latest_version(&self, py: Python) -> PyResult<i64> {
py.allow_threads(|| {
Ok(rt().block_on(async {
rt().block_on(async {
match self._table.lock() {
Ok(table) => table
.get_latest_version()
@@ -336,13 +336,13 @@
.map_err(PyErr::from),
Err(e) => Err(PyRuntimeError::new_err(e.to_string())),
}
})?)
})
})
}

pub fn get_earliest_version(&self, py: Python) -> PyResult<i64> {
py.allow_threads(|| {
Ok(rt().block_on(async {
rt().block_on(async {
match self._table.lock() {
Ok(table) => table
.get_earliest_version()
@@ -351,7 +351,7 @@
.map_err(PyErr::from),
Err(e) => Err(PyRuntimeError::new_err(e.to_string())),
}
})?)
})
})
}

@@ -380,7 +380,7 @@ impl RawDeltaTable {
DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
|err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")),
)?);
Ok(rt().block_on(async {
rt().block_on(async {
let mut table = self
._table
.lock()
Expand All @@ -390,7 +390,7 @@ impl RawDeltaTable {
.await
.map_err(PythonError::from)
.map_err(PyErr::from)
})?)
})
})
}

@@ -2116,11 +2116,7 @@ fn create_deltalake(
let mode = mode.parse().map_err(PythonError::from)?;
let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;

let use_lakefs_handler = if table.log_store().name() == "LakeFSLogStore" {
true
} else {
false
};
let use_lakefs_handler = table.log_store().name() == "LakeFSLogStore";

let mut builder = DeltaOps(table)
.create()
@@ -2182,11 +2178,7 @@ fn write_new_deltalake(

let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;

let use_lakefs_handler = if table.log_store().name() == "LakeFSLogStore" {
true
} else {
false
};
let use_lakefs_handler = table.log_store().name() == "LakeFSLogStore";

let mut builder = DeltaOps(table)
.create()
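Two more clippy fixes recur through the Python binding code above: clippy::needless_question_mark (Ok(expr?) unwraps a Result only to rewrap it, so the inner expression can be returned directly) and clippy::needless_bool (if cond { true } else { false } is just cond). A compact sketch with stand-in functions:

fn read_value(src: Option<i64>) -> Result<i64, String> {
    // was the shape: Ok(src.ok_or_else(|| "missing".to_string())?)
    src.ok_or_else(|| "missing".to_string())
}

fn is_lakefs(store_name: &str) -> bool {
    // was the shape: if store_name == "LakeFSLogStore" { true } else { false }
    store_name == "LakeFSLogStore"
}

fn main() {
    assert_eq!(read_value(Some(7)), Ok(7));
    assert!(is_lakefs("LakeFSLogStore"));
    assert!(!is_lakefs("DefaultLogStore"));
}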
