Skip to content

Commit

Permalink
Ensure that there are no recursions in the invocations of default log…
Browse files Browse the repository at this point in the history
…store behavior
  • Loading branch information
rtyler committed Dec 30, 2023
1 parent 6935c14 commit 77e5e2b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 32 deletions.
9 changes: 2 additions & 7 deletions crates/deltalake-aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl LogStore for S3DynamoDbLogStore {
if let Ok(Some(entry)) = entry {
self.repair_entry(&entry).await?;
}
LogStore::read_commit_entry(self, version).await
read_commit_entry(&self.storage, version).await
}

/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
Expand Down Expand Up @@ -228,19 +228,14 @@ impl LogStore for S3DynamoDbLogStore {
self.repair_entry(&entry).await?;
Ok(entry.version)
} else {
LogStore::get_latest_version(self, current_version).await
get_latest_version(self, current_version).await
}
}

fn object_store(&self) -> ObjectStoreRef {
self.storage.clone()
}

#[cfg(feature = "datafusion")]
fn object_store_url(&self) -> datafusion::execution::object_store::ObjectStoreUrl {
LogStore::object_store_url(&self.config.location)
}

fn config(&self) -> &LogStoreConfig {
&self.config
}
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl ListingSchemaProvider {
let uri = ensure_table_uri(root_uri)?;
let storage_options = storage_options.unwrap_or_default().into();
// We already parsed the url, so unwrapping is safe.
let store = get_default_store(&uri)?;
let store = store_for(&uri)?;
Ok(Self {
authority: uri.to_string(),
store,
Expand Down
5 changes: 0 additions & 5 deletions crates/deltalake-core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ impl LogStore for DefaultLogStore {
self.storage.clone()
}

#[cfg(feature = "datafusion")]
fn object_store_url(&self) -> datafusion::execution::object_store::ObjectStoreUrl {
super::object_store_url(&self.config.location)
}

fn config(&self) -> &LogStoreConfig {
&self.config
}
Expand Down
36 changes: 17 additions & 19 deletions crates/deltalake-core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,18 @@ pub trait LogStore: Sync + Send {
/// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique
/// host we convert the location from this `LogStore` to a valid name, combining the
/// original scheme, host and path with invalid characters replaced.
fn object_store_url(&self) -> ObjectStoreUrl;
fn object_store_url(&self) -> ObjectStoreUrl {
use object_store::path::DELIMITER;
let location = &self.config().location;

ObjectStoreUrl::parse(format!(
"delta-rs://{}-{}{}",
location.scheme(),
location.host_str().unwrap_or("-"),
location.path().replace(DELIMITER, "-").replace(':', "-")
))
.expect("Invalid object store url.")
}

/// Get configuration representing configured log store.
fn config(&self) -> &LogStoreConfig;
Expand Down Expand Up @@ -342,29 +353,15 @@ lazy_static! {
static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap();
}

#[cfg(feature = "datafusion")]
fn object_store_url(location: &Url) -> ObjectStoreUrl {
// we are certain, that the URL can be parsed, since
// we make sure when we are parsing the table uri

use object_store::path::DELIMITER;
ObjectStoreUrl::parse(format!(
"delta-rs://{}-{}{}",
location.scheme(),
location.host_str().unwrap_or("-"),
location.path().replace(DELIMITER, "-").replace(':', "-")
))
.expect("Invalid object store url.")
}

/// Extract version from a file name in the delta log
pub fn extract_version_from_filename(name: &str) -> Option<i64> {
DELTA_LOG_REGEX
.captures(name)
.map(|captures| captures.get(1).unwrap().as_str().parse().unwrap())
}

async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult<i64> {
/// Default implementation for retrieving the latest version
pub async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult<i64> {
let version_start = match get_last_checkpoint(log_store).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
Expand Down Expand Up @@ -410,7 +407,7 @@ async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> D
}

/// Read delta log for a specific version
async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResult<Option<Bytes>> {
pub async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResult<Option<Bytes>> {
let commit_uri = commit_uri_from_version(version);
match storage.get(&commit_uri).await {
Ok(res) => Ok(Some(res.bytes().await?)),
Expand All @@ -419,7 +416,8 @@ async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResu
}
}

async fn write_commit_entry(
/// Default implementation for writing a commit entry
pub async fn write_commit_entry(
storage: &dyn ObjectStore,
version: i64,
tmp_commit: &Path,
Expand Down

0 comments on commit 77e5e2b

Please sign in to comment.