Skip to content

Commit

Permalink
fix: add dedicated function to execute futures
Browse files Browse the repository at this point in the history
Signed-off-by: Omkar P <[email protected]>
  • Loading branch information
omkar-foss committed Feb 17, 2025
1 parent a137f87 commit 9ac034f
Showing 1 changed file with 47 additions and 6 deletions.
53 changes: 47 additions & 6 deletions crates/catalog-unity/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFa
use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION};
use reqwest::Url;
use std::collections::HashMap;
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -25,7 +26,8 @@ use crate::models::{

use deltalake_core::data_catalog::DataCatalogResult;
use deltalake_core::{
DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder, DeltaTableError, Path,
DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder, DeltaTableError,
ObjectStoreError, Path,
};

use crate::client::retry::*;
Expand All @@ -35,6 +37,8 @@ use deltalake_core::storage::{
};
pub mod client;
pub mod credential;

const STORE_NAME: &str = "UnityCatalogObjectStore";
#[cfg(feature = "datafusion")]
pub mod datafusion;
pub mod models;
Expand Down Expand Up @@ -105,6 +109,10 @@ pub enum UnityCatalogError {
#[error("Datafusion error: {0}")]
DatafusionError(#[from] datafusion_common::DataFusionError),

/// Cannot initialize DynamoDbConfiguration due to some sort of threading issue
#[error("Unable to initialize Unity Catalog, potentially a threading issue")]
InitializationError,

/// A generic error from a source
#[error("An error occurred in catalog: {source}")]
Generic {
Expand Down Expand Up @@ -487,6 +495,41 @@ impl UnityCatalogBuilder {
self
}

fn execute_uc_future<F, T>(future: F) -> DeltaResult<T>
where
T: Send,
F: Future<Output = T> + Send,
{
match tokio::runtime::Handle::try_current() {
Ok(handle) => match handle.runtime_flavor() {
tokio::runtime::RuntimeFlavor::MultiThread => {
Ok(tokio::task::block_in_place(move || handle.block_on(future)))
}
_ => {
let mut cfg: Option<T> = None;
std::thread::scope(|scope| {
scope.spawn(|| {
cfg = Some(handle.block_on(future));
});
});
cfg.ok_or(DeltaTableError::ObjectStore {
source: ObjectStoreError::Generic {
store: STORE_NAME,
source: Box::new(UnityCatalogError::InitializationError),
},
})
}
},
Err(_) => {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("a tokio runtime is required by the Unity Catalog Builder");
Ok(runtime.block_on(future))
}
}
}

/// Returns the storage location and temporary token to be used with the
/// Unity Catalog table.
pub async fn get_uc_location_and_token(
Expand Down Expand Up @@ -800,11 +843,9 @@ impl ObjectStoreFactory for UnityCatalogFactory {
table_uri: &Url,
options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
use futures::executor::block_on;

let (table_path, temp_creds) = block_on(UnityCatalogBuilder::get_uc_location_and_token(
table_uri.as_str(),
))?;
let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str()),
)?.map_err(UnityCatalogError::from)?;

let mut storage_options = options.0.clone();
storage_options.extend(temp_creds);
Expand Down

0 comments on commit 9ac034f

Please sign in to comment.