Skip to content

Commit

Permalink
refactor: move code to azure crate
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Jan 3, 2024
1 parent ea52d83 commit b0dd33c
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 287 deletions.
14 changes: 8 additions & 6 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,22 @@ impl LogStoreFactory for S3LogStoreFactory {

/// Register an [ObjectStoreFactory] for common S3 [Url] schemes
pub fn register_handlers(_additional_prefixes: Option<Url>) {
    // Build each factory once and share the same Arc across all schemes;
    // previously a fresh factory was also constructed per scheme, which
    // registered duplicate, unshared instances.
    let object_stores = Arc::new(S3ObjectStoreFactory::default());
    let log_stores = Arc::new(S3LogStoreFactory::default());
    for scheme in ["s3", "s3a"].iter() {
        // "{scheme}://" parses to a valid base Url for the registry key.
        let url = Url::parse(&format!("{}://", scheme)).unwrap();
        factories().insert(url.clone(), object_stores.clone());
        logstores().insert(url.clone(), log_stores.clone());
    }
}

/// Representation of a log entry stored in DynamoDb
/// dynamo db item consists of:
/// - table_path: String - tracked in the log store implementation
/// - file_name: String - commit version.json (part of primary key), stored as i64 in this struct
/// - temp_path: String - name of temporary file containing commit info
/// - complete: bool - operation completed, i.e. atomic rename from `tempPath` to `fileName` succeeded
/// - expire_time: `Option<SystemTime>` - epoch seconds at which this external commit entry is safe to be deleted
#[derive(Debug, PartialEq)]
pub struct CommitEntry {
/// Commit version, stored as file name (e.g., 00000N.json) in dynamodb (relative to `_delta_log/`
Expand Down
30 changes: 30 additions & 0 deletions crates/deltalake-azure/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "deltalake-azure"
version = "0.1.0"
edition = "2021"

[dependencies]
deltalake-core = { path = "../deltalake-core" }
lazy_static = "1"

# workspace dependencies
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
object_store = { workspace = true, features = ["azure"]}
thiserror = { workspace = true }
tokio = { workspace = true }
regex = { workspace = true }
url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
serial_test = "2"
deltalake-test = { path = "../deltalake-test" }
pretty_env_logger = "*"
rand = "0.8"
serde_json = { workspace = true }

[features]
integration_test = []
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use std::collections::{hash_map::Entry, HashMap};
use std::str::FromStr;

use object_store::azure::AzureConfigKey;
use object_store::Error;
use object_store::Error as ObjectStoreError;

use crate::{DeltaResult, DeltaTableError};
use crate::error::Result;

lazy_static::lazy_static! {
static ref CREDENTIAL_KEYS: Vec<AzureConfigKey> =
Expand Down Expand Up @@ -42,40 +42,8 @@ enum AzureCredential {
WorkloadIdentity,
}

impl FromStr for AzureCredential {
    type Err = DeltaTableError;

    /// Parse an [`AzureCredential`] variant from its snake_case string form.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let variant = match s {
            "access_key" => Self::AccessKey,
            "bearer_token" => Self::BearerToken,
            "client_secret" => Self::ClientSecret,
            "managed_identity" => Self::ManagedIdentity,
            "workload_identity" => Self::WorkloadIdentity,
            "sas_key" => Self::SasKey,
            other => {
                return Err(DeltaTableError::Generic(format!(
                    "Cannot parse AzureCredential variant from {}",
                    other
                )))
            }
        };
        Ok(variant)
    }
}

impl AsRef<str> for AzureCredential {
fn as_ref(&self) -> &str {
match self {
Self::AccessKey => "access_key",
Self::BearerToken => "bearer_token",
Self::ClientSecret => "client_secret",
Self::ManagedIdentity => "managed_identity",
Self::SasKey => "sas_key",
Self::WorkloadIdentity => "workload_identity",
}
}
}

impl AzureCredential {
    /// required configuration keys for variant
fn keys(&self) -> Vec<AzureConfigKey> {
match self {
Self::AccessKey => Vec::from_iter([AzureConfigKey::AccessKey]),
Expand Down Expand Up @@ -110,7 +78,7 @@ impl AzureConfigHelper {
    /// Create a new [`AzureConfigHelper`]
pub fn try_new(
config: impl IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
) -> DeltaResult<Self> {
) -> Result<Self> {
let mut env_config = HashMap::new();
for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
Expand All @@ -126,7 +94,7 @@ impl AzureConfigHelper {
config: config
.into_iter()
.map(|(key, value)| Ok((AzureConfigKey::from_str(key.as_ref())?, value.into())))
.collect::<Result<_, Error>>()?,
.collect::<Result<_, ObjectStoreError>>()?,
env_config,
priority: Vec::from_iter([
AzureCredential::AccessKey,
Expand Down Expand Up @@ -156,7 +124,7 @@ impl AzureConfigHelper {
}

    /// Generate a configuration augmented with options from the environment
pub fn build(mut self) -> DeltaResult<HashMap<AzureConfigKey, String>> {
pub fn build(mut self) -> Result<HashMap<AzureConfigKey, String>> {
let mut has_credential = false;

if self.config.contains_key(&AzureConfigKey::UseAzureCli) {
Expand Down
21 changes: 21 additions & 0 deletions crates/deltalake-azure/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use deltalake_core::errors::DeltaTableError;

pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

/// Errors raised by the azure storage backend while assembling configuration.
#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
    /// An azure-specific option string could not be parsed.
    #[error("failed to parse config: {0}")]
    Parse(String),

    /// Error propagated unchanged from the underlying `object_store` crate.
    #[error(transparent)]
    ObjectStore(#[from] object_store::Error),
}

impl From<Error> for DeltaTableError {
fn from(e: Error) -> Self {
match e {
Error::Parse(msg) => DeltaTableError::Generic(msg),
Error::ObjectStore(e) => DeltaTableError::ObjectStore { source: e },
}
}
}
69 changes: 69 additions & 0 deletions crates/deltalake-azure/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{
factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
};
use deltalake_core::{DeltaResult, Path};
use object_store::azure::AzureConfigKey;
use object_store::parse_url_opts;
use url::Url;

mod config;
pub mod error;

/// Extension trait for extracting the azure-specific subset of generic
/// storage options.
trait AzureOptions {
    // Returns every option whose (lowercased) key parses as an `AzureConfigKey`.
    fn as_azure_options(&self) -> HashMap<AzureConfigKey, String>;
}

impl AzureOptions for StorageOptions {
    /// Collect the options recognized as [`AzureConfigKey`]s, lowercasing
    /// keys first; keys that do not parse are silently skipped.
    fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
        let mut azure_options = HashMap::new();
        for (raw_key, value) in self.0.iter() {
            if let Ok(key) = AzureConfigKey::from_str(&raw_key.to_ascii_lowercase()) {
                azure_options.insert(key, value.clone());
            }
        }
        azure_options
    }
}

/// Factory registered for azure [Url] schemes; acts as both the
/// [ObjectStoreFactory] and the [LogStoreFactory] for azure-backed tables.
#[derive(Clone, Default, Debug)]
pub struct AzureFactory {}

impl ObjectStoreFactory for AzureFactory {
    /// Build an azure-backed object store for `url`, merging the caller's
    /// `options` with configuration discovered from the environment.
    fn parse_url_opts(
        &self,
        url: &Url,
        options: &StorageOptions,
    ) -> DeltaResult<(ObjectStoreRef, Path)> {
        let azure_options = options.as_azure_options();
        let config = config::AzureConfigHelper::try_new(azure_options)?.build()?;
        let (store, prefix) = parse_url_opts(url, config)?;
        // Wrap the store so all paths are resolved relative to `prefix`.
        let store = url_prefix_handler(store, prefix.clone())?;
        Ok((store, prefix))
    }
}

impl LogStoreFactory for AzureFactory {
    /// Wrap `store` in the default [`LogStore`] implementation; azure uses
    /// no specialized commit protocol here.
    fn with_options(
        &self,
        store: ObjectStoreRef,
        location: &Url,
        options: &StorageOptions,
    ) -> DeltaResult<Arc<dyn LogStore>> {
        let log_store = default_logstore(store, location, options);
        Ok(log_store)
    }
}

/// Register an [ObjectStoreFactory] for common Azure [Url] schemes
pub fn register_handlers(_additional_prefixes: Option<Url>) {
    // One shared factory instance backs every scheme registration.
    let factory = Arc::new(AzureFactory {});
    let schemes = ["az", "adl", "azure", "abfs", "abfss"];
    for scheme in schemes.iter() {
        let url = Url::parse(&format!("{}://", scheme)).unwrap();
        factories().insert(url.clone(), factory.clone());
        logstores().insert(url, factory.clone());
    }
}
161 changes: 161 additions & 0 deletions crates/deltalake-azure/tests/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use chrono::Utc;
use deltalake_azure::register_handlers;
use deltalake_test::utils::*;
use std::process::ExitStatus;

/// Kinds of storage integration
#[derive(Clone, Debug)]
pub enum MsftIntegration {
    /// Classic Azure Blob Storage; the payload is the container (bucket) name.
    Azure(String),
    /// Microsoft Fabric OneLake, addressed as `{account}.dfs.fabric.microsoft.com/{container}`.
    Onelake,
    /// OneLake addressed in the abfs-style `{container}@{account}.dfs.fabric.microsoft.com` form.
    OnelakeAbfs,
}

impl Default for MsftIntegration {
fn default() -> Self {
register_handlers(None);
Self::Azure(format!("test-delta-table-{}", Utc::now().timestamp()))
}
}

impl StorageIntegration for MsftIntegration {
    /// Seed environment variables appropriate for the selected backend.
    fn prepare_env(&self) {
        match self {
            Self::Azure(_) => az_cli::prepare_env(),
            // Both OneLake flavors share the same environment bootstrap.
            Self::Onelake | Self::OnelakeAbfs => onelake_cli::prepare_env(),
        }
    }

    /// Create the backing container; OneLake workspaces are provisioned out
    /// of band, so those variants report success without doing anything.
    fn create_bucket(&self) -> std::io::Result<ExitStatus> {
        match self {
            Self::Azure(name) => az_cli::create_container(name),
            Self::Onelake | Self::OnelakeAbfs => Ok(ExitStatus::default()),
        }
    }

    /// Resolve the bucket/container identifier for this integration.
    fn bucket_name(&self) -> String {
        match self {
            Self::Azure(name) => name.clone(),
            Self::Onelake | Self::OnelakeAbfs => {
                // Account and container come from the environment, with the
                // same defaults the OneLake tests rely on.
                let account = std::env::var("AZURE_STORAGE_ACCOUNT_NAME")
                    .unwrap_or_else(|_| String::from("onelake"));
                let container = std::env::var("AZURE_STORAGE_CONTAINER_NAME")
                    .unwrap_or_else(|_| String::from("delta-rs"));
                if matches!(self, Self::OnelakeAbfs) {
                    format!("{0}@{1}.dfs.fabric.microsoft.com", container, account)
                } else {
                    format!("{0}.dfs.fabric.microsoft.com/{1}", account, container)
                }
            }
        }
    }

    /// Root URI of the integration table, always using the `az` scheme.
    fn root_uri(&self) -> String {
        format!("az://{}", self.bucket_name())
    }

    /// Upload `source` into `destination` inside this integration's bucket.
    fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
        let target = format!("{}/{}", self.bucket_name(), destination);
        az_cli::copy_directory(source, target)
    }
}

impl Drop for MsftIntegration {
    fn drop(&mut self) {
        // Clean up the test container when the integration goes out of scope.
        // NOTE(review): this also runs for the OneLake variants, whose
        // create_bucket is a no-op — confirm deleting the derived bucket
        // name is intended (and harmless) there.
        az_cli::delete_container(self.bucket_name()).expect("Failed to drop bucket");
    }
}

/// cli helpers for onelake
mod onelake_cli {
    use super::set_env_if_not_set;

    /// Seed the environment variables used by the OneLake integration tests;
    /// each value is only applied when the variable is not already set.
    pub fn prepare_env() {
        set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "0");
        set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "daily-onelake");
        set_env_if_not_set(
            "AZURE_STORAGE_CONTAINER_NAME",
            "86bc63cf-5086-42e0-b16d-6bc580d1dc87",
        );
        set_env_if_not_set("AZURE_STORAGE_TOKEN", "jwt-token");
    }
}

/// small wrapper around az cli
mod az_cli {
    use super::set_env_if_not_set;
    use std::process::{Command, ExitStatus};

    /// Spawn `az` with the given arguments and wait for it to exit.
    fn run_az(args: &[&str]) -> std::io::Result<ExitStatus> {
        Command::new("az")
            .args(args)
            .spawn()
            .expect("az command is installed")
            .wait()
    }

    /// Create a new bucket
    pub fn create_container(container_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
        run_az(&["storage", "container", "create", "-n", container_name.as_ref()])
    }

    /// delete bucket
    pub fn delete_container(container_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
        run_az(&["storage", "container", "delete", "-n", container_name.as_ref()])
    }

    /// copy directory
    pub fn copy_directory(
        source: impl AsRef<str>,
        destination: impl AsRef<str>,
    ) -> std::io::Result<ExitStatus> {
        run_az(&[
            "storage",
            "blob",
            "upload-batch",
            "-s",
            source.as_ref(),
            "-d",
            destination.as_ref(),
        ])
    }

    /// Seed environment variables for the Azurite storage emulator; values
    /// are only applied when the variable is not already set.
    pub fn prepare_env() {
        set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "1");
        set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "devstoreaccount1");
        set_env_if_not_set("AZURE_STORAGE_ACCOUNT_KEY", "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==");
        set_env_if_not_set(
            "AZURE_STORAGE_CONNECTION_STRING",
            "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;"
        );
    }
}
Loading

0 comments on commit b0dd33c

Please sign in to comment.