From b6500d8314af369180c594d2ed88f125ca7cddcd Mon Sep 17 00:00:00 2001
From: Junjun Dong
Date: Fri, 6 Oct 2023 22:34:30 -0700
Subject: [PATCH] feat: Add convert_to_delta

- Add percent decoding
- Add convert_to_delta_from_path
- Add more unit tests
---
 rust/Cargo.toml                    |   1 -
 rust/src/lib.rs                    |   2 +
 rust/src/table/convert_to_delta.rs | 484 +++++++++++++++++++++--------
 rust/src/writer/utils.rs           |   2 +-
 4 files changed, 360 insertions(+), 129 deletions(-)

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 0a89f92eb6..66b9c7e5b9 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -126,7 +126,6 @@ tempfile = "3"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 utime = "0.3"
 hyper = { version = "0.14", features = ["server"] }
-test-case = "3.2.1"
 
 [features]
 azure = ["object_store/azure"]
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index af692fd5c9..eedec98a8e 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -126,6 +126,8 @@ pub use protocol::checkpoints;
 #[cfg(feature = "integration_test")]
 pub mod test_utils;
 
+const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__";
+
 /// Creates and loads a DeltaTable from the given path with current metadata.
 /// Infers the storage backend to use from the scheme in the given table path.
 pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
diff --git a/rust/src/table/convert_to_delta.rs b/rust/src/table/convert_to_delta.rs
index 76b68560b0..2b96978c08 100644
--- a/rust/src/table/convert_to_delta.rs
+++ b/rust/src/table/convert_to_delta.rs
@@ -1,35 +1,36 @@
 //! Convert a Parquet table to a Delta table in place
 
 use crate::{
-    errors::DeltaTableError,
     open_table,
     operations::create::CreateBuilder,
     protocol::{Action, Add, SaveMode},
-    schema::Schema,
-    storage::DeltaObjectStore,
-    DeltaTable, DeltaTablePartition, SchemaDataType, SchemaField,
+    storage::{DeltaObjectStore, ObjectStoreRef},
+    table::builder::ensure_table_uri,
+    DeltaTable, DeltaTableConfig, DeltaTableError, DeltaTablePartition, ObjectMeta, ObjectStore,
+    ObjectStoreError, Path, Schema, SchemaDataType, SchemaField, NULL_PARTITION_VALUE_DATA_PATH,
 };
 use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
 use futures::StreamExt;
-use log::*;
-use object_store::{ObjectMeta, ObjectStore};
+use log::{debug, info};
 use parquet::{
     arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
     errors::ParquetError,
 };
+use percent_encoding::percent_decode_str;
 use std::{
+    borrow::Cow,
     collections::{HashMap, HashSet},
     num::TryFromIntError,
+    str::Utf8Error,
     sync::Arc,
 };
-use url::Url;
 
 /// Error converting a Parquet table to a Delta table
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     /// Object store error
     #[error("Object store error: {0}")]
-    ObjectStore(#[from] object_store::Error),
+    ObjectStore(#[from] ObjectStoreError),
     /// Arrow error
     #[error("Arrow error: {0}")]
     Arrow(#[from] ArrowError),
@@ -39,19 +40,43 @@ pub enum Error {
     /// DeltaTable error
     #[error("DeltaTable error: {0}")]
     DeltaTable(#[from] DeltaTableError),
+    /// Error percent-decoding as UTF-8
+    #[error("Error percent-decoding as UTF-8: {0}")]
+    PercentDecode(#[from] Utf8Error),
     /// Error converting usize to i64
     #[error("Error converting usize to i64: {0}")]
     TryFromUsize(#[from] TryFromIntError),
-    /// Error canonicalizing the given path
-    #[error("Error canonicalizing the given path: {0}")]
-    CanonicalizePath(#[from] std::io::Error),
     /// Error converting the path into an URL
     #[error("Error converting the path into an URL")]
     UrlFromFilePath,
+    /// Error reaching the end of an iterator
+    #[error("Error reaching the end of an iterator")]
+    EndOfIterator,
 }
 
-/// Convert a Parquet table to a Delta table in place
-pub async fn convert_to_delta(path: &str) -> Result<DeltaTable, Error> {
+/// Convert a Parquet table to a Delta table in place.
+///
+/// # Arguments
+///
+/// * `storage` - A shared reference to a [`DeltaObjectStore`](crate::storage::DeltaObjectStore) with "/" pointing at the Delta table root (i.e. where `_delta_log` is located).
+pub async fn convert_to_delta(storage: ObjectStoreRef) -> Result<DeltaTable, Error> {
+    if storage.is_delta_table_location().await? {
+        info!("A Delta table already exists in the given object store");
+        let mut table = DeltaTable::new(storage, DeltaTableConfig::default());
+        table.load().await?;
+        Ok(table)
+    } else {
+        let parquet_files = parquet_files(storage.clone()).await?;
+        delta_table(parquet_files, storage).await
+    }
+}
+
+/// Convert a Parquet table to a Delta table in place.
+///
+/// # Arguments
+///
+/// * `path` - A path with "/" pointing at the Delta table root (i.e. where `_delta_log` is located).
+pub async fn convert_to_delta_from_path(path: &str) -> Result<DeltaTable, Error> {
     match open_table(path).await {
         Ok(table) => {
             info!("A Delta table already exists in the given path");
@@ -59,32 +84,24 @@ pub async fn convert_to_delta(path: &str) -> Result<DeltaTable, Error> {
         }
         Err(_) => {
             info!("Converting Parquet table into a Delta table...");
-            let url = url(path)?;
-            let store = object_store(url.clone())?;
-            let parquet_files = parquet_files(store.clone()).await?;
-            delta_table(parquet_files, store, url).await
+            let storage = object_store(path)?;
+            let parquet_files = parquet_files(storage.clone()).await?;
+            delta_table(parquet_files, storage).await
         }
     }
 }
 
-fn url(path: &str) -> Result<Url, Error> {
-    Ok(if let Ok(url) = Url::parse(path) {
-        url
-    } else {
-        info!(
-            "Cannot parse an absolute URL from the given path {path:?}. Canonicalizing the path..."
-        );
-        Url::from_file_path(std::fs::canonicalize(path)?).map_err(|_| Error::UrlFromFilePath)?
-    })
-}
-
-fn object_store(url: Url) -> Result<Arc<DeltaObjectStore>, Error> {
-    debug!("Creating an object store for URL: {url:#?}");
-    Ok(Arc::new(DeltaObjectStore::try_new(url, HashMap::new())?))
+fn object_store(path: &str) -> Result<ObjectStoreRef, Error> {
+    debug!("Creating an object store for path: {path:?}");
+    let location = ensure_table_uri(path)?;
+    Ok(Arc::new(DeltaObjectStore::try_new(
+        location,
+        HashMap::new(),
+    )?))
 }
 
-async fn parquet_files(store: Arc<DeltaObjectStore>) -> Result<Vec<ObjectMeta>, Error> {
-    let objects = store.list(None).await?;
+async fn parquet_files(storage: ObjectStoreRef) -> Result<Vec<ObjectMeta>, Error> {
+    let objects = storage.list(None).await?;
     Ok(objects
         .filter_map(|path| async move {
             if let Ok(meta) = path {
@@ -102,16 +119,16 @@ async fn parquet_files(store: Arc<DeltaObjectStore>) -> Result<Vec<ObjectMeta>,
         .await)
 }
 
-async fn arrow_schema(
-    file: &ObjectMeta,
-    store: Arc<DeltaObjectStore>,
-) -> Result<ArrowSchema, Error> {
+async fn arrow_schema(file: &ObjectMeta, storage: ObjectStoreRef) -> Result<ArrowSchema, Error> {
     Ok(
-        ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(store.clone(), file.clone()))
-            .await?
-            .schema()
-            .as_ref()
-            .clone(),
+        ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
+            storage.clone(),
+            file.clone(),
+        ))
+        .await?
+        .schema()
+        .as_ref()
+        .clone(),
     )
 }
 
@@ -120,7 +137,8 @@ fn add_action(
     partition_values: HashMap<String, Option<String>>,
 ) -> Result<Action, Error> {
     Ok(Action::add(Add {
-        path: file.location.to_string(),
+        // Percent decoded file path
+        path: percent_decode(file.location.as_ref())?.to_string(),
         size: i64::try_from(file.size)?,
         partition_values,
         modification_time: file.last_modified.timestamp_millis(),
@@ -129,23 +147,29 @@
 }
 
 fn partitions(
-    file: &ObjectMeta,
+    path: &Path,
     // A HashSet for all unique partition columns in a Parquet table
     partition_columns: &mut HashSet<String>,
-    // A Vector for schema fields of all unique partition columns in a Parquet table
+    // A Vector for schema fields from all unique partition columns in a Parquet table
    partition_schema_fields: &mut Vec<SchemaField>,
 ) -> Result<HashMap<String, Option<String>>, Error> {
-    // A HashMap from partition column to value for this parquet file
+    // A HashMap from partition column to value for this parquet file only
     let mut partitions = HashMap::new();
-    for s in file.location.as_ref().split('/') {
-        if !s.ends_with(".parquet") {
-            let partition = DeltaTablePartition::try_from(s)?;
+    let mut iter = path.as_ref().split('/').peekable();
+    let mut subpath = iter.next();
+    // Get partition from subpaths. Skip the last subpath
+    while iter.peek().is_some() {
+        if let Some(subpath) = subpath {
+            // Percent decode the subpath
+            let subpath = percent_decode(subpath)?;
+            let partition = DeltaTablePartition::try_from(subpath.as_ref())?;
+            debug!("Found partition {partition:#?} in parquet file {path:#?}");
             let (key, val) = (partition.key.to_string(), partition.value.to_string());
-            debug!(
-                "Found partition {partition:#?} in parquet file {:#?}",
-                file.location
-            );
-            partitions.insert(key.clone(), Some(val));
+            if val == NULL_PARTITION_VALUE_DATA_PATH {
+                partitions.insert(key.clone(), None);
+            } else {
+                partitions.insert(key.clone(), Some(val));
+            }
             if partition_columns.insert(key.clone()) {
                 partition_schema_fields.push(SchemaField::new(
                     key,
@@ -154,23 +178,26 @@ fn partitions(
                     HashMap::new(),
                 ));
             }
+        } else {
+            // This error shouldn't happen. The while condition ensures that subpath is not none
+            return Err(Error::EndOfIterator);
         }
+        subpath = iter.next();
     }
     Ok(partitions)
 }
 
-async fn delta_table(
-    files: Vec<ObjectMeta>,
-    store: Arc<DeltaObjectStore>,
-    url: Url,
-) -> Result<DeltaTable, Error> {
+async fn delta_table(files: Vec<ObjectMeta>, storage: ObjectStoreRef) -> Result<DeltaTable, Error> {
     let (mut schemas, mut actions, mut partition_columns, mut partition_schema_fields) =
         (Vec::new(), Vec::new(), HashSet::new(), Vec::new());
     for file in files {
         debug!("Processing parquet file: {:#?}", file.location);
-        schemas.push(arrow_schema(&file, store.clone()).await?);
-        let partition_values =
-            partitions(&file, &mut partition_columns, &mut partition_schema_fields)?;
+        schemas.push(arrow_schema(&file, storage.clone()).await?);
+        let partition_values = partitions(
+            &file.location,
+            &mut partition_columns,
+            &mut partition_schema_fields,
+        )?;
         actions.push(add_action(&file, partition_values)?);
     }
     let mut schema_fields = Schema::try_from(&ArrowSchema::try_merge(schemas)?)?
@@ -179,8 +206,7 @@ async fn delta_table(
     schema_fields.append(&mut partition_schema_fields);
     debug!("Schema fields for the parquet table: {schema_fields:#?}");
     Ok(CreateBuilder::new()
-        .with_location(url)
-        .with_object_store(store.clone())
+        .with_object_store(storage.clone())
         .with_columns(schema_fields)
         .with_partition_columns(partition_columns.into_iter())
         .with_actions(actions)
         .await?)
} +fn percent_decode(path: &str) -> Result, Utf8Error> { + percent_decode_str(path).decode_utf8() +} + #[cfg(test)] mod tests { use super::{ - object_store, parquet_files, partitions, url, HashMap, HashSet, SchemaDataType, SchemaField, + /*convert_to_delta,*/ object_store, parquet_files, partitions, HashMap, HashSet, Path, + SchemaDataType, SchemaField, }; - use object_store::path::Path; - use test_case::test_case; + /* + use std::fs; + use tempfile::TempDir; + */ fn schema_field(key: &str) -> SchemaField { SchemaField::new( @@ -205,73 +238,270 @@ mod tests { ) } - #[test_case("tests/data/delta-0.8.0" => - vec![ - Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"), - Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"), - Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"), - ] - )] - #[test_case("tests/data/delta-0.8.0-numeric-partition" => - vec![ - Path::from("x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"), - Path::from("x=10/y=10.0/part-00015-24eb4845-2d25-4448-b3bb-5ed7f12635ab.c000.snappy.parquet"), - ] - )] + async fn get_file_paths_and_partitions( + path: &str, + ) -> (Vec, HashSet, Vec) { + let files = parquet_files(object_store(path).expect("Failed to create an object store")) + .await + .expect("Failed to get parquet files"); + let (mut paths, mut partition_columns, mut partition_schema_fields) = + (Vec::new(), HashSet::new(), Vec::new()); + for file in files { + partitions( + &file.location, + &mut partition_columns, + &mut partition_schema_fields, + ) + .expect("Failed to get partitions"); + paths.push(file.location); + } + (paths, partition_columns, partition_schema_fields) + } + #[tokio::test] - async fn test_parquet_files(path: &str) -> Vec { - let files = parquet_files( - object_store(url(path).expect("Failed to get URL from the path")) - .expect("Failed to create an object store"), + async fn test_parquet_files_and_partitions() { + assert_eq!( + get_file_paths_and_partitions("tests/data/delta-0.8.0").await, + ( + vec![ + Path::from( + "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet" + ), + Path::from( + "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet" + ), + Path::from( + "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet" + ), + ], + HashSet::new(), + Vec::new() + ) + ); + assert_eq!( + get_file_paths_and_partitions("tests/data/delta-0.8.0-partitioned").await, + ( + vec![ + Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"), + Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"), + Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"), + Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet"), + Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"), + Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"), + ], + HashSet::from(["month".to_string(), "day".to_string(), "year".to_string()]), + vec![ + schema_field("year"), + schema_field("month"), + schema_field("day") + ], + ) + ); + assert_eq!( + get_file_paths_and_partitions("tests/data/delta-0.8.0-null-partition").await, + ( + vec![ + 
Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"), + Path::from("k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"), + ], + HashSet::from(["k".to_string()]), + vec![schema_field("k")] + ) + ); + + assert_eq!( + get_file_paths_and_partitions("tests/data/delta-0.8.0-numeric-partition").await, + ( + vec![ + Path::from("x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"), + Path::from("x=10/y=10.0/part-00015-24eb4845-2d25-4448-b3bb-5ed7f12635ab.c000.snappy.parquet"), + ], + HashSet::from(["x".to_string(), "y".to_string()]), + vec![schema_field("x"), schema_field("y")], + ) + ); + assert_eq!( + get_file_paths_and_partitions("tests/data/delta-0.8.0-special-partition").await, + ( + vec![ + Path::parse( + "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet" + ) + .expect("Failed to parse a string as a Path"), + Path::parse( + "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet" + ) + .expect("Failed to parse a string as a Path") + ], + HashSet::from(["x".to_string()]), + vec![schema_field("x")] + ) + ); + assert_eq!( + get_file_paths_and_partitions("tests/data/delta-2.2.0-partitioned-types").await, + ( + vec![ + Path::from("c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet"), + Path::from("c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet"), + Path::from("c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet"), + ], + HashSet::from(["c1".to_string(), "c2".to_string()]), + vec![schema_field("c1"), schema_field("c2")], + ) + ); + } + + fn partition_key_values(path: &str) -> HashMap> { + partitions( + &Path::parse(path).expect("Failed to parse a string as a Path"), + &mut HashSet::new(), + &mut Vec::new(), ) - .await - .expect("Failed to get parquet files"); - files - .into_iter() - .map(|meta| meta.location) - .collect::>() + .expect("Failed to get partitions") } - #[test_case( - "tests/data/delta-0.8.0-partitioned" => ( - HashSet::from(["month".to_string(), "day".to_string(), "year".to_string()]), - vec![ - schema_field("year"), - schema_field("month"), - schema_field("day"), - ] - ))] - #[test_case( - "tests/data/delta-0.8.0-special-partition" => ( - HashSet::from(["x".to_string()]), - vec![ - schema_field("x"), - ] - ))] - #[test_case( - "tests/data/delta-0.8.0-null-partition" => ( - HashSet::from(["k".to_string()]), - vec![ - schema_field("k"), - ] - ))] - #[tokio::test] - async fn test_partitions(path: &str) -> (HashSet, Vec) { - let files = parquet_files( - object_store(url(path).expect("Failed to get URL from the path")) - .expect("Failed to create an object store"), + #[test] + fn test_partition_key_values() { + assert_eq!( + partition_key_values( + "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet", + ), + HashMap::from([("x".to_string(), Some("A/A".to_string()))]) + ); + assert_eq!( + partition_key_values( + "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet" + ), + HashMap::from([("x".to_string(), Some("B B".to_string()))]) + ); + assert_eq!( + partition_key_values( + "k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet" + ), + HashMap::from([("k".to_string(), None)]) + ); + assert_eq!( + partition_key_values( + "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet", + ), + HashMap::from([ + ("x".to_string(), 
Some("9".to_string())), + ("y".to_string(), Some("9.9".to_string())) + ]) + ); + assert_eq!( + partition_key_values( + "c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet" + ), + HashMap::from([ + ("c1".to_string(), Some("6".to_string())), + ("c2".to_string(), Some("a".to_string())) + ]) + ); + assert_eq!( + partition_key_values( + "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet" + ), + HashMap::new() + ); + assert_eq!( + partition_key_values( + "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet" + ), + HashMap::from([ + ("year".to_string(), Some("2021".to_string())), + ("month".to_string(), Some("12".to_string())), + ("day".to_string(), Some("4".to_string())), + ]) + ); + assert_eq!( + partition_key_values( + "date=2023-04-13/part-00000-e853fe2e-6f42-450c-8af1-4145b73a96c7-c000.snappy.parquet", + ), + HashMap::from([("date".to_string(), Some("2023-04-13".to_string()))]) ) - .await - .expect("Failed to get parquet files"); - let (mut partition_columns, mut partition_schema_fields) = (HashSet::new(), Vec::new()); + } + + /* + fn copy_parquet_files(src: impl AsRef, dst: impl AsRef) { + let files = fs::read_dir(src).expect("Failed to read source directory"); for file in files { - partitions(&file, &mut partition_columns, &mut partition_schema_fields) - .expect("Failed to get partitions"); + let file = file.expect("Failed to read file"); + let name = file.path().file_name().map(|name| name.to_str()); + // Skip Delta log + if Some("_delta_log") == name { + println!("///////////////// file name = {:?}", file.path().display()); + } + if file.file_type().is_dir() { + println!( + "/////////////////// subdirectory = {:?}", + dst.af_ref().join(name).display() + ); + copy_dir(file.path(), dst.af_ref().join(name)) + .expect("Failed to copy subdirectory"); + } else { + fs::copy(file.path(), dst.af_ref().join(name)).expect("Failed to copy file"); + } } - (partition_columns, partition_schema_fields) } - // TODO - #[allow(dead_code)] - fn test_convert_to_delta() {} + #[tokio::test] + async fn test_convert_to_delta() { + let path = "tests/data/delta-0.8.0-partitioned"; + let store = object_store(path).expect("Failed to create an object store"); + let _table = convert_to_delta(store) + .await + .expect("Failed to convert to Delta table"); + let temp_dir = TempDir::new().expect("Failed to create a temp directory"); + copy_parquet_files(path, temp_dir); + panic!(); + assert_eq!(table.version(), 0); + assert_eq!( + table.get_partition_values().cloned().collect::>(), + vec![ + HashMap::from([ + ("year".to_string(), Some("2020".to_string())), + ("month".to_string(), Some("1".to_string())), + ("day".to_string(), Some("1".to_string())), + ]), + HashMap::from([ + ("year".to_string(), Some("2020".to_string())), + ("month".to_string(), Some("2".to_string())), + ("day".to_string(), Some("3".to_string())), + ]), + HashMap::from([ + ("year".to_string(), Some("2020".to_string())), + ("month".to_string(), Some("2".to_string())), + ("day".to_string(), Some("5".to_string())), + ]), + HashMap::from([ + ("year".to_string(), Some("2021".to_string())), + ("month".to_string(), Some("4".to_string())), + ("day".to_string(), Some("5".to_string())), + ]), + HashMap::from([ + ("year".to_string(), Some("2021".to_string())), + ("month".to_string(), Some("12".to_string())), + ("day".to_string(), Some("20".to_string())), + ]), + HashMap::from([ + ("year".to_string(), Some("2021".to_string())), + ("month".to_string(), Some("12".to_string())), + 
("day".to_string(), Some("4".to_string())), + ]), + ] + ); + assert_eq!( + *table + .get_schema() + .expect("Failed to get schema") + .get_fields(), + vec![ + schema_field("year"), + schema_field("month"), + schema_field("day") + ], + ); + } + */ } diff --git a/rust/src/writer/utils.rs b/rust/src/writer/utils.rs index cfc089c164..06e3f27e2d 100644 --- a/rust/src/writer/utils.rs +++ b/rust/src/writer/utils.rs @@ -27,8 +27,8 @@ use uuid::Uuid; use crate::errors::DeltaResult; use crate::writer::DeltaWriterError; +use crate::NULL_PARTITION_VALUE_DATA_PATH; -const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__"; const PARTITION_DATE_FORMAT: &str = "%Y-%m-%d"; const PARTITION_DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
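A minimal usage sketch of the two entry points this patch adds (not part of the patch itself). It assumes the `deltalake` crate name, that the module and the `ensure_table_uri`/`DeltaObjectStore` helpers are publicly reachable at the paths shown, and it reuses one of the repository's test fixture directories purely for illustration; adjust the paths to the actual re-exports once the change lands.

    use std::{collections::HashMap, sync::Arc};

    use deltalake::storage::DeltaObjectStore;
    use deltalake::table::builder::ensure_table_uri;
    // Assumed public path to the module touched by this patch.
    use deltalake::table::convert_to_delta::{convert_to_delta, convert_to_delta_from_path};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Path-based variant: builds the object store from the given path or URL.
        let table = convert_to_delta_from_path("tests/data/delta-0.8.0-partitioned").await?;
        println!("table is at version {}", table.version());

        // Store-based variant: pass a DeltaObjectStore whose "/" is the table root.
        let location = ensure_table_uri("tests/data/delta-0.8.0-partitioned")?;
        let storage = Arc::new(DeltaObjectStore::try_new(location, HashMap::new())?);
        let table = convert_to_delta(storage).await?;
        println!("table is at version {}", table.version());
        Ok(())
    }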