From c386a608c81ac66f8b47301100b15744c15ca457 Mon Sep 17 00:00:00 2001 From: Junjun Dong Date: Thu, 12 Oct 2023 17:15:16 -0700 Subject: [PATCH] feat: Add convert_to_delta - Add more unit tests - Remove metadata in arrow schemas to avoid merging conflicts - Return immediately with an error in parquet_files if the stream returns an error - Set data_change to true in Add actions --- rust/src/table/convert_to_delta.rs | 609 ++++++++++++++++++++--------- 1 file changed, 420 insertions(+), 189 deletions(-) diff --git a/rust/src/table/convert_to_delta.rs b/rust/src/table/convert_to_delta.rs index 2b96978c08..7bdb2dd90f 100644 --- a/rust/src/table/convert_to_delta.rs +++ b/rust/src/table/convert_to_delta.rs @@ -10,7 +10,7 @@ use crate::{ ObjectStoreError, Path, Schema, SchemaDataType, SchemaField, NULL_PARTITION_VALUE_DATA_PATH, }; use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError}; -use futures::StreamExt; +use futures::{future, TryStreamExt}; use log::{debug, info}; use parquet::{ arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}, @@ -52,9 +52,12 @@ pub enum Error { /// Error reaching the end of an iterator #[error("Error reaching the end of an iterator")] EndOfIterator, + /// No parquet file is found in the given path + #[error("No parquet file is found in the given path")] + ParquetFileNotFound, } -/// Convert a Parquet table to a Delta table in place. 
+/// Convert a Parquet table to a Delta table in place /// /// # Arguments /// /// @@ -64,14 +67,17 @@ pub async fn convert_to_delta(storage: ObjectStoreRef) -> Result Result Ok(table) } Err(_) => { - info!("Converting Parquet table into a Delta table..."); + info!("Converting Parquet table in path: {path:?}"); let storage = object_store(path)?; let parquet_files = parquet_files(storage.clone()).await?; delta_table(parquet_files, storage).await @@ -102,34 +108,34 @@ fn object_store(path: &str) -> Result { async fn parquet_files(storage: ObjectStoreRef) -> Result, Error> { let objects = storage.list(None).await?; - Ok(objects - .filter_map(|path| async move { - if let Ok(meta) = path { - if Some("parquet") == meta.location.extension() { - debug!("Found parquet file in {:#?}", meta.location); - Some(meta) - } else { - None - } - } else { - None - } - }) - .collect() - .await) + let mut files = Vec::new(); + let fut = objects.try_for_each(|meta| { + if Some("parquet") == meta.location.extension() { + debug!("Found parquet file {:#?}", meta.location); + files.push(meta); + } + future::ready(Ok(())) + }); + // Return immediately with an error if the stream returns an error + fut.await?; + if files.is_empty() { + return Err(Error::ParquetFileNotFound); + } + Ok(files) } async fn arrow_schema(file: &ObjectMeta, storage: ObjectStoreRef) -> Result { - Ok( - ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new( - storage.clone(), - file.clone(), - )) - .await? - .schema() - .as_ref() - .clone(), - ) + let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new( + storage.clone(), + file.clone(), + )) + .await? + .schema() + .as_ref() + .clone(); + // Arrow schemas may have conflicting metadata. Since Arrow Schema's metadata is not used when generating Delta table schema, we set the metadata field to an empty HashMap. 
+ arrow_schema.metadata = HashMap::new(); + Ok(arrow_schema) } fn add_action( @@ -137,30 +143,29 @@ fn add_action( partition_values: HashMap>, ) -> Result { Ok(Action::add(Add { - // Percent decoded file path path: percent_decode(file.location.as_ref())?.to_string(), size: i64::try_from(file.size)?, partition_values, modification_time: file.last_modified.timestamp_millis(), + data_change: true, ..Default::default() })) } fn partitions( path: &Path, - // A HashSet for all unique partition columns in a Parquet table + // A mutable reference to the HashSet of all unique partition columns in a Parquet table partition_columns: &mut HashSet, - // A Vector for schema fields from all unique partition columns in a Parquet table + // A mutable reference to the vector of schema fields of all unique partition columns in a Parquet table partition_schema_fields: &mut Vec, ) -> Result>, Error> { // A HashMap from partition column to value for this parquet file only let mut partitions = HashMap::new(); let mut iter = path.as_ref().split('/').peekable(); let mut subpath = iter.next(); - // Get partition from subpaths. Skip the last subpath + // Get partitions from subpaths. 
Skip the last subpath while iter.peek().is_some() { if let Some(subpath) = subpath { - // Percent decode the subpath let subpath = percent_decode(subpath)?; let partition = DeltaTablePartition::try_from(subpath.as_ref())?; debug!("Found partition {partition:#?} in parquet file {path:#?}"); @@ -188,11 +193,11 @@ fn partitions( } async fn delta_table(files: Vec, storage: ObjectStoreRef) -> Result { - let (mut schemas, mut actions, mut partition_columns, mut partition_schema_fields) = + let (mut arrow_schemas, mut actions, mut partition_columns, mut partition_schema_fields) = (Vec::new(), Vec::new(), HashSet::new(), Vec::new()); for file in files { debug!("Processing parquet file: {:#?}", file.location); - schemas.push(arrow_schema(&file, storage.clone()).await?); + arrow_schemas.push(arrow_schema(&file, storage.clone()).await?); let partition_values = partitions( &file.location, &mut partition_columns, @@ -200,7 +205,7 @@ async fn delta_table(files: Vec, storage: ObjectStoreRef) -> Result< )?; actions.push(add_action(&file, partition_values)?); } - let mut schema_fields = Schema::try_from(&ArrowSchema::try_merge(schemas)?)? + let mut schema_fields = Schema::try_from(&ArrowSchema::try_merge(arrow_schemas)?)? 
.get_fields() .clone(); schema_fields.append(&mut partition_schema_fields); @@ -221,31 +226,76 @@ fn percent_decode(path: &str) -> Result, Utf8Error> { #[cfg(test)] mod tests { use super::{ - /*convert_to_delta,*/ object_store, parquet_files, partitions, HashMap, HashSet, Path, - SchemaDataType, SchemaField, + convert_to_delta, convert_to_delta_from_path, object_store, parquet_files, partitions, Add, + DeltaTable, HashMap, HashSet, Path, SchemaDataType, SchemaField, }; - /* + use crate::PartitionFilter; + use pretty_assertions::assert_eq; use std::fs; - use tempfile::TempDir; - */ + use tempfile::tempdir; - fn schema_field(key: &str) -> SchemaField { + fn schema_field(key: &str, primitive: &str, nullable: bool) -> SchemaField { SchemaField::new( key.to_string(), - SchemaDataType::primitive("string".to_string()), - true, + SchemaDataType::primitive(primitive.to_string()), + nullable, HashMap::new(), ) } - async fn get_file_paths_and_partitions( - path: &str, - ) -> (Vec, HashSet, Vec) { + async fn get_parquet_files(path: &str) -> Vec { + parquet_files(object_store(path).expect("Failed to create an object store")) + .await + .expect("Failed to get parquet files") + .into_iter() + .map(|file| file.location) + .collect() + } + + #[tokio::test] + async fn test_parquet_files() { + assert_eq!( + get_parquet_files("tests/data/delta-0.8.0").await, + vec![ + Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"), + Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"), + Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"), + ], + ); + assert_eq!( + get_parquet_files("tests/data/delta-0.8.0-null-partition").await, + vec![ + Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"), + Path::from("k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"), + ], + ); + assert_eq!( + 
get_parquet_files("tests/data/delta-0.8.0-numeric-partition").await, + vec![ + Path::from("x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"), + Path::from("x=10/y=10.0/part-00015-24eb4845-2d25-4448-b3bb-5ed7f12635ab.c000.snappy.parquet"), + ], + ); + assert_eq!( + get_parquet_files("tests/data/delta-0.8.0-special-partition").await, + vec![ + Path::parse( + "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet" + ) + .expect("Failed to parse as Path"), + Path::parse( + "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet" + ) + .expect("Failed to parse as Path") + ], + ); + } + + async fn get_partitions(path: &str) -> (HashSet, Vec) { let files = parquet_files(object_store(path).expect("Failed to create an object store")) .await .expect("Failed to get parquet files"); - let (mut paths, mut partition_columns, mut partition_schema_fields) = - (Vec::new(), HashSet::new(), Vec::new()); + let (mut partition_columns, mut partition_schema_fields) = (HashSet::new(), Vec::new()); for file in files { partitions( &file.location, @@ -253,107 +303,67 @@ mod tests { &mut partition_schema_fields, ) .expect("Failed to get partitions"); - paths.push(file.location); } - (paths, partition_columns, partition_schema_fields) + (partition_columns, partition_schema_fields) } #[tokio::test] - async fn test_parquet_files_and_partitions() { + async fn test_partitions() { assert_eq!( - get_file_paths_and_partitions("tests/data/delta-0.8.0").await, - ( - vec![ - Path::from( - "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet" - ), - Path::from( - "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet" - ), - Path::from( - "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet" - ), - ], - HashSet::new(), - Vec::new() - ) + get_partitions("tests/data/delta-0.8.0").await, + (HashSet::new(), Vec::new()) ); assert_eq!( - 
get_file_paths_and_partitions("tests/data/delta-0.8.0-partitioned").await, + get_partitions("tests/data/delta-0.8.0-partitioned").await, ( + HashSet::from_iter(["month".to_string(), "day".to_string(), "year".to_string()]), vec![ - Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"), - Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"), - Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"), - Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet"), - Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"), - Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"), - ], - HashSet::from(["month".to_string(), "day".to_string(), "year".to_string()]), - vec![ - schema_field("year"), - schema_field("month"), - schema_field("day") + schema_field("year", "string", true), + schema_field("month", "string", true), + schema_field("day", "string", true) ], ) ); assert_eq!( - get_file_paths_and_partitions("tests/data/delta-0.8.0-null-partition").await, + get_partitions("tests/data/delta-0.8.0-null-partition").await, ( - vec![ - Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"), - Path::from("k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"), - ], - HashSet::from(["k".to_string()]), - vec![schema_field("k")] + HashSet::from_iter(["k".to_string()]), + vec![schema_field("k", "string", true)] ) ); assert_eq!( - get_file_paths_and_partitions("tests/data/delta-0.8.0-numeric-partition").await, - ( - vec![ - Path::from("x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"), - Path::from("x=10/y=10.0/part-00015-24eb4845-2d25-4448-b3bb-5ed7f12635ab.c000.snappy.parquet"), 
+ get_partitions("tests/data/delta-0.8.0-numeric-partition").await, + ( + HashSet::from_iter(["x".to_string(), "y".to_string()]), + vec![ + schema_field("x", "string", true), + schema_field("y", "string", true) ], - HashSet::from(["x".to_string(), "y".to_string()]), - vec![schema_field("x"), schema_field("y")], ) ); assert_eq!( - get_file_paths_and_partitions("tests/data/delta-0.8.0-special-partition").await, + get_partitions("tests/data/delta-0.8.0-special-partition").await, ( - vec![ - Path::parse( - "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet" - ) - .expect("Failed to parse a string as a Path"), - Path::parse( - "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet" - ) - .expect("Failed to parse a string as a Path") - ], - HashSet::from(["x".to_string()]), - vec![schema_field("x")] + HashSet::from_iter(["x".to_string()]), + vec![schema_field("x", "string", true)] ) ); assert_eq!( - get_file_paths_and_partitions("tests/data/delta-2.2.0-partitioned-types").await, + get_partitions("tests/data/delta-2.2.0-partitioned-types").await, ( + HashSet::from_iter(["c1".to_string(), "c2".to_string()]), vec![ - Path::from("c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet"), - Path::from("c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet"), - Path::from("c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet"), + schema_field("c1", "string", true), + schema_field("c2", "string", true) ], - HashSet::from(["c1".to_string(), "c2".to_string()]), - vec![schema_field("c1"), schema_field("c2")], ) ); } fn partition_key_values(path: &str) -> HashMap> { partitions( - &Path::parse(path).expect("Failed to parse a string as a Path"), + &Path::parse(path).expect("Failed to parse as Path"), &mut HashSet::new(), &mut Vec::new(), ) @@ -362,18 +372,21 @@ mod tests { #[test] fn test_partition_key_values() { + // Special character partition assert_eq!( 
partition_key_values( "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet", ), HashMap::from([("x".to_string(), Some("A/A".to_string()))]) ); + // Special character partition assert_eq!( partition_key_values( "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet" ), HashMap::from([("x".to_string(), Some("B B".to_string()))]) ); + // Null partition assert_eq!( partition_key_values( "k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet" @@ -404,16 +417,6 @@ mod tests { ), HashMap::new() ); - assert_eq!( - partition_key_values( - "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet" - ), - HashMap::from([ - ("year".to_string(), Some("2021".to_string())), - ("month".to_string(), Some("12".to_string())), - ("day".to_string(), Some("4".to_string())), - ]) - ); assert_eq!( partition_key_values( "date=2023-04-13/part-00000-e853fe2e-6f42-450c-8af1-4145b73a96c7-c000.snappy.parquet", @@ -422,86 +425,314 @@ mod tests { ) } - /* - fn copy_parquet_files(src: impl AsRef, dst: impl AsRef) { + fn temp_dir() -> tempfile::TempDir { + tempdir().expect("Failed to create a temp directory") + } + + #[tokio::test] + async fn test_empty_dir() { + let temp_dir = temp_dir(); + let temp_dir = temp_dir + .path() + .to_str() + .expect("Failed to convert to string slice"); + // Parquet file does not exist. 
convert_to_delta should error + convert_to_delta(object_store(temp_dir).expect("Failed to create an object store")) + .await + .expect_err("Should error"); + } + + async fn get_delta_table( + path: &str, + version: i64, + expected_schema_fields: Vec, + test_data_path: &str, + ) -> DeltaTable { + let table = convert_to_delta(object_store(path).expect("Failed to create an object store")) + .await + .expect("Failed to convert to Delta table"); + assert_eq!( + table.version(), + version, + "Testing location: {test_data_path:?}" + ); + assert_eq!( + *table + .get_schema() + .expect("Failed to get schema") + .get_fields(), + expected_schema_fields, + "Testing location: {test_data_path:?}" + ); + table + } + + // Test convert_to_delta when a delta table already exists in the location + #[tokio::test] + async fn test_delta_table_location() { + get_delta_table( + "tests/data/delta-0.2.0", + 3, + vec![schema_field("value", "integer", true)], + "tests/data/delta-0.2.0", + ) + .await; + get_delta_table( + "tests/data/delta-0.8-empty", + 1, + vec![schema_field("column", "long", true)], + "tests/data/delta-0.8-empty", + ) + .await; + get_delta_table( + "tests/data/delta-0.8.0", + 1, + vec![schema_field("value", "integer", true)], + "tests/data/delta-0.8-empty", + ) + .await; + } + + fn copy_files(src: impl AsRef, dst: impl AsRef) { + fs::create_dir_all(&dst).expect("Failed to create all directories"); let files = fs::read_dir(src).expect("Failed to read source directory"); for file in files { let file = file.expect("Failed to read file"); - let name = file.path().file_name().map(|name| name.to_str()); + let name = file.file_name(); // Skip Delta log - if Some("_delta_log") == name { - println!("///////////////// file name = {:?}", file.path().display()); - } - if file.file_type().is_dir() { - println!( - "/////////////////// subdirectory = {:?}", - dst.af_ref().join(name).display() - ); - copy_dir(file.path(), dst.af_ref().join(name)) - .expect("Failed to copy subdirectory"); - 
} else { - fs::copy(file.path(), dst.af_ref().join(name)).expect("Failed to copy file"); + if name.to_str() != Some("_delta_log") { + if file.file_type().expect("Failed to get file type").is_dir() { + copy_files(file.path(), dst.as_ref().join(name)); + } else { + fs::copy(file.path(), dst.as_ref().join(name)).expect("Failed to copy file"); + } } } } + async fn create_delta_table( + path: &str, + expected_schema_fields: Vec, + ) -> DeltaTable { + let temp_dir = temp_dir(); + let temp_dir = temp_dir + .path() + .to_str() + .expect("Failed to convert to string slice"); + // Copy all files to a temp directory to perform testing. Skip Delta log + copy_files(format!("{}/{}", env!("CARGO_MANIFEST_DIR"), path), temp_dir); + get_delta_table(temp_dir, 0, expected_schema_fields, path).await + } + + fn get_add_action_by_partition(table: &DeltaTable, filters: Vec>) -> Add { + table + .get_active_add_actions_by_partitions(&filters) + .expect("Failed to get Add actions") + .next() + .expect("End of iterator") + .clone() + } + #[tokio::test] async fn test_convert_to_delta() { - let path = "tests/data/delta-0.8.0-partitioned"; - let store = object_store(path).expect("Failed to create an object store"); - let _table = convert_to_delta(store) - .await - .expect("Failed to convert to Delta table"); - let temp_dir = TempDir::new().expect("Failed to create a temp directory"); - copy_parquet_files(path, temp_dir); - panic!(); - assert_eq!(table.version(), 0); + // Test Parquet files in path "tests/data/delta-0.8.0-date" + let table = create_delta_table( + "tests/data/delta-0.8.0-date", + vec![ + schema_field("date", "date", true), + schema_field("dayOfYear", "integer", true), + ], + ) + .await; + assert_eq!( + table.get_partition_values().cloned().collect::>(), + vec![HashMap::new()] + ); + // Test Parquet files in path "tests/data/delta-0.8.0-null-partition" + let table = create_delta_table( + "tests/data/delta-0.8.0-null-partition", + vec![ + schema_field("v", "long", true), + 
schema_field("k", "string", true), + ], + ) + .await; + assert_eq!( + table.get_partition_values().cloned().collect::>(), + vec![ + HashMap::from([("k".to_string(), None)]), + HashMap::from([("k".to_string(), Some("A".to_string()))]), + ] + ); + // Test Parquet files in path "tests/data/delta-0.8.0-special-partition" + let table = create_delta_table( + "tests/data/delta-0.8.0-special-partition", + vec![ + schema_field("y", "long", true), + schema_field("x", "string", true), + ], + ) + .await; assert_eq!( table.get_partition_values().cloned().collect::>(), vec![ - HashMap::from([ - ("year".to_string(), Some("2020".to_string())), - ("month".to_string(), Some("1".to_string())), - ("day".to_string(), Some("1".to_string())), - ]), - HashMap::from([ - ("year".to_string(), Some("2020".to_string())), - ("month".to_string(), Some("2".to_string())), - ("day".to_string(), Some("3".to_string())), - ]), - HashMap::from([ - ("year".to_string(), Some("2020".to_string())), - ("month".to_string(), Some("2".to_string())), - ("day".to_string(), Some("5".to_string())), - ]), - HashMap::from([ - ("year".to_string(), Some("2021".to_string())), - ("month".to_string(), Some("4".to_string())), - ("day".to_string(), Some("5".to_string())), - ]), - HashMap::from([ - ("year".to_string(), Some("2021".to_string())), - ("month".to_string(), Some("12".to_string())), - ("day".to_string(), Some("20".to_string())), - ]), - HashMap::from([ - ("year".to_string(), Some("2021".to_string())), - ("month".to_string(), Some("12".to_string())), - ("day".to_string(), Some("4".to_string())), - ]), + HashMap::from([("x".to_string(), Some("A/A".to_string()))]), + HashMap::from([("x".to_string(), Some("B B".to_string()))]) ] ); + // Test Parquet files in path "tests/data/delta-0.8.0-partitioned" + let table = create_delta_table( + "tests/data/delta-0.8.0-partitioned", + vec![ + schema_field("value", "string", true), + schema_field("year", "string", true), + schema_field("month", "string", true), + 
schema_field("day", "string", true), + ], + ) + .await; + let action = get_add_action_by_partition( + &table, + vec![ + PartitionFilter::try_from(("year", "=", "2020")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("month", "=", "1")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("day", "=", "1")) + .expect("Failed to create a PartitionFilter"), + ], + ); + assert_eq!(action.path, "year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"); + assert!(action.data_change); + let action = get_add_action_by_partition( + &table, + vec![ + PartitionFilter::try_from(("year", "=", "2020")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("month", "=", "2")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("day", "=", "3")) + .expect("Failed to create a PartitionFilter"), + ], + ); + assert_eq!(action.path, "year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"); + assert!(action.data_change); + let action = get_add_action_by_partition( + &table, + vec![ + PartitionFilter::try_from(("year", "=", "2020")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("month", "=", "2")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("day", "=", "5")) + .expect("Failed to create a PartitionFilter"), + ], + ); + assert_eq!(action.path, "year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"); + assert!(action.data_change); + let action = get_add_action_by_partition( + &table, + vec![ + PartitionFilter::try_from(("year", "=", "2021")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("month", "=", "4")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("day", "=", "5")) + .expect("Failed to create a PartitionFilter"), + ], + ); + 
assert_eq!(action.path, "year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet"); + assert!(action.data_change); + let action = get_add_action_by_partition( + &table, + vec![ + PartitionFilter::try_from(("year", "=", "2021")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("month", "=", "12")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("day", "=", "20")) + .expect("Failed to create a PartitionFilter"), + ], + ); + assert_eq!(action.path, "year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"); + assert!(action.data_change); + let action = get_add_action_by_partition( + &table, + vec![ + PartitionFilter::try_from(("year", "=", "2021")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("month", "=", "12")) + .expect("Failed to create a PartitionFilter"), + PartitionFilter::try_from(("day", "=", "4")) + .expect("Failed to create a PartitionFilter"), + ], + ); + assert_eq!(action.path, "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"); + assert!(action.data_change); + } + + async fn get_delta_table_from_path( + path: &str, + version: i64, + expected_schema_fields: Vec, + test_data_path: &str, + ) -> DeltaTable { + let table = convert_to_delta_from_path(path) + .await + .expect("Failed to convert to Delta table"); + assert_eq!( + table.version(), + version, + "Testing location: {test_data_path:?}" + ); assert_eq!( *table .get_schema() .expect("Failed to get schema") .get_fields(), + expected_schema_fields, + "Testing location: {test_data_path:?}" + ); + table + } + + async fn create_delta_table_from_path( + path: &str, + expected_schema_fields: Vec, + ) -> DeltaTable { + let temp_dir = temp_dir(); + let temp_dir = temp_dir + .path() + .to_str() + .expect("Failed to convert to string slice"); + // Copy all files to a temp directory to perform testing. 
Skip Delta log + copy_files(format!("{}/{}", env!("CARGO_MANIFEST_DIR"), path), temp_dir); + get_delta_table_from_path(temp_dir, 0, expected_schema_fields, path).await + } + + #[tokio::test] + async fn test_convert_to_delta_from_path() { + // Test delta table location "tests/data/delta-2.2.0-partitioned-types" + get_delta_table_from_path( + "tests/data/delta-2.2.0-partitioned-types", + 0, vec![ - schema_field("year"), - schema_field("month"), - schema_field("day") + schema_field("c1", "integer", true), + schema_field("c2", "string", true), + schema_field("c3", "integer", true), ], - ); + "tests/data/delta-2.2.0-partitioned-types", + ) + .await; + // Test Parquet files in path "tests/data/delta-0.8.0-numeric-partition" + create_delta_table_from_path( + "tests/data/delta-0.8.0-numeric-partition", + vec![ + schema_field("z", "string", true), + schema_field("x", "string", true), + schema_field("y", "string", true), + ], + ) + .await; } - */ }