Skip to content

Commit

Permalink
feat: allow custom IO runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 19, 2024
1 parent 25905c9 commit 5a949e9
Show file tree
Hide file tree
Showing 11 changed files with 414 additions and 112 deletions.
41 changes: 23 additions & 18 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,31 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
Some((s3_key, value.clone()))
}),
)?;
let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;

let store = limit_store_handler(inner, &options);

// If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
Ok((store, prefix))
} else {
let s3_options = S3StorageOptions::from_map(&storage_options.0)?;

let store = S3StorageBackend::try_new(
store,
Some("dynamodb") == s3_options.locking_provider.as_deref()
|| s3_options.allow_unsafe_rename,
)?;
Ok((store, prefix))
}
}

Ok((Arc::new(store), prefix))
}
/// Optionally wraps `store` in an [`S3StorageBackend`].
///
/// When `options` contains the `AmazonS3ConfigKey::CopyIfNotExists` key the
/// store is returned untouched. Otherwise the options are parsed into
/// [`S3StorageOptions`] and the store is wrapped in an [`S3StorageBackend`];
/// the flag handed to the backend is `true` when the locking provider is
/// `"dynamodb"` or when `allow_unsafe_rename` is set.
///
/// # Errors
///
/// Propagates any error from `S3StorageOptions::from_map` or
/// `S3StorageBackend::try_new`.
fn aws_storage_handler(
    store: ObjectStoreRef,
    options: &StorageOptions,
) -> DeltaResult<ObjectStoreRef> {
    // With copy-if-not-exists configured there is no need to instantiate a
    // locking client or to check for allow-unsafe-rename.
    if options
        .0
        .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
    {
        return Ok(store);
    }

    let s3_options = S3StorageOptions::from_map(&options.0)?;
    let dynamodb_or_unsafe_rename = s3_options.locking_provider.as_deref() == Some("dynamodb")
        || s3_options.allow_unsafe_rename;

    let backend = S3StorageBackend::try_new(store, dynamodb_or_unsafe_rename)?;
    Ok(Arc::new(backend))
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ impl EagerSnapshot {
}
}

fn stats_schema<'a>(schema: &StructType, config: TableConfig<'a>) -> DeltaResult<StructType> {
fn stats_schema(schema: &StructType, config: TableConfig<'_>) -> DeltaResult<StructType> {
let stats_fields = if let Some(stats_cols) = config.stats_columns() {
stats_cols
.iter()
Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ pub use self::data_catalog::{DataCatalog, DataCatalogError};
pub use self::errors::*;
pub use self::schema::partitions::*;
pub use self::schema::*;
pub use self::table::builder::{
DeltaTableBuilder, DeltaTableConfig, DeltaTableLoadOptions, DeltaVersion,
};
pub use self::table::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion};
pub use self::table::config::DeltaConfigKey;
pub use self::table::DeltaTable;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
Expand Down
29 changes: 24 additions & 5 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ use url::Url;
use crate::kernel::Action;
use crate::operations::transaction::TransactionError;
use crate::protocol::{get_last_checkpoint, ProtocolError};
use crate::storage::DeltaIOStorageBackend;
use crate::storage::{
commit_uri_from_version, retry_ext::ObjectStoreRetryExt, ObjectStoreRef, StorageOptions,
commit_uri_from_version, retry_ext::ObjectStoreRetryExt, IORuntime, ObjectStoreRef,
StorageOptions,
};

use crate::{DeltaResult, DeltaTableError};

#[cfg(feature = "datafusion")]
Expand Down Expand Up @@ -98,22 +101,24 @@ lazy_static! {
/// # use std::collections::HashMap;
/// # use url::Url;
/// let location = Url::parse("memory:///").expect("Failed to make location");
/// let logstore = logstore_for(location, HashMap::new()).expect("Failed to get a logstore");
/// let logstore = logstore_for(location, HashMap::new(), None).expect("Failed to get a logstore");
/// ```
pub fn logstore_for(
location: Url,
options: impl Into<StorageOptions> + Clone,
io_runtime: Option<IORuntime>,
) -> DeltaResult<LogStoreRef> {
// turn location into scheme
let scheme = Url::parse(&format!("{}://", location.scheme()))
.map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;

if let Some(entry) = crate::storage::factories().get(&scheme) {
debug!("Found a storage provider for {scheme} ({location})");

let (store, _prefix) = entry
.value()
.parse_url_opts(&location, &options.clone().into())?;
return logstore_with(store, location, options);
return logstore_with(store, location, options, io_runtime);
}
Err(DeltaTableError::InvalidTableLocation(location.into()))
}
Expand All @@ -123,10 +128,17 @@ pub fn logstore_with(
store: ObjectStoreRef,
location: Url,
options: impl Into<StorageOptions> + Clone,
io_runtime: Option<IORuntime>,
) -> DeltaResult<LogStoreRef> {
let scheme = Url::parse(&format!("{}://", location.scheme()))
.map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;

let store = if let Some(io_runtime) = io_runtime {
Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_rt())) as ObjectStoreRef
} else {
store
};

if let Some(factory) = logstores().get(&scheme) {
debug!("Found a logstore provider for {scheme}");
return factory.with_options(store, &location, &options.into());
Expand Down Expand Up @@ -471,14 +483,21 @@ mod tests {
#[test]
fn logstore_with_invalid_url() {
let location = Url::parse("nonexistent://table").unwrap();
let store = logstore_for(location, HashMap::default());
let store = logstore_for(location, HashMap::default(), None);
assert!(store.is_err());
}

#[test]
fn logstore_with_memory() {
let location = Url::parse("memory://table").unwrap();
let store = logstore_for(location, HashMap::default());
let store = logstore_for(location, HashMap::default(), None);
assert!(store.is_ok());
}

#[test]
fn logstore_with_memory_and_rt() {
let location = Url::parse("memory://table").unwrap();
let store = logstore_for(location, HashMap::default(), Some(IORuntime::default()));
assert!(store.is_ok());
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl ConvertToDeltaBuilder {
crate::logstore::logstore_for(
ensure_table_uri(location)?,
self.storage_options.unwrap_or_default(),
None, // TODO: allow runtime to be passed into builder
)?
} else {
return Err(Error::MissingLocation);
Expand Down Expand Up @@ -477,7 +478,7 @@ mod tests {
fn log_store(path: impl Into<String>) -> LogStoreRef {
let path: String = path.into();
let location = ensure_table_uri(path).expect("Failed to get the URI from the path");
crate::logstore::logstore_for(location, StorageOptions::default())
crate::logstore::logstore_for(location, StorageOptions::default(), None)
.expect("Failed to create an object store")
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ impl PartitionWriter {

// write file to object store
self.object_store.put(&path, buffer.into()).await?;

self.files_written.push(
create_add(
&self.config.partition_values,
Expand Down
Loading

0 comments on commit 5a949e9

Please sign in to comment.