Skip to content

Commit

Permalink
Ensure that there are no recursions in the invocations of default log…
Browse files Browse the repository at this point in the history
…store behavior
  • Loading branch information
rtyler committed Dec 30, 2023
1 parent 6935c14 commit 74afe22
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 324 deletions.
2 changes: 0 additions & 2 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ impl LogStoreFactory for S3LogStoreFactory {
let s3_options = S3StorageOptions::from_map(&options.0);

if s3_options.locking_provider.as_deref() != Some("dynamodb") {
println!("RETURNING A DEFAAULT");
debug!("S3LogStoreFactory has been asked to create a LogStore without the dynamodb locking provider");
return Ok(deltalake_core::logstore::default_logstore(
store, location, options,
));
}
println!("RETURNING AN S3 DYNAMODB LOGSTORE {s3_options:?}");

Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
location.clone(),
Expand Down
19 changes: 9 additions & 10 deletions crates/deltalake-aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,16 @@ impl S3DynamoDbLogStore {
return Ok(RepairLogEntryResult::AlreadyCompleted);
}
for retry in 0..=MAX_REPAIR_RETRIES {
match LogStore::write_commit_entry(self, entry.version, &entry.temp_path).await {
match write_commit_entry(&self.storage, entry.version, &entry.temp_path).await {
Ok(()) => {
debug!("Successfully committed entry for version {}", entry.version);
return self.try_complete_entry(entry, true).await;
}
// `N.json` has already been moved, complete the entry in DynamoDb just in case
Err(TransactionError::ObjectStore {
source: ObjectStoreError::NotFound { .. },
}) => {
warn!("It looks like the {}.json has already been moved, we got 404 from ObjectStorage.", entry.version);
return self.try_complete_entry(entry, false).await;
}
Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err),
Expand All @@ -105,6 +107,7 @@ impl S3DynamoDbLogStore {
entry: &CommitEntry,
copy_performed: bool,
) -> Result<RepairLogEntryResult, TransactionError> {
debug!("try_complete_entry for {:?}, {}", entry, copy_performed);
for retry in 0..=MAX_REPAIR_RETRIES {
match self
.lock_client
Expand All @@ -119,7 +122,7 @@ impl S3DynamoDbLogStore {
}) {
Ok(x) => return Ok(Self::map_retry_result(x, copy_performed)),
Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err),
Err(err) => log::debug!(
Err(err) => error!(
"retry #{retry} on log entry {entry:?} failed to update lock db: '{err}'"
),
}
Expand Down Expand Up @@ -162,7 +165,7 @@ impl LogStore for S3DynamoDbLogStore {
if let Ok(Some(entry)) = entry {
self.repair_entry(&entry).await?;
}
LogStore::read_commit_entry(self, version).await
read_commit_entry(&self.storage, version).await
}

/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
Expand All @@ -183,9 +186,10 @@ impl LogStore for S3DynamoDbLogStore {
.await
.map_err(|err| match err {
LockClientError::VersionAlreadyExists { version, .. } => {
warn!("LockClientError::VersionAlreadyExists({version})");
TransactionError::VersionAlreadyExists(version)
}
LockClientError::ProvisionedThroughputExceeded => todo!(),
LockClientError::ProvisionedThroughputExceeded => todo!("deltalake-aws does not yet handle DynamoDB providioned throughput errors"),
LockClientError::LockTableNotFound => {
let table_name = self.lock_client.get_lock_table_name();
error!("Lock table '{table_name}' not found");
Expand Down Expand Up @@ -228,19 +232,14 @@ impl LogStore for S3DynamoDbLogStore {
self.repair_entry(&entry).await?;
Ok(entry.version)
} else {
LogStore::get_latest_version(self, current_version).await
get_latest_version(self, current_version).await
}
}

/// Returns a clone of the `storage` handle held by this log store.
fn object_store(&self) -> ObjectStoreRef {
self.storage.clone()
}

/// Returns the DataFusion object store URL for this log store.
///
/// Only compiled when the `datafusion` feature is enabled. The call passes the
/// configured table location (`self.config.location`) to `LogStore::object_store_url`.
// NOTE(review): `LogStore::object_store_url` names the very trait method being
// implemented here, yet it is handed a location rather than `self` — confirm this
// resolves to the intended helper and cannot recurse.
#[cfg(feature = "datafusion")]
fn object_store_url(&self) -> datafusion::execution::object_store::ObjectStoreUrl {
LogStore::object_store_url(&self.config.location)
}

/// Borrows the `LogStoreConfig` this log store was created with.
fn config(&self) -> &LogStoreConfig {
&self.config
}
Expand Down
1 change: 0 additions & 1 deletion crates/deltalake-aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
Some((s3_key, value.clone()))
}),
)?;
println!("STORE: {store:?}, options {options:?}");

let options = S3StorageOptions::from_map(&options.0);
let store = S3StorageBackend::try_new(
Expand Down
1 change: 0 additions & 1 deletion crates/deltalake-aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ const COMMITS: i64 = 5;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_concurrent_writers() -> TestResult<()> {
let _ = pretty_env_logger::try_init();
// Goal: a test with multiple writers, very similar to `integration_concurrent_writes`
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
println!(">>> preparing table");
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl ListingSchemaProvider {
let uri = ensure_table_uri(root_uri)?;
let storage_options = storage_options.unwrap_or_default().into();
// We already parsed the url, so unwrapping is safe.
let store = get_default_store(&uri)?;
let store = store_for(&uri)?;
Ok(Self {
authority: uri.to_string(),
store,
Expand Down
39 changes: 34 additions & 5 deletions crates/deltalake-core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,41 @@ impl LogStore for DefaultLogStore {
self.storage.clone()
}

/// Returns the DataFusion object store URL for this log store.
///
/// Only compiled when the `datafusion` feature is enabled. Delegates to the
/// `object_store_url` function in the parent module, passing the configured
/// table location (`self.config.location`).
#[cfg(feature = "datafusion")]
fn object_store_url(&self) -> datafusion::execution::object_store::ObjectStoreUrl {
super::object_store_url(&self.config.location)
}

/// Borrows the `LogStoreConfig` held by this log store.
fn config(&self) -> &LogStoreConfig {
&self.config
}
}

/// Tests for the DataFusion object store URL; built only for test runs with the
/// `datafusion` feature enabled.
#[cfg(feature = "datafusion")]
#[cfg(test)]
mod datafusion_tests {
use super::*;

use std::collections::HashMap;
use url::Url;

/// Asserts that pairs of differing table locations yield differing object store URLs.
#[tokio::test]
async fn test_unique_object_store_url() {
// A single log store for `memory://table` with default options is the receiver
// for every `object_store_url` call below.
let location = Url::parse("memory://table").unwrap();
let store = crate::logstore::logstore_for(location, HashMap::default());
assert!(store.is_ok());
let store = store.unwrap();

for (location_1, location_2) in [
// Same scheme, no host, different path
("file:///path/to/table_1", "file:///path/to/table_2"),
// Different scheme/host, same path
("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
// Same scheme, different host, same path
("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
] {
let url_1 = Url::parse(location_1).unwrap();
let url_2 = Url::parse(location_2).unwrap();

// NOTE(review): `object_store_url` is called here with a `&Url` argument on the
// same `store` for both sides of the comparison. TODO confirm this matches the
// signature of `LogStore::object_store_url` that this test is meant to exercise.
assert_ne!(
store.object_store_url(&url_1).as_str(),
store.object_store_url(&url_2).as_str(),
);
}
}
}
69 changes: 23 additions & 46 deletions crates/deltalake-core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ pub fn logstore_with(
location: Url,
options: impl Into<StorageOptions> + Clone,
) -> DeltaResult<LogStoreRef> {
println!("WITH! {store:?}");
let scheme = Url::parse(&format!("{}://", location.scheme()))
.map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;

Expand Down Expand Up @@ -225,7 +224,18 @@ pub trait LogStore: Sync + Send {
/// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique
/// host we convert the location from this `LogStore` to a valid name, combining the
/// original scheme, host and path with invalid characters replaced.
fn object_store_url(&self) -> ObjectStoreUrl;
fn object_store_url(&self) -> ObjectStoreUrl {
    use object_store::path::DELIMITER;

    let table_url = &self.config().location;
    let scheme = table_url.scheme();
    // Locations without a host contribute a literal "-" in its place.
    let host = table_url.host_str().unwrap_or("-");
    // Path delimiters and colons are both replaced with dashes.
    let flattened_path = table_url.path().replace(DELIMITER, "-").replace(':', "-");

    let raw_url = format!("delta-rs://{scheme}-{host}{flattened_path}");
    ObjectStoreUrl::parse(raw_url).expect("Invalid object store url.")
}

/// Get configuration representing configured log store.
fn config(&self) -> &LogStoreConfig;
Expand Down Expand Up @@ -342,29 +352,18 @@ lazy_static! {
static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap();
}

/// Builds the `delta-rs://` object store URL for a table `location`.
///
/// The result combines the location's scheme, its host (or `-` when there is no
/// host) and its path, with path delimiters and colons replaced by dashes.
///
/// # Panics
///
/// Panics with "Invalid object store url." if the assembled string is rejected by
/// `ObjectStoreUrl::parse`.
#[cfg(feature = "datafusion")]
fn object_store_url(location: &Url) -> ObjectStoreUrl {
    use object_store::path::DELIMITER;

    let scheme = location.scheme();
    let host = location.host_str().unwrap_or("-");
    let flattened_path = location.path().replace(DELIMITER, "-").replace(':', "-");

    // The table URI is validated when it is parsed, so the assembled URL is
    // expected to parse as well.
    let raw_url = format!("delta-rs://{scheme}-{host}{flattened_path}");
    ObjectStoreUrl::parse(raw_url).expect("Invalid object store url.")
}

/// Extract version from a file name in the delta log
///
/// Returns the number captured by the first group of `DELTA_LOG_REGEX`, or `None`
/// when `name` does not match the pattern.
///
/// `None` is also returned when the captured digits do not fit in an `i64`: the
/// pattern captures exactly 20 digits, which can exceed `i64::MAX` (19 digits), so
/// such a name is treated as "no version" instead of causing a panic.
pub fn extract_version_from_filename(name: &str) -> Option<i64> {
    DELTA_LOG_REGEX
        .captures(name)
        // `parse` fails on overflow; map that to `None` rather than unwrapping.
        .and_then(|captures| captures.get(1)?.as_str().parse().ok())
}

async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult<i64> {
/// Default implementation for retrieving the latest version
pub async fn get_latest_version(
log_store: &dyn LogStore,
current_version: i64,
) -> DeltaResult<i64> {
let version_start = match get_last_checkpoint(log_store).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
Expand Down Expand Up @@ -410,7 +409,10 @@ async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> D
}

/// Read delta log for a specific version
async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResult<Option<Bytes>> {
pub async fn read_commit_entry(
storage: &dyn ObjectStore,
version: i64,
) -> DeltaResult<Option<Bytes>> {
let commit_uri = commit_uri_from_version(version);
match storage.get(&commit_uri).await {
Ok(res) => Ok(Some(res.bytes().await?)),
Expand All @@ -419,7 +421,8 @@ async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResu
}
}

async fn write_commit_entry(
/// Default implementation for writing a commit entry
pub async fn write_commit_entry(
storage: &dyn ObjectStore,
version: i64,
tmp_commit: &Path,
Expand Down Expand Up @@ -458,29 +461,3 @@ mod tests {
assert!(store.is_ok());
}
}

/// Tests for the module-level `object_store_url` helper; built only for test runs
/// with the `datafusion` feature enabled.
#[cfg(feature = "datafusion")]
#[cfg(test)]
mod datafusion_tests {
    use url::Url;

    /// Differing table locations must map to differing object store URLs.
    #[tokio::test]
    async fn test_unique_object_store_url() {
        let location_pairs = [
            // Same scheme, no host, different path
            ("file:///path/to/table_1", "file:///path/to/table_2"),
            // Different scheme/host, same path
            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
            // Same scheme, different host, same path
            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
        ];

        for (first, second) in location_pairs {
            let first_url = super::object_store_url(&Url::parse(first).unwrap());
            let second_url = super::object_store_url(&Url::parse(second).unwrap());

            assert_ne!(first_url.as_str(), second_url.as_str());
        }
    }
}
6 changes: 0 additions & 6 deletions crates/deltalake-core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ pub mod utils;

use crate::{DeltaResult, DeltaTableError};

#[cfg(feature = "datafusion")]
pub use datafusion::execution::object_store::DefaultObjectStoreRegistry;
#[cfg(feature = "datafusion")]
pub(crate) use datafusion::execution::object_store::ObjectStoreRegistry;

pub use object_store;
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
Expand Down Expand Up @@ -147,7 +142,6 @@ pub fn str_is_truthy(val: &str) -> bool {
/// [ObjectStore]
pub fn url_prefix_handler<T: ObjectStore>(store: T, prefix: Path) -> DeltaResult<ObjectStoreRef> {
if prefix != Path::from("/") {
println!("returning prefix store for {prefix:?}");
Ok(Arc::new(PrefixStore::new(store, prefix)))
} else {
Ok(Arc::new(store))
Expand Down
Loading

0 comments on commit 74afe22

Please sign in to comment.