Skip to content

Commit

Permalink
Continued refactoring to do logstore and storage registries
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Dec 29, 2023
1 parent 43c822c commit de528e5
Show file tree
Hide file tree
Showing 22 changed files with 316 additions and 267 deletions.
2 changes: 2 additions & 0 deletions crates/deltalake-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ url = { workspace = true }
chrono = { workspace = true }
serial_test = "2"
deltalake-test = { path = "../deltalake-test" }
rand = "0.8"
serde_json = { workspace = true }

[features]
default = ["rustls"]
Expand Down
35 changes: 34 additions & 1 deletion crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ use regex::Regex;
use std::{
collections::HashMap,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};

use deltalake_core::Path;
use deltalake_core::{DeltaResult, Path};
use deltalake_core::logstore::{LogStoreFactory, LogStore, logstores};
use deltalake_core::storage::{factories, ObjectStoreRef, StorageOptions, url_prefix_handler};
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_credential::AutoRefreshingProvider;
use rusoto_dynamodb::{
Expand All @@ -21,8 +24,38 @@ use rusoto_dynamodb::{
UpdateItemError, UpdateItemInput,
};
use rusoto_sts::WebIdentityProvider;
use url::Url;

use errors::{DynamoDbConfigError, LockClientError};
use storage::{S3StorageOptions, S3ObjectStoreFactory};

#[derive(Clone, Debug, Default)]
struct S3LogStoreFactory {}

impl LogStoreFactory for S3LogStoreFactory {
fn with_options(&self, store: ObjectStoreRef, location: &Url, options: &StorageOptions) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store,
Path::parse(location.path())?)?;
Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
location.clone(),
options.clone(),
&S3StorageOptions::default(),
store)?))
}
}

/// Register an [ObjectStoreFactory] for common S3 [Url] schemes
pub fn register_handlers(_additional_prefixes: Option<Url>) {
for scheme in ["s3", "s3a"].iter() {
let url = Url::parse(&format!("{}://", scheme)).unwrap();
factories().insert(url.clone(),
Arc::new(S3ObjectStoreFactory::default()),
);
logstores().insert(url.clone(),
Arc::new(S3LogStoreFactory::default()),
);
}
}

/// Representation of a log entry stored in DynamoDb
/// dynamo db item consists of:
Expand Down
11 changes: 10 additions & 1 deletion crates/deltalake-aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use deltalake_core::storage::object_store::{
aws::AmazonS3ConfigKey, parse_url_opts, GetOptions, GetResult, ListResult, MultipartId,
ObjectMeta, ObjectStore, Result as ObjectStoreResult,
};
use deltalake_core::storage::{str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions};
use deltalake_core::storage::{
str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
};
use deltalake_core::{DeltaResult, ObjectStoreError, Path};
use futures::stream::BoxStream;
use rusoto_core::Region;
Expand All @@ -20,6 +22,7 @@ use url::Url;

const STORE_NAME: &str = "DeltaS3ObjectStore";


#[derive(Clone, Default, Debug)]
pub struct S3ObjectStoreFactory {}

Expand Down Expand Up @@ -56,6 +59,12 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
}),
)?;

let options = S3StorageOptions::from_map(&options.0);
let store = S3StorageBackend::try_new(
store.into(),
Some("dynamodb") == options.locking_provider.as_deref() || options.allow_unsafe_rename,
)?;

Ok((Arc::new(store), prefix))
}
}
Expand Down
12 changes: 12 additions & 0 deletions crates/deltalake-aws/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use chrono::Utc;
use deltalake_aws::register_handlers;
use deltalake_aws::storage::*;
use deltalake_test::utils::*;
use rand::Rng;
use std::process::{Command, ExitStatus, Stdio};

#[derive(Clone, Debug)]
Expand All @@ -10,6 +12,7 @@ pub struct S3Integration {

impl Default for S3Integration {
fn default() -> Self {
register_handlers(None);
Self {
bucket_name: format!("test-delta-table-{}", Utc::now().timestamp()),
}
Expand All @@ -19,6 +22,11 @@ impl Default for S3Integration {
impl StorageIntegration for S3Integration {
/// Create a new bucket
fn create_bucket(&self) -> std::io::Result<ExitStatus> {
set_env_if_not_set(
"DYNAMO_LOCK_PARTITION_KEY_VALUE",
format!("s3://{}", self.bucket_name()),
);
Self::create_lock_table()?;
let mut child = Command::new("aws")
.args(["s3", "mb", &self.root_uri()])
.spawn()
Expand All @@ -36,6 +44,10 @@ impl StorageIntegration for S3Integration {

/// prepare_env
fn prepare_env(&self) {
std::env::set_var(
"DELTA_DYNAMO_TABLE_NAME",
format!("delta_log_it_{}", rand::thread_rng().gen::<u16>()),
);
match std::env::var(s3_constants::AWS_ENDPOINT_URL).ok() {
Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => {
std::env::remove_var(s3_constants::AWS_ENDPOINT_URL)
Expand Down
5 changes: 0 additions & 5 deletions crates/deltalake-aws/tests/integration_read.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![cfg(feature = "integration_test")]

use deltalake_aws::storage::*;
use deltalake_core::storage::factories;
use deltalake_core::{DeltaTableBuilder, Path};
use deltalake_test::utils::*;
use serial_test::serial;
Expand All @@ -18,10 +17,6 @@ static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%
#[tokio::test]
#[serial]
async fn test_read_tables_aws() -> TestResult {
factories().insert(
Url::parse("s3://").unwrap(),
std::sync::Arc::new(S3ObjectStoreFactory::default()),
);
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;

read_tables(&context).await?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
//! Integration test to verify correct behavior of S3 DynamoDb locking.
//! It inspects the state of the locking table after each operation.
#![cfg(all(
feature = "integration_test",
any(feature = "s3", feature = "s3-native-tls")
))]
#![cfg(feature = "integration_test")]

use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore};
use deltalake_aws::storage::S3StorageOptions;
use deltalake_aws::{CommitEntry, DynamoDbLockClient};
use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
use deltalake_core::logstore::s3::{RepairLogEntryResult, S3DynamoDbLogStore};
use deltalake_core::logstore::LogStore;
use deltalake_core::operations::transaction::{commit, prepare_commit};
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::storage::commit_uri_from_version;
use deltalake_core::storage::config::StorageOptions;
use deltalake_core::storage::s3::S3StorageOptions;
use deltalake_core::storage::StorageOptions;
use deltalake_core::table::builder::ensure_table_uri;
use deltalake_core::test_utils::{IntegrationContext, StorageIntegration, TestTables};
use deltalake_core::{DeltaOps, DeltaTable, DeltaTableBuilder};
use deltalake_test::utils::*;
use lazy_static::lazy_static;
use object_store::path::Path;
use serde_json::Value;
use serial_test::serial;

#[allow(dead_code)]
mod fs_common;
mod common;
use common::*;

pub type TestResult<T> = Result<T, Box<dyn std::error::Error + 'static>>;

Expand All @@ -49,7 +46,7 @@ fn make_client() -> TestResult<DynamoDbLockClient> {
#[test]
#[serial]
fn client_config_picks_up_lock_table_name() -> TestResult<()> {
let _context = IntegrationContext::new(StorageIntegration::Amazon)?;
let _context = IntegrationContext::new(Box::new(S3Integration::default()))?;
assert!(make_client()?
.get_lock_table_name()
.starts_with("delta_log_it_"));
Expand All @@ -59,7 +56,7 @@ fn client_config_picks_up_lock_table_name() -> TestResult<()> {
#[tokio::test]
#[serial]
async fn get_missing_item() -> TestResult<()> {
let _context = IntegrationContext::new(StorageIntegration::Amazon)?;
let _context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let client = make_client()?;
let version = i64::MAX;
let result = client
Expand All @@ -75,7 +72,7 @@ async fn get_missing_item() -> TestResult<()> {
#[tokio::test]
#[serial]
async fn test_append() -> TestResult<()> {
let context = IntegrationContext::new(StorageIntegration::Amazon)?;
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let table = prepare_table(&context, "delta01").await?;
validate_lock_table_state(&table, 0).await?;
append_to_table("datav01.parquet", &table, None).await?;
Expand All @@ -86,7 +83,7 @@ async fn test_append() -> TestResult<()> {
#[tokio::test]
#[serial]
async fn test_repair_commit_entry() -> TestResult<()> {
let context = IntegrationContext::new(StorageIntegration::Amazon)?;
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let client = make_client()?;
let table = prepare_table(&context, "repair_needed").await?;
let options: StorageOptions = OPTIONS.clone().into();
Expand Down Expand Up @@ -135,7 +132,7 @@ async fn test_repair_commit_entry() -> TestResult<()> {
#[tokio::test]
#[serial]
async fn test_repair_on_update() -> TestResult<()> {
let context = IntegrationContext::new(StorageIntegration::Amazon)?;
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let mut table = prepare_table(&context, "repair_on_update").await?;
let _entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?;
table.update().await?;
Expand All @@ -152,7 +149,7 @@ const COMMITS: i64 = 5;
#[serial]
async fn test_concurrent_writers() -> TestResult<()> {
// Goal: a test with multiple writers, very similar to `integration_concurrent_writes`
let context = IntegrationContext::new(StorageIntegration::Amazon)?;
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let table = prepare_table(&context, "concurrent_writes").await?;
let table_uri = table.table_uri();

Expand Down
8 changes: 5 additions & 3 deletions crates/deltalake-core/src/kernel/actions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,8 +951,10 @@ mod tests {
let inline = dv_inline();
assert_eq!(None, inline.absolute_path(&parent).unwrap());

let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let path = std::fs::canonicalize(PathBuf::from(
"../deltalake-test/tests/data/table-with-dv-small/",
))
.unwrap();
let parent = url::Url::from_directory_path(path).unwrap();
let dv_url = parent
.join("deletion_vector_61d16c75-6994-46b7-a15b-8b538852e50e.bin")
Expand All @@ -971,7 +973,7 @@ mod tests {
// fn test_deletion_vector_read() {
// let store = Arc::new(LocalFileSystem::new());
// let path =
// std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
// std::fs::canonicalize(PathBuf::from("../deltalake-test/tests/data/table-with-dv-small/")).unwrap();
// let parent = url::Url::from_directory_path(path).unwrap();
// let root = object_store::path::Path::from(parent.path());
// let fs_client = Arc::new(ObjectStoreFileSystemClient::new(
Expand Down
Loading

0 comments on commit de528e5

Please sign in to comment.