diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index a04dceb3bd..27257cf1bd 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -41,6 +41,8 @@ pub struct JsonWriter { writer_properties: WriterProperties, partition_columns: Vec, arrow_writers: HashMap, + num_indexed_cols: i32, + stats_columns: Option>, } /// Writes messages to an underlying arrow buffer. @@ -187,22 +189,37 @@ impl JsonWriter { partition_columns: Option>, storage_options: Option>, ) -> Result { - let storage = DeltaTableBuilder::from_uri(table_uri) + let delta_table = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) - .build_storage()?; - + .build()?; // Initialize writer properties for the underlying arrow writer let writer_properties = WriterProperties::builder() // NOTE: Consider extracting config for writer properties and setting more than just compression .set_compression(Compression::SNAPPY) .build(); + // if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns + let configuration: HashMap> = delta_table.metadata().map_or_else( + |_| HashMap::new(), + |metadata| metadata.configuration.clone(), + ); + Ok(Self { - storage: storage.object_store(), + storage: delta_table.object_store(), arrow_schema_ref: schema, writer_properties, partition_columns: partition_columns.unwrap_or_default(), arrow_writers: HashMap::new(), + num_indexed_cols: configuration + .get("delta.dataSkippingNumIndexedCols") + .and_then(|v| v.clone().map(|v| v.parse::().unwrap())) + .unwrap_or(DEFAULT_NUM_INDEX_COLS), + stats_columns: configuration + .get("delta.dataSkippingStatsColumns") + .and_then(|v| { + v.as_ref() + .map(|v| v.split(',').map(|s| s.to_string()).collect()) + }), }) } @@ -219,6 +236,8 @@ impl JsonWriter { // NOTE: Consider extracting config for writer properties and setting more than just compression .set_compression(Compression::SNAPPY) .build(); + let configuration: HashMap> = + table.metadata()?.configuration.clone(); Ok(Self { storage: table.object_store(), @@ -226,6 +245,16 @@ impl JsonWriter { writer_properties, partition_columns, arrow_writers: HashMap::new(), + num_indexed_cols: configuration + .get("delta.dataSkippingNumIndexedCols") + .and_then(|v| v.clone().map(|v| v.parse::().unwrap())) + .unwrap_or(DEFAULT_NUM_INDEX_COLS), + stats_columns: configuration + .get("delta.dataSkippingStatsColumns") + .and_then(|v| { + v.as_ref() + .map(|v| v.split(',').map(|s| s.to_string()).collect()) + }), }) } @@ -372,8 +401,8 @@ impl DeltaWriter> for JsonWriter { path.to_string(), file_size, &metadata, - DEFAULT_NUM_INDEX_COLS, - &None, + self.num_indexed_cols, + &self.stats_columns, )?); } Ok(actions) @@ -708,4 +737,105 @@ mod tests { .collect(); assert_eq!(entries.len(), 1); } + + #[tokio::test] + async fn test_json_write_data_skipping_stats_columns() { + use crate::operations::create::CreateBuilder; + + let table_dir = tempfile::tempdir().unwrap(); + let schema = get_delta_schema(); + let path = table_dir.path().to_str().unwrap().to_string(); + let config: HashMap> = vec![( + "delta.dataSkippingStatsColumns".to_string(), + Some("id,value".to_string()), + )] + .into_iter() + .collect(); + let mut table = CreateBuilder::new() + .with_location(&path) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(schema.fields().cloned()) + .with_configuration(config) + .await + .unwrap(); + assert_eq!(table.version(), 0); + let mut writer = JsonWriter::for_table(&table).unwrap(); + let data = serde_json::json!( + { + "id" : "A", + "value": 42, + "modified": "2021-02-01" + } + ); + + writer.write(vec![data]).await.unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(table.version(), 1); + let add_actions = table.state.unwrap().file_actions().unwrap(); + assert_eq!(add_actions.len(), 1); + let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\",\"value\":42},\"maxValues\":{\"id\":\"A\",\"value\":42},\"nullCount\":{\"id\":0,\"value\":0}}"; + assert_eq!( + expected_stats.parse::().unwrap(), + add_actions + .into_iter() + .nth(0) + .unwrap() + .stats + .unwrap() + .parse::() + .unwrap() + ); + } + + #[tokio::test] + async fn test_json_write_data_skipping_num_indexed_cols() { + use crate::operations::create::CreateBuilder; + + let table_dir = tempfile::tempdir().unwrap(); + let schema = get_delta_schema(); + let path = table_dir.path().to_str().unwrap().to_string(); + let config: HashMap> = vec![( + "delta.dataSkippingNumIndexedCols".to_string(), + Some("1".to_string()), + )] + .into_iter() + .collect(); + let mut table = CreateBuilder::new() + .with_location(&path) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(schema.fields().cloned()) + .with_configuration(config) + .await + .unwrap(); + assert_eq!(table.version(), 0); + let mut writer = JsonWriter::for_table(&table).unwrap(); + let data = serde_json::json!( + { + "id" : "A", + "value": 42, + "modified": "2021-02-01" + } + ); + + writer.write(vec![data]).await.unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(table.version(), 1); + let add_actions = table.state.unwrap().file_actions().unwrap(); + assert_eq!(add_actions.len(), 1); + let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"A\"},\"nullCount\":{\"id\":0}}"; + assert_eq!( + expected_stats.parse::().unwrap(), + add_actions + .into_iter() + .nth(0) + .unwrap() + .stats + .unwrap() + .parse::() + .unwrap() + ); + } + } diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 10ba52ae62..2197d64f5f 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -44,6 +44,8 @@ pub struct RecordBatchWriter { should_evolve: bool, partition_columns: Vec, arrow_writers: HashMap, + num_indexed_cols: i32, + stats_columns: Option>, } impl std::fmt::Debug for RecordBatchWriter { @@ -60,25 +62,39 @@ impl RecordBatchWriter { partition_columns: Option>, storage_options: Option>, ) -> Result { - let storage = DeltaTableBuilder::from_uri(table_uri) + let delta_table = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) - .build_storage()? - .object_store(); - + .build()?; // Initialize writer properties for the underlying arrow writer let writer_properties = WriterProperties::builder() // NOTE: Consider extracting config for writer properties and setting more than just compression .set_compression(Compression::SNAPPY) .build(); + // if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns + let configuration: HashMap> = delta_table.metadata().map_or_else( + |_| HashMap::new(), + |metadata| metadata.configuration.clone(), + ); + Ok(Self { - storage, + storage: delta_table.object_store(), arrow_schema_ref: schema.clone(), original_schema_ref: schema, writer_properties, partition_columns: partition_columns.unwrap_or_default(), should_evolve: false, arrow_writers: HashMap::new(), + num_indexed_cols: configuration + .get("delta.dataSkippingNumIndexedCols") + .and_then(|v| v.clone().map(|v| v.parse::().unwrap())) + .unwrap_or(DEFAULT_NUM_INDEX_COLS), + stats_columns: configuration + .get("delta.dataSkippingStatsColumns") + .and_then(|v| { + v.as_ref() + .map(|v| v.split(',').map(|s| s.to_string()).collect()) + }), }) } @@ -96,6 +112,8 @@ impl RecordBatchWriter { // NOTE: Consider extracting config for writer properties and setting more than just compression .set_compression(Compression::SNAPPY) .build(); + let configuration: HashMap> = + table.metadata()?.configuration.clone(); Ok(Self { storage: table.object_store(), @@ -105,6 +123,16 @@ impl RecordBatchWriter { partition_columns, should_evolve: false, arrow_writers: HashMap::new(), + num_indexed_cols: configuration + .get("delta.dataSkippingNumIndexedCols") + .and_then(|v| v.clone().map(|v| v.parse::().unwrap())) + .unwrap_or(DEFAULT_NUM_INDEX_COLS), + stats_columns: configuration + .get("delta.dataSkippingStatsColumns") + .and_then(|v| { + v.as_ref() + .map(|v| v.split(',').map(|s| s.to_string()).collect()) + }), }) } @@ -233,8 +261,8 @@ impl DeltaWriter for RecordBatchWriter { path.to_string(), file_size, &metadata, - DEFAULT_NUM_INDEX_COLS, - &None, + self.num_indexed_cols, + &self.stats_columns, )?); } Ok(actions) @@ -985,4 +1013,100 @@ mod tests { ); } } + + #[tokio::test] + async fn test_write_data_skipping_stats_columns() { + let batch = get_record_batch(None, false); + let partition_cols: &[String] = &vec![]; + let table_schema: StructType = get_delta_schema(); + let table_dir = tempfile::tempdir().unwrap(); + let table_path = table_dir.path(); + let config: HashMap> = vec![( + "delta.dataSkippingStatsColumns".to_string(), + Some("id,value".to_string()), + )] + .into_iter() + .collect(); + + let mut table = CreateBuilder::new() + .with_location(table_path.to_str().unwrap()) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(table_schema.fields().cloned()) + .with_configuration(config) + .with_partition_columns(partition_cols) + .await + .unwrap(); + + let mut writer = RecordBatchWriter::for_table(&table).unwrap(); + let partitions = writer.divide_by_partition_values(&batch).unwrap(); + + assert_eq!(partitions.len(), 1); + assert_eq!(partitions[0].record_batch, batch); + writer.write(batch).await.unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(table.version(), 1); + let add_actions = table.state.unwrap().file_actions().unwrap(); + assert_eq!(add_actions.len(), 1); + let expected_stats ="{\"numRecords\":11,\"minValues\":{\"value\":1,\"id\":\"A\"},\"maxValues\":{\"id\":\"B\",\"value\":11},\"nullCount\":{\"id\":0,\"value\":0}}"; + assert_eq!( + expected_stats.parse::().unwrap(), + add_actions + .into_iter() + .nth(0) + .unwrap() + .stats + .unwrap() + .parse::() + .unwrap() + ); + } + + #[tokio::test] + async fn test_write_data_skipping_num_indexed_colsn() { + let batch = get_record_batch(None, false); + let partition_cols: &[String] = &vec![]; + let table_schema: StructType = get_delta_schema(); + let table_dir = tempfile::tempdir().unwrap(); + let table_path = table_dir.path(); + let config: HashMap> = vec![( + "delta.dataSkippingNumIndexedCols".to_string(), + Some("1".to_string()), + )] + .into_iter() + .collect(); + + let mut table = CreateBuilder::new() + .with_location(table_path.to_str().unwrap()) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(table_schema.fields().cloned()) + .with_configuration(config) + .with_partition_columns(partition_cols) + .await + .unwrap(); + + let mut writer = RecordBatchWriter::for_table(&table).unwrap(); + let partitions = writer.divide_by_partition_values(&batch).unwrap(); + + assert_eq!(partitions.len(), 1); + assert_eq!(partitions[0].record_batch, batch); + writer.write(batch).await.unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(table.version(), 1); + let add_actions = table.state.unwrap().file_actions().unwrap(); + assert_eq!(add_actions.len(), 1); + let expected_stats = "{\"numRecords\":11,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"B\"},\"nullCount\":{\"id\":0}}"; + assert_eq!( + expected_stats.parse::().unwrap(), + add_actions + .into_iter() + .nth(0) + .unwrap() + .stats + .unwrap() + .parse::() + .unwrap() + ); + } } diff --git a/crates/core/src/writer/test_utils.rs b/crates/core/src/writer/test_utils.rs index be0dfebb66..93f9b5a225 100644 --- a/crates/core/src/writer/test_utils.rs +++ b/crates/core/src/writer/test_utils.rs @@ -291,7 +291,7 @@ pub fn create_bare_table() -> DeltaTable { } pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable { - let table_schema = get_delta_schema(); + let table_schema: StructType = get_delta_schema(); let table_dir = tempfile::tempdir().unwrap(); let table_path = table_dir.path();