From 9014e6ac5f71bc52d4f29e222388574dc3c3f06e Mon Sep 17 00:00:00 2001
From: Justin Jossick
Date: Tue, 12 Nov 2024 12:52:57 -0800
Subject: [PATCH] Fix JsonWriter and RecordBatchWriter to respect stats skipping

This updates JsonWriter and RecordBatchWriter so that they write commit log
stats in accordance with delta.dataSkippingNumIndexedCols and
delta.dataSkippingStatsColumns when those properties are present on the table.
If these properties are unset, the default behavior of collecting stats for
the first 32 columns is preserved.

Signed-off-by: Justin Jossick
---
 crates/core/src/writer/json.rs         | 142 +++++++++++++++++++++++--
 crates/core/src/writer/record_batch.rs | 138 ++++++++++++++++++++++--
 crates/core/src/writer/test_utils.rs   |   2 +-
 3 files changed, 268 insertions(+), 14 deletions(-)

diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs
index a04dceb3bd..27257cf1bd 100644
--- a/crates/core/src/writer/json.rs
+++ b/crates/core/src/writer/json.rs
@@ -41,6 +41,8 @@ pub struct JsonWriter {
     writer_properties: WriterProperties,
     partition_columns: Vec<String>,
     arrow_writers: HashMap<String, DataArrowWriter>,
+    num_indexed_cols: i32,
+    stats_columns: Option<Vec<String>>,
 }
 
 /// Writes messages to an underlying arrow buffer.
@@ -187,22 +189,37 @@ impl JsonWriter {
         partition_columns: Option<Vec<String>>,
         storage_options: Option<HashMap<String, String>>,
     ) -> Result<Self, DeltaTableError> {
-        let storage = DeltaTableBuilder::from_uri(table_uri)
+        let delta_table = DeltaTableBuilder::from_uri(table_uri)
             .with_storage_options(storage_options.unwrap_or_default())
-            .build_storage()?;
-
+            .build()?;
         // Initialize writer properties for the underlying arrow writer
         let writer_properties = WriterProperties::builder()
             // NOTE: Consider extracting config for writer properties and setting more than just compression
             .set_compression(Compression::SNAPPY)
             .build();
+        // if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns
+        let configuration: HashMap<String, Option<String>> = delta_table.metadata().map_or_else(
+            |_| HashMap::new(),
+            |metadata| metadata.configuration.clone(),
+        );
+
         Ok(Self {
-            storage: storage.object_store(),
+            storage: delta_table.object_store(),
             arrow_schema_ref: schema,
             writer_properties,
             partition_columns: partition_columns.unwrap_or_default(),
             arrow_writers: HashMap::new(),
+            num_indexed_cols: configuration
+                .get("delta.dataSkippingNumIndexedCols")
+                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
+                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
+            stats_columns: configuration
+                .get("delta.dataSkippingStatsColumns")
+                .and_then(|v| {
+                    v.as_ref()
+                        .map(|v| v.split(',').map(|s| s.to_string()).collect())
+                }),
         })
     }
 
@@ -219,6 +236,8 @@ impl JsonWriter {
             // NOTE: Consider extracting config for writer properties and setting more than just compression
             .set_compression(Compression::SNAPPY)
             .build();
+        let configuration: HashMap<String, Option<String>> =
+            table.metadata()?.configuration.clone();
 
         Ok(Self {
             storage: table.object_store(),
@@ -226,6 +245,16 @@ impl JsonWriter {
             writer_properties,
             partition_columns,
             arrow_writers: HashMap::new(),
+            num_indexed_cols: configuration
+                .get("delta.dataSkippingNumIndexedCols")
+                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
+                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
+            stats_columns: configuration
+                .get("delta.dataSkippingStatsColumns")
+                .and_then(|v| {
+                    v.as_ref()
+                        .map(|v| v.split(',').map(|s| s.to_string()).collect())
+                }),
         })
     }
 
@@ -372,8 +401,8 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
                 path.to_string(),
                 file_size,
                 &metadata,
-                DEFAULT_NUM_INDEX_COLS,
-                &None,
+                self.num_indexed_cols,
+                &self.stats_columns,
             )?);
         }
         Ok(actions)
@@ -708,4 +737,105 @@ mod tests {
             .collect();
         assert_eq!(entries.len(), 1);
     }
+
+    #[tokio::test]
+    async fn test_json_write_data_skipping_stats_columns() {
+        use crate::operations::create::CreateBuilder;
+
+        let table_dir = tempfile::tempdir().unwrap();
+        let schema = get_delta_schema();
+        let path = table_dir.path().to_str().unwrap().to_string();
+        let config: HashMap<String, Option<String>> = vec![(
+            "delta.dataSkippingStatsColumns".to_string(),
+            Some("id,value".to_string()),
+        )]
+        .into_iter()
+        .collect();
+        let mut table = CreateBuilder::new()
+            .with_location(&path)
+            .with_table_name("test-table")
+            .with_comment("A table for running tests")
+            .with_columns(schema.fields().cloned())
+            .with_configuration(config)
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+        let mut writer = JsonWriter::for_table(&table).unwrap();
+        let data = serde_json::json!(
+            {
+                "id" : "A",
+                "value": 42,
+                "modified": "2021-02-01"
+            }
+        );
+
+        writer.write(vec![data]).await.unwrap();
+        writer.flush_and_commit(&mut table).await.unwrap();
+        assert_eq!(table.version(), 1);
+        let add_actions = table.state.unwrap().file_actions().unwrap();
+        assert_eq!(add_actions.len(), 1);
+        let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\",\"value\":42},\"maxValues\":{\"id\":\"A\",\"value\":42},\"nullCount\":{\"id\":0,\"value\":0}}";
+        assert_eq!(
+            expected_stats.parse::<serde_json::Value>().unwrap(),
+            add_actions
+                .into_iter()
+                .nth(0)
+                .unwrap()
+                .stats
+                .unwrap()
+                .parse::<serde_json::Value>()
+                .unwrap()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_json_write_data_skipping_num_indexed_cols() {
+        use crate::operations::create::CreateBuilder;
+
+        let table_dir = tempfile::tempdir().unwrap();
+        let schema = get_delta_schema();
+        let path = table_dir.path().to_str().unwrap().to_string();
+        let config: HashMap<String, Option<String>> = vec![(
+            "delta.dataSkippingNumIndexedCols".to_string(),
+            Some("1".to_string()),
+        )]
+        .into_iter()
+        .collect();
+        let mut table = CreateBuilder::new()
+            .with_location(&path)
+            .with_table_name("test-table")
+            .with_comment("A table for running tests")
+            .with_columns(schema.fields().cloned())
+            .with_configuration(config)
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+        let mut writer = JsonWriter::for_table(&table).unwrap();
+        let data = serde_json::json!(
+            {
+                "id" : "A",
+                "value": 42,
+                "modified": "2021-02-01"
+            }
+        );
+
+        writer.write(vec![data]).await.unwrap();
+        writer.flush_and_commit(&mut table).await.unwrap();
+        assert_eq!(table.version(), 1);
+        let add_actions = table.state.unwrap().file_actions().unwrap();
+        assert_eq!(add_actions.len(), 1);
+        let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"A\"},\"nullCount\":{\"id\":0}}";
+        assert_eq!(
+            expected_stats.parse::<serde_json::Value>().unwrap(),
+            add_actions
+                .into_iter()
+                .nth(0)
+                .unwrap()
+                .stats
+                .unwrap()
+                .parse::<serde_json::Value>()
+                .unwrap()
+        );
+    }
+
 }
diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs
index 10ba52ae62..2197d64f5f 100644
--- a/crates/core/src/writer/record_batch.rs
+++ b/crates/core/src/writer/record_batch.rs
@@ -44,6 +44,8 @@ pub struct RecordBatchWriter {
     should_evolve: bool,
     partition_columns: Vec<String>,
     arrow_writers: HashMap<String, PartitionWriter>,
+    num_indexed_cols: i32,
+    stats_columns: Option<Vec<String>>,
 }
 
 impl std::fmt::Debug for RecordBatchWriter {
@@ -60,25 +62,39 @@ impl RecordBatchWriter {
         partition_columns: Option<Vec<String>>,
         storage_options: Option<HashMap<String, String>>,
     ) -> Result<Self, DeltaTableError> {
-        let storage = DeltaTableBuilder::from_uri(table_uri)
+        let delta_table = DeltaTableBuilder::from_uri(table_uri)
             .with_storage_options(storage_options.unwrap_or_default())
-            .build_storage()?
-            .object_store();
-
+            .build()?;
         // Initialize writer properties for the underlying arrow writer
         let writer_properties = WriterProperties::builder()
             // NOTE: Consider extracting config for writer properties and setting more than just compression
             .set_compression(Compression::SNAPPY)
             .build();
+        // if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns
+        let configuration: HashMap<String, Option<String>> = delta_table.metadata().map_or_else(
+            |_| HashMap::new(),
+            |metadata| metadata.configuration.clone(),
+        );
+
         Ok(Self {
-            storage,
+            storage: delta_table.object_store(),
             arrow_schema_ref: schema.clone(),
             original_schema_ref: schema,
             writer_properties,
             partition_columns: partition_columns.unwrap_or_default(),
             should_evolve: false,
             arrow_writers: HashMap::new(),
+            num_indexed_cols: configuration
+                .get("delta.dataSkippingNumIndexedCols")
+                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
+                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
+            stats_columns: configuration
+                .get("delta.dataSkippingStatsColumns")
+                .and_then(|v| {
+                    v.as_ref()
+                        .map(|v| v.split(',').map(|s| s.to_string()).collect())
+                }),
         })
     }
 
@@ -96,6 +112,8 @@ impl RecordBatchWriter {
             // NOTE: Consider extracting config for writer properties and setting more than just compression
             .set_compression(Compression::SNAPPY)
             .build();
+        let configuration: HashMap<String, Option<String>> =
+            table.metadata()?.configuration.clone();
 
         Ok(Self {
             storage: table.object_store(),
@@ -105,6 +123,16 @@ impl RecordBatchWriter {
             partition_columns,
             should_evolve: false,
             arrow_writers: HashMap::new(),
+            num_indexed_cols: configuration
+                .get("delta.dataSkippingNumIndexedCols")
+                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
+                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
+            stats_columns: configuration
+                .get("delta.dataSkippingStatsColumns")
+                .and_then(|v| {
+                    v.as_ref()
+                        .map(|v| v.split(',').map(|s| s.to_string()).collect())
+                }),
         })
     }
 
@@ -233,8 +261,8 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
                 path.to_string(),
                 file_size,
                 &metadata,
-                DEFAULT_NUM_INDEX_COLS,
-                &None,
+                self.num_indexed_cols,
+                &self.stats_columns,
             )?);
         }
         Ok(actions)
@@ -985,4 +1013,100 @@ mod tests {
             );
         }
     }
+
+    #[tokio::test]
+    async fn test_write_data_skipping_stats_columns() {
+        let batch = get_record_batch(None, false);
+        let partition_cols: &[String] = &vec![];
+        let table_schema: StructType = get_delta_schema();
+        let table_dir = tempfile::tempdir().unwrap();
+        let table_path = table_dir.path();
+        let config: HashMap<String, Option<String>> = vec![(
+            "delta.dataSkippingStatsColumns".to_string(),
+            Some("id,value".to_string()),
+        )]
+        .into_iter()
+        .collect();
+
+        let mut table = CreateBuilder::new()
+            .with_location(table_path.to_str().unwrap())
+            .with_table_name("test-table")
+            .with_comment("A table for running tests")
+            .with_columns(table_schema.fields().cloned())
+            .with_configuration(config)
+            .with_partition_columns(partition_cols)
+            .await
+            .unwrap();
+
+        let mut writer = RecordBatchWriter::for_table(&table).unwrap();
+        let partitions = writer.divide_by_partition_values(&batch).unwrap();
+
+        assert_eq!(partitions.len(), 1);
+        assert_eq!(partitions[0].record_batch, batch);
+        writer.write(batch).await.unwrap();
+        writer.flush_and_commit(&mut table).await.unwrap();
+        assert_eq!(table.version(), 1);
+        let add_actions = table.state.unwrap().file_actions().unwrap();
+        assert_eq!(add_actions.len(), 1);
="{\"numRecords\":11,\"minValues\":{\"value\":1,\"id\":\"A\"},\"maxValues\":{\"id\":\"B\",\"value\":11},\"nullCount\":{\"id\":0,\"value\":0}}"; + assert_eq!( + expected_stats.parse::().unwrap(), + add_actions + .into_iter() + .nth(0) + .unwrap() + .stats + .unwrap() + .parse::() + .unwrap() + ); + } + + #[tokio::test] + async fn test_write_data_skipping_num_indexed_colsn() { + let batch = get_record_batch(None, false); + let partition_cols: &[String] = &vec![]; + let table_schema: StructType = get_delta_schema(); + let table_dir = tempfile::tempdir().unwrap(); + let table_path = table_dir.path(); + let config: HashMap> = vec![( + "delta.dataSkippingNumIndexedCols".to_string(), + Some("1".to_string()), + )] + .into_iter() + .collect(); + + let mut table = CreateBuilder::new() + .with_location(table_path.to_str().unwrap()) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(table_schema.fields().cloned()) + .with_configuration(config) + .with_partition_columns(partition_cols) + .await + .unwrap(); + + let mut writer = RecordBatchWriter::for_table(&table).unwrap(); + let partitions = writer.divide_by_partition_values(&batch).unwrap(); + + assert_eq!(partitions.len(), 1); + assert_eq!(partitions[0].record_batch, batch); + writer.write(batch).await.unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(table.version(), 1); + let add_actions = table.state.unwrap().file_actions().unwrap(); + assert_eq!(add_actions.len(), 1); + let expected_stats = "{\"numRecords\":11,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"B\"},\"nullCount\":{\"id\":0}}"; + assert_eq!( + expected_stats.parse::().unwrap(), + add_actions + .into_iter() + .nth(0) + .unwrap() + .stats + .unwrap() + .parse::() + .unwrap() + ); + } } diff --git a/crates/core/src/writer/test_utils.rs b/crates/core/src/writer/test_utils.rs index be0dfebb66..93f9b5a225 100644 --- a/crates/core/src/writer/test_utils.rs +++ b/crates/core/src/writer/test_utils.rs @@ -291,7 +291,7 @@ pub fn create_bare_table() -> DeltaTable { } pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable { - let table_schema = get_delta_schema(); + let table_schema: StructType = get_delta_schema(); let table_dir = tempfile::tempdir().unwrap(); let table_path = table_dir.path();