refactor: move azure integration to dedicated crate #2023

Merged · 3 commits · Jan 3, 2024
14 changes: 8 additions & 6 deletions crates/deltalake-aws/src/lib.rs
@@ -61,20 +61,22 @@ impl LogStoreFactory for S3LogStoreFactory {

 /// Register an [ObjectStoreFactory] for common S3 [Url] schemes
 pub fn register_handlers(_additional_prefixes: Option<Url>) {
+    let object_stores = Arc::new(S3ObjectStoreFactory::default());
+    let log_stores = Arc::new(S3LogStoreFactory::default());
     for scheme in ["s3", "s3a"].iter() {
         let url = Url::parse(&format!("{}://", scheme)).unwrap();
-        factories().insert(url.clone(), Arc::new(S3ObjectStoreFactory::default()));
-        logstores().insert(url.clone(), Arc::new(S3LogStoreFactory::default()));
+        factories().insert(url.clone(), object_stores.clone());
+        logstores().insert(url.clone(), log_stores.clone());
     }
 }

 /// Representation of a log entry stored in DynamoDb
 /// dynamo db item consists of:
-/// - tablePath: String - tracked in the log store implementation
-/// - fileName: String - commit version.json (part of primary key), stored as i64 in this struct
-/// - tempPath: String - name of temporary file containing commit info
+/// - table_path: String - tracked in the log store implementation
+/// - file_name: String - commit version.json (part of primary key), stored as i64 in this struct
+/// - temp_path: String - name of temporary file containing commit info
 /// - complete: bool - operation completed, i.e. atomic rename from `tempPath` to `fileName` succeeded
-/// - expireTime: `Option<SystemTime>` - epoch seconds at which this external commit entry is safe to be deleted
+/// - expire_time: `Option<SystemTime>` - epoch seconds at which this external commit entry is safe to be deleted
 #[derive(Debug, PartialEq)]
 pub struct CommitEntry {
     /// Commit version, stored as file name (e.g., 00000N.json) in dynamodb (relative to `_delta_log/`
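
The hoisted `Arc`s are the substance of this hunk: the old loop constructed a fresh factory per scheme, while the new code builds each factory once and shares it between `s3` and `s3a`. A minimal, self-contained sketch of the pattern (the `S3ObjectStoreFactory` below is a stand-in struct, not the real deltalake type):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use url::Url;

// Stand-in for the real factory type; only the sharing pattern matters here.
#[derive(Debug, Default)]
struct S3ObjectStoreFactory;

fn main() {
    let mut factories: HashMap<Url, Arc<S3ObjectStoreFactory>> = HashMap::new();

    // One allocation shared by every scheme: cloning an Arc only bumps a
    // reference count instead of constructing a new factory per entry.
    let object_stores = Arc::new(S3ObjectStoreFactory::default());
    for scheme in ["s3", "s3a"] {
        let url = Url::parse(&format!("{scheme}://")).unwrap();
        factories.insert(url, object_stores.clone());
    }

    assert_eq!(factories.len(), 2);
    // Two map entries plus the local binding hold the same allocation.
    assert_eq!(Arc::strong_count(&object_stores), 3);
}
```
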
167 changes: 2 additions & 165 deletions crates/deltalake-aws/tests/integration_read.rs
@@ -1,14 +1,13 @@
 #![cfg(feature = "integration_test")]

-use deltalake_core::{DeltaTableBuilder, Path};
+use deltalake_test::read::{read_table_paths, test_read_tables};
 use deltalake_test::utils::*;
 use serial_test::serial;

 mod common;
 use common::*;

 static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];

 /// TEST_PREFIXES as they should appear in object stores.
 static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"];

@@ -17,173 +16,11 @@ static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%
 async fn test_read_tables_aws() -> TestResult {
     let context = IntegrationContext::new(Box::new(S3Integration::default()))?;

-    read_tables(&context).await?;
+    test_read_tables(&context).await?;

     for (prefix, prefix_encoded) in TEST_PREFIXES.iter().zip(TEST_PREFIXES_ENCODED.iter()) {
         read_table_paths(&context, prefix, prefix_encoded).await?;
     }

     Ok(())
 }
-
-async fn read_tables(context: &IntegrationContext) -> TestResult {
-    context.load_table(TestTables::Simple).await?;
-    context.load_table(TestTables::Golden).await?;
-    context
-        .load_table(TestTables::Delta0_8_0SpecialPartitioned)
-        .await?;
-
-    read_simple_table(&context).await?;
-    read_simple_table_with_version(&context).await?;
-    read_golden(&context).await?;
-
-    Ok(())
-}
-
-async fn read_table_paths(
-    context: &IntegrationContext,
-    table_root: &str,
-    upload_path: &str,
-) -> TestResult {
-    context
-        .load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path)
-        .await?;
-
-    println!("table_root: {}", table_root);
-    verify_store(&context, table_root).await?;
-
-    read_encoded_table(&context, table_root).await?;
-
-    Ok(())
-}
-
-async fn verify_store(integration: &IntegrationContext, root_path: &str) -> TestResult {
-    let table_uri = format!("{}/{}", integration.root_uri(), root_path);
-    println!("working with table_uri: {}", table_uri);
-    let storage = DeltaTableBuilder::from_uri(table_uri.clone())
-        .with_allow_http(true)
-        .build_storage()?
-        .object_store();
-
-    let files = storage.list_with_delimiter(None).await?;
-    println!("files: {files:?}");
-    assert_eq!(
-        vec![
-            Path::parse("_delta_log").unwrap(),
-            Path::parse("x=A%2FA").unwrap(),
-            Path::parse("x=B%20B").unwrap(),
-        ],
-        files.common_prefixes
-    );
-
-    Ok(())
-}
-
-async fn read_encoded_table(integration: &IntegrationContext, root_path: &str) -> TestResult {
-    let table_uri = format!("{}/{}", integration.root_uri(), root_path);
-
-    let table = DeltaTableBuilder::from_uri(table_uri)
-        .with_allow_http(true)
-        .load()
-        .await?;
-
-    assert_eq!(table.version(), 0);
-    assert_eq!(table.get_files_iter().count(), 2);
-
-    Ok(())
-}
-
-async fn read_simple_table(integration: &IntegrationContext) -> TestResult {
-    let table_uri = integration.uri_for_table(TestTables::Simple);
-    let table = DeltaTableBuilder::from_uri(table_uri)
-        .with_allow_http(true)
-        .load()
-        .await?;
-
-    assert_eq!(table.version(), 4);
-    assert_eq!(table.protocol().min_writer_version, 2);
-    assert_eq!(table.protocol().min_reader_version, 1);
-    assert_eq!(
-        table.get_files_iter().collect::<Vec<_>>(),
-        vec![
-            Path::from("part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet"),
-            Path::from("part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet"),
-            Path::from("part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet"),
-            Path::from("part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"),
-            Path::from("part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet"),
-        ]
-    );
-    let tombstones = table.get_state().all_tombstones();
-    assert_eq!(tombstones.len(), 31);
-    assert!(tombstones.contains(&deltalake_core::kernel::Remove {
-        path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
-        deletion_timestamp: Some(1587968596250),
-        data_change: true,
-        extended_file_metadata: None,
-        deletion_vector: None,
-        base_row_id: None,
-        default_row_commit_version: None,
-        size: None,
-        partition_values: None,
-        tags: None,
-    }));
-
-    Ok(())
-}
-
-async fn read_simple_table_with_version(integration: &IntegrationContext) -> TestResult {
-    let table_uri = integration.uri_for_table(TestTables::Simple);
-
-    let table = DeltaTableBuilder::from_uri(table_uri)
-        .with_allow_http(true)
-        .with_version(3)
-        .load()
-        .await?;
-
-    assert_eq!(table.version(), 3);
-    assert_eq!(table.protocol().min_writer_version, 2);
-    assert_eq!(table.protocol().min_reader_version, 1);
-    assert_eq!(
-        table.get_files_iter().collect::<Vec<_>>(),
-        vec![
-            Path::from("part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet"),
-            Path::from("part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet"),
-            Path::from("part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet"),
-            Path::from("part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"),
-            Path::from("part-00000-f17fcbf5-e0dc-40ba-adae-ce66d1fcaef6-c000.snappy.parquet"),
-            Path::from("part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet"),
-        ]
-    );
-    let tombstones = table.get_state().all_tombstones();
-    assert_eq!(tombstones.len(), 29);
-    assert!(tombstones.contains(&deltalake_core::kernel::Remove {
-        path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
-        deletion_timestamp: Some(1587968596250),
-        data_change: true,
-        tags: None,
-        partition_values: None,
-        base_row_id: None,
-        default_row_commit_version: None,
-        size: None,
-        deletion_vector: None,
-        extended_file_metadata: None,
-    }));
-
-    Ok(())
-}
-
-async fn read_golden(integration: &IntegrationContext) -> TestResult {
-    let table_uri = integration.uri_for_table(TestTables::Golden);
-
-    let table = DeltaTableBuilder::from_uri(table_uri)
-        .with_allow_http(true)
-        .load()
-        .await
-        .unwrap();
-
-    assert_eq!(table.version(), 0);
-    assert_eq!(table.protocol().min_writer_version, 2);
-    assert_eq!(table.protocol().min_reader_version, 1);
-
-    Ok(())
-}
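
Everything deleted here now lives behind the two helpers imported from `deltalake_test::read`, so each storage backend crate can reuse the same read assertions. A rough sketch of what the shared `read_table_paths` might look like, reconstructed from the deleted code above (the real `deltalake-test` implementation may differ in detail):

```rust
use deltalake_core::DeltaTableBuilder;
use deltalake_test::utils::{IntegrationContext, TestResult, TestTables};

/// Sketch only: reconstructed from the helpers removed above, under the
/// assumption that deltalake-test::read keeps roughly the same behavior.
pub async fn read_table_paths(
    context: &IntegrationContext,
    table_root: &str,
    upload_path: &str,
) -> TestResult {
    // Upload a known table under a percent-encoded prefix...
    context
        .load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path)
        .await?;

    // ...then read it back through the raw prefix, so table paths containing
    // spaces or non-ASCII characters keep working end to end.
    let table_uri = format!("{}/{}", context.root_uri(), table_root);
    let table = DeltaTableBuilder::from_uri(table_uri)
        .with_allow_http(true)
        .load()
        .await?;

    assert_eq!(table.version(), 0);
    assert_eq!(table.get_files_iter().count(), 2);

    Ok(())
}
```
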
30 changes: 30 additions & 0 deletions crates/deltalake-azure/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "deltalake-azure"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+deltalake-core = { path = "../deltalake-core" }
+lazy_static = "1"
+
+# workspace dependencies
+async-trait = { workspace = true }
+bytes = { workspace = true }
+futures = { workspace = true }
+tracing = { workspace = true }
+object_store = { workspace = true, features = ["azure"]}
+thiserror = { workspace = true }
+tokio = { workspace = true }
+regex = { workspace = true }
+url = { workspace = true }
+
+[dev-dependencies]
+chrono = { workspace = true }
+serial_test = "2"
+deltalake-test = { path = "../deltalake-test" }
+pretty_env_logger = "*"
+rand = "0.8"
+serde_json = { workspace = true }
+
+[features]
+integration_test = []
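
Like `deltalake-aws`, the new crate is wired in through a registration entry point. A hedged usage sketch, assuming `deltalake-azure` exposes a `register_handlers` function mirroring the S3 one shown at the top of this diff (the exact signature is not part of this hunk):

```rust
fn main() {
    // Assumed entry point, by analogy with deltalake_aws::register_handlers
    // above: install the Azure object-store and log-store factories once,
    // before any table is built from an az://-style URI.
    deltalake_azure::register_handlers(None);
}
```

The hunks that follow adapt the Azure config helper, moved into the new crate, to the crate-local `Result` alias defined in `src/error.rs` below.
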
@@ -8,9 +8,9 @@ use std::collections::{hash_map::Entry, HashMap};
 use std::str::FromStr;

 use object_store::azure::AzureConfigKey;
-use object_store::Error;
+use object_store::Error as ObjectStoreError;

-use crate::{DeltaResult, DeltaTableError};
+use crate::error::Result;

 lazy_static::lazy_static! {
     static ref CREDENTIAL_KEYS: Vec<AzureConfigKey> =
@@ -42,40 +42,8 @@ enum AzureCredential {
     WorkloadIdentity,
 }

-impl FromStr for AzureCredential {
-    type Err = DeltaTableError;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s {
-            "access_key" => Ok(AzureCredential::AccessKey),
-            "bearer_token" => Ok(AzureCredential::BearerToken),
-            "client_secret" => Ok(AzureCredential::ClientSecret),
-            "managed_identity" => Ok(AzureCredential::ManagedIdentity),
-            "workload_identity" => Ok(AzureCredential::WorkloadIdentity),
-            "sas_key" => Ok(AzureCredential::SasKey),
-            _ => Err(DeltaTableError::Generic(format!(
-                "Cannot parse AzureCredential variant from {}",
-                s
-            ))),
-        }
-    }
-}
-
-impl AsRef<str> for AzureCredential {
-    fn as_ref(&self) -> &str {
-        match self {
-            Self::AccessKey => "access_key",
-            Self::BearerToken => "bearer_token",
-            Self::ClientSecret => "client_secret",
-            Self::ManagedIdentity => "managed_identity",
-            Self::SasKey => "sas_key",
-            Self::WorkloadIdentity => "workload_identity",
-        }
-    }
-}
-
 impl AzureCredential {
-    /// Reys required for config
+    /// required configuration keys for variant
     fn keys(&self) -> Vec<AzureConfigKey> {
         match self {
             Self::AccessKey => Vec::from_iter([AzureConfigKey::AccessKey]),
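
With parsing and string conversion gone, `keys()` carries the remaining responsibility: listing which config keys a credential variant needs, so the helper can test whether a variant is fully specified before choosing it from its priority list. A toy model of that check (the key names and the `has_full_config` helper are illustrative assumptions, not the real `AzureConfigHelper` API):

```rust
use std::collections::HashMap;

// Toy stand-ins for AzureConfigKey / AzureCredential; names are illustrative.
#[derive(Debug, PartialEq, Eq, Hash)]
enum Key {
    AccessKey,
    ClientId,
    ClientSecret,
    TenantId,
}

enum Credential {
    AccessKey,
    ClientSecret,
}

impl Credential {
    // Mirrors keys() above: the config keys this variant requires.
    fn keys(&self) -> Vec<Key> {
        match self {
            Credential::AccessKey => vec![Key::AccessKey],
            Credential::ClientSecret => vec![Key::ClientId, Key::ClientSecret, Key::TenantId],
        }
    }
}

// A variant is usable only when every key it requires is present; the helper
// can then walk its priority list and keep the first fully-specified variant.
fn has_full_config(cred: &Credential, config: &HashMap<Key, String>) -> bool {
    cred.keys().iter().all(|k| config.contains_key(k))
}
```
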
@@ -110,7 +78,7 @@ impl AzureConfigHelper {
     /// Create a new [`ConfigHelper`]
     pub fn try_new(
         config: impl IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
-    ) -> DeltaResult<Self> {
+    ) -> Result<Self> {
         let mut env_config = HashMap::new();
         for (os_key, os_value) in std::env::vars_os() {
             if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
@@ -126,7 +94,7 @@
             config: config
                 .into_iter()
                 .map(|(key, value)| Ok((AzureConfigKey::from_str(key.as_ref())?, value.into())))
-                .collect::<Result<_, Error>>()?,
+                .collect::<Result<_, ObjectStoreError>>()?,
             env_config,
             priority: Vec::from_iter([
                 AzureCredential::AccessKey,
@@ -156,7 +124,7 @@
     }

     /// Generate a configuration augmented with options from the environment
-    pub fn build(mut self) -> DeltaResult<HashMap<AzureConfigKey, String>> {
+    pub fn build(mut self) -> Result<HashMap<AzureConfigKey, String>> {
         let mut has_credential = false;

         if self.config.contains_key(&AzureConfigKey::UseAzureCli) {
21 changes: 21 additions & 0 deletions crates/deltalake-azure/src/error.rs
@@ -0,0 +1,21 @@
+use deltalake_core::errors::DeltaTableError;
+
+pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum Error {
+    #[error("failed to parse config: {0}")]
+    Parse(String),
+
+    #[error(transparent)]
+    ObjectStore(#[from] object_store::Error),
+}
+
+impl From<Error> for DeltaTableError {
+    fn from(e: Error) -> Self {
+        match e {
+            Error::Parse(msg) => DeltaTableError::Generic(msg),
+            Error::ObjectStore(e) => DeltaTableError::ObjectStore { source: e },
+        }
+    }
+}
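
The `From<Error> for DeltaTableError` impl is what lets the crate keep a small private error enum while still surfacing `deltalake-core`'s error type at its public boundary. A short sketch of how the pieces compose (`parse_option` and `public_entry` are hypothetical helpers, not part of this diff):

```rust
// Hypothetical internal helper: fails with the crate-private Error type,
// using the crate-local Result alias defined above.
fn parse_option(raw: &str) -> Result<String> {
    raw.strip_prefix("azure_")
        .map(str::to_owned)
        .ok_or_else(|| Error::Parse(format!("unrecognized option: {raw}")))
}

// Hypothetical public boundary: `?` converts Error into DeltaTableError via
// the From impl above, so callers only ever see deltalake-core's error type.
pub fn public_entry(raw: &str) -> Result<String, DeltaTableError> {
    Ok(parse_option(raw)?)
}
```
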