From d82b9b3840d5d23907a8a0743c226efd952872f3 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Fri, 22 Nov 2024 19:19:57 +0000
Subject: [PATCH] chore: refactor JsonWriter to utilize the DeltaTable
 configuration more

The JsonWriter was created before a lot of other code and was in need of a
little refactoring. The writer does not commit to the Delta table on its own,
which can be a benefit for some performance-specific use cases. This change
does, however, enforce that the writer is initialized with a valid Delta table
path, which ensures it can use the table configuration properly.

Signed-off-by: R. Tyler Croy
---
 crates/core/src/table/config.rs |   1 +
 crates/core/src/writer/json.rs  | 171 +++++++++++++++-----------------
 crates/core/src/writer/stats.rs |   8 +-
 3 files changed, 84 insertions(+), 96 deletions(-)

diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
index df28f11aae..f8a223560a 100644
--- a/crates/core/src/table/config.rs
+++ b/crates/core/src/table/config.rs
@@ -207,6 +207,7 @@ macro_rules! table_config {
 }
 
 /// Well known delta table configuration
+#[derive(Debug)]
 pub struct TableConfig<'a>(pub(crate) &'a HashMap<String, Option<String>>);
 
 /// Default num index cols
diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs
index 27257cf1bd..abb46ed91e 100644
--- a/crates/core/src/writer/json.rs
+++ b/crates/core/src/writer/json.rs
@@ -9,7 +9,6 @@ use bytes::Bytes;
 use delta_kernel::expressions::Scalar;
 use indexmap::IndexMap;
 use object_store::path::Path;
-use object_store::ObjectStore;
 use parquet::{
     arrow::ArrowWriter, basic::Compression, errors::ParquetError,
     file::properties::WriterProperties,
@@ -26,26 +25,26 @@ use super::utils::{
 use super::{DeltaWriter, DeltaWriterError, WriteMode};
 use crate::errors::DeltaTableError;
 use crate::kernel::{scalars::ScalarExt, Add, PartitionsExt, StructType};
-use crate::storage::ObjectStoreRetryExt;
+use crate::storage::retry_ext::ObjectStoreRetryExt;
 use crate::table::builder::DeltaTableBuilder;
-use crate::table::config::DEFAULT_NUM_INDEX_COLS;
 use crate::writer::utils::ShareableBuffer;
 use crate::DeltaTable;
 
 type BadValue = (Value, ParquetError);
 
 /// Writes messages to a delta lake table.
+#[derive(Debug)]
 pub struct JsonWriter {
-    storage: Arc<dyn ObjectStore>,
-    arrow_schema_ref: Arc<ArrowSchema>,
+    table: DeltaTable,
+    /// Optional schema to use, otherwise try to rely on the schema from the [DeltaTable]
+    schema_ref: Option<ArrowSchemaRef>,
     writer_properties: WriterProperties,
     partition_columns: Vec<String>,
     arrow_writers: HashMap<String, DataArrowWriter>,
-    num_indexed_cols: i32,
-    stats_columns: Option<Vec<String>>,
 }
 
 /// Writes messages to an underlying arrow buffer.
+#[derive(Debug)]
 pub(crate) struct DataArrowWriter {
     arrow_schema: Arc<ArrowSchema>,
     writer_properties: WriterProperties,
@@ -183,43 +182,28 @@ impl DataArrowWriter {
 
 impl JsonWriter {
     /// Create a new JsonWriter instance
-    pub fn try_new(
+    pub async fn try_new(
         table_uri: String,
-        schema: ArrowSchemaRef,
+        schema_ref: ArrowSchemaRef,
         partition_columns: Option<Vec<String>>,
         storage_options: Option<HashMap<String, String>>,
     ) -> Result<Self, DeltaTableError> {
-        let delta_table = DeltaTableBuilder::from_uri(table_uri)
+        let table = DeltaTableBuilder::from_uri(table_uri)
             .with_storage_options(storage_options.unwrap_or_default())
-            .build()?;
+            .load()
+            .await?;
 
         // Initialize writer properties for the underlying arrow writer
         let writer_properties = WriterProperties::builder()
             // NOTE: Consider extracting config for writer properties and setting more than just compression
            .set_compression(Compression::SNAPPY)
            .build();
-        // if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns
-        let configuration: HashMap<String, Option<String>> = delta_table.metadata().map_or_else(
-            |_| HashMap::new(),
-            |metadata| metadata.configuration.clone(),
-        );
 
         Ok(Self {
-            storage: delta_table.object_store(),
-            arrow_schema_ref: schema,
+            table,
+            schema_ref: Some(schema_ref),
             writer_properties,
             partition_columns: partition_columns.unwrap_or_default(),
             arrow_writers: HashMap::new(),
-            num_indexed_cols: configuration
-                .get("delta.dataSkippingNumIndexedCols")
-                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
-                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
-            stats_columns: configuration
-                .get("delta.dataSkippingStatsColumns")
-                .and_then(|v| {
-                    v.as_ref()
-                        .map(|v| v.split(',').map(|s| s.to_string()).collect())
-                }),
         })
     }
 
@@ -227,8 +211,6 @@ impl JsonWriter {
     pub fn for_table(table: &DeltaTable) -> Result<JsonWriter, DeltaTableError> {
         // Initialize an arrow schema ref from the delta table schema
         let metadata = table.metadata()?;
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&metadata.schema()?)?;
-        let arrow_schema_ref = Arc::new(arrow_schema);
         let partition_columns = metadata.partition_columns.clone();
 
         // Initialize writer properties for the underlying arrow writer
@@ -236,25 +218,13 @@ impl JsonWriter {
             // NOTE: Consider extracting config for writer properties and setting more than just compression
             .set_compression(Compression::SNAPPY)
             .build();
-        let configuration: HashMap<String, Option<String>> =
-            table.metadata()?.configuration.clone();
 
         Ok(Self {
-            storage: table.object_store(),
-            arrow_schema_ref,
+            table: table.clone(),
             writer_properties,
             partition_columns,
+            schema_ref: None,
             arrow_writers: HashMap::new(),
-            num_indexed_cols: configuration
-                .get("delta.dataSkippingNumIndexedCols")
-                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
-                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
-            stats_columns: configuration
-                .get("delta.dataSkippingStatsColumns")
-                .and_then(|v| {
-                    v.as_ref()
-                        .map(|v| v.split(',').map(|s| s.to_string()).collect())
-                }),
         })
     }
 
@@ -277,10 +247,20 @@ impl JsonWriter {
         self.arrow_writers.clear();
     }
 
-    /// Returns the arrow schema representation of the delta table schema defined for the wrapped
+    /// Returns the user-defined arrow schema representation or the schema defined for the wrapped
     /// table.
+    ///
     pub fn arrow_schema(&self) -> Arc<ArrowSchema> {
-        self.arrow_schema_ref.clone()
+        if let Some(schema_ref) = self.schema_ref.as_ref() {
+            return schema_ref.clone();
+        }
+        let schema = self
+            .table
+            .schema()
+            .expect("Failed to unwrap schema for table");
+        <ArrowSchema as TryFrom<&StructType>>::try_from(schema)
+            .expect("Failed to coerce delta schema to arrow")
+            .into()
     }
 
     fn divide_by_partition_values(
@@ -378,7 +358,11 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
         Ok(())
     }
 
-    /// Writes the existing parquet bytes to storage and resets internal state to handle another file.
+    /// Writes the existing parquet bytes to storage and resets internal state to handle another
+    /// file.
+    ///
+    /// This function returns the [Add] actions which should be committed to the [DeltaTable] for
+    /// the written data files
     async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
         let writers = std::mem::take(&mut self.arrow_writers);
         let mut actions = Vec::new();
@@ -392,17 +376,20 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
             let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
             let obj_bytes = Bytes::from(writer.buffer.to_vec());
             let file_size = obj_bytes.len() as i64;
-            self.storage
+            self.table
+                .object_store()
                 .put_with_retries(&path, obj_bytes.into(), 15)
                 .await?;
 
+            let table_config = self.table.snapshot()?.table_config();
+
             actions.push(create_add(
                 &writer.partition_values,
                 path.to_string(),
                 file_size,
                 &metadata,
-                self.num_indexed_cols,
-                &self.stats_columns,
+                table_config.num_indexed_cols(),
+                &table_config.stats_columns(),
             )?);
         }
         Ok(actions)
@@ -464,35 +451,49 @@ fn extract_partition_values(
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+    use arrow_schema::ArrowError;
     use parquet::file::reader::FileReader;
     use parquet::file::serialized_reader::SerializedFileReader;
     use std::fs::File;
-    use std::sync::Arc;
 
-    use super::*;
     use crate::arrow::array::Int32Array;
-    use crate::arrow::datatypes::{
-        DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
-    };
+    use crate::arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField};
     use crate::kernel::DataType;
+    use crate::operations::create::CreateBuilder;
     use crate::writer::test_utils::get_delta_schema;
-    use crate::writer::DeltaWriter;
-    use crate::writer::JsonWriter;
 
-    #[tokio::test]
-    async fn test_partition_not_written_to_parquet() {
-        let table_dir = tempfile::tempdir().unwrap();
+    /// Generate a simple test table which has been pre-created at version 0
+    async fn get_test_table(table_dir: &tempfile::TempDir) -> DeltaTable {
         let schema = get_delta_schema();
         let path = table_dir.path().to_str().unwrap().to_string();
 
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
+        let mut table = CreateBuilder::new()
+            .with_location(&path)
+            .with_table_name("test-table")
+            .with_comment("A table for running tests")
+            .with_columns(schema.fields().cloned())
+            .await
+            .unwrap();
+        table.load().await.expect("Failed to load table");
+        assert_eq!(table.version(), 0);
+        table
+    }
+
+    #[tokio::test]
+    async fn test_partition_not_written_to_parquet() {
+        let table_dir = tempfile::tempdir().unwrap();
+        let table = get_test_table(&table_dir).await;
+        let schema = table.schema().unwrap();
+        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(schema).unwrap();
         let mut writer = JsonWriter::try_new(
-            path.clone(),
+            table.table_uri(),
             Arc::new(arrow_schema),
             Some(vec!["modified".to_string()]),
             None,
         )
+        .await
         .unwrap();
 
         let data = serde_json::json!(
@@ -564,16 +565,17 @@ mod tests {
     #[tokio::test]
     async fn test_parsing_error() {
         let table_dir = tempfile::tempdir().unwrap();
-        let schema = get_delta_schema();
-        let path = table_dir.path().to_str().unwrap().to_string();
+        let table = get_test_table(&table_dir).await;
 
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
+        let arrow_schema =
+            <ArrowSchema as TryFrom<&StructType>>::try_from(table.schema().unwrap()).unwrap();
         let mut writer = JsonWriter::try_new(
-            path.clone(),
+            table.table_uri(),
             Arc::new(arrow_schema),
             Some(vec!["modified".to_string()]),
             None,
         )
+        .await
         .unwrap();
 
         let data = serde_json::json!(
@@ -601,16 +603,17 @@ mod tests {
     #[tokio::test]
     async fn test_json_write_mismatched_values() {
         let table_dir = tempfile::tempdir().unwrap();
-        let schema = get_delta_schema();
-        let path = table_dir.path().to_str().unwrap().to_string();
+        let table = get_test_table(&table_dir).await;
 
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
+        let arrow_schema =
+            <ArrowSchema as TryFrom<&StructType>>::try_from(table.schema().unwrap()).unwrap();
         let mut writer = JsonWriter::try_new(
-            path.clone(),
+            table.table_uri(),
             Arc::new(arrow_schema),
             Some(vec!["modified".to_string()]),
            None,
         )
+        .await
         .unwrap();
 
         let data = serde_json::json!(
@@ -639,28 +642,18 @@ mod tests {
 
     #[tokio::test]
     async fn test_json_write_mismatched_schema() {
-        use crate::operations::create::CreateBuilder;
         let table_dir = tempfile::tempdir().unwrap();
-        let schema = get_delta_schema();
-        let path = table_dir.path().to_str().unwrap().to_string();
-
-        let mut table = CreateBuilder::new()
-            .with_location(&path)
-            .with_table_name("test-table")
-            .with_comment("A table for running tests")
-            .with_columns(schema.fields().cloned())
-            .await
-            .unwrap();
-        table.load().await.expect("Failed to load table");
-        assert_eq!(table.version(), 0);
+        let mut table = get_test_table(&table_dir).await;
 
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
+        let schema = table.schema().unwrap();
+        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(schema).unwrap();
         let mut writer = JsonWriter::try_new(
-            path.clone(),
+            table.table_uri(),
             Arc::new(arrow_schema),
             Some(vec!["modified".to_string()]),
             None,
         )
+        .await
         .unwrap();
 
         let data = serde_json::json!(
@@ -691,7 +684,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_json_write_checkpoint() {
-        use crate::operations::create::CreateBuilder;
         use std::fs;
 
         let table_dir = tempfile::tempdir().unwrap();
@@ -740,8 +732,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_json_write_data_skipping_stats_columns() {
-        use crate::operations::create::CreateBuilder;
-
         let table_dir = tempfile::tempdir().unwrap();
         let schema = get_delta_schema();
         let path = table_dir.path().to_str().unwrap().to_string();
@@ -790,8 +780,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_json_write_data_skipping_num_indexed_cols() {
-        use crate::operations::create::CreateBuilder;
-
         let table_dir = tempfile::tempdir().unwrap();
         let schema = get_delta_schema();
         let path = table_dir.path().to_str().unwrap().to_string();
@@ -837,5 +825,4 @@ mod tests {
             .unwrap()
         );
     }
-
 }
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index 984754d510..c09efbf651 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -26,7 +26,7 @@ pub fn create_add(
     size: i64,
     file_metadata: &FileMetaData,
     num_indexed_cols: i32,
-    stats_columns: &Option<Vec<String>>,
+    stats_columns: &Option<Vec<impl AsRef<str>>>,
 ) -> Result<Add, DeltaWriterError> {
     let stats = stats_from_file_metadata(
         partition_values,
@@ -99,7 +99,7 @@ fn stats_from_file_metadata(
     partition_values: &IndexMap<String, Scalar>,
     file_metadata: &FileMetaData,
     num_indexed_cols: i32,
-    stats_columns: &Option<Vec<String>>,
+    stats_columns: &Option<Vec<impl AsRef<str>>>,
 ) -> Result<Stats, DeltaWriterError> {
     let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
     let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?;
@@ -126,7 +126,7 @@ fn stats_from_metadata(
     row_group_metadata: Vec<RowGroupMetaData>,
     num_rows: i64,
     num_indexed_cols: i32,
-    stats_columns: &Option<Vec<String>>,
+    stats_columns: &Option<Vec<impl AsRef<str>>>,
 ) -> Result<Stats, DeltaWriterError> {
     let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
     let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
@@ -138,7 +138,7 @@ fn stats_from_metadata(
             .into_iter()
             .map(|v| {
                 match sqlparser::parser::Parser::new(&dialect)
-                    .try_with_sql(v)
+                    .try_with_sql(v.as_ref())
                     .map_err(|e| DeltaTableError::generic(e.to_string()))?
                     .parse_multipart_identifier()
                 {
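
Usage sketch (not part of the patch): a minimal illustration of the refactored API, assuming an existing Delta table at a hypothetical /tmp/example-table location with a matching schema. The crate and module paths are assumptions for illustration; what the patch itself establishes is that try_new is now async because it loads the table, and that flush only returns Add actions which the caller must still commit to the Delta log.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use deltalake_core::writer::{DeltaWriter, JsonWriter};
use deltalake_core::DeltaTableError;

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // Hypothetical location of an existing Delta table; the writer now requires a
    // valid table so it can pick up configuration such as data-skipping settings.
    let table_uri = "/tmp/example-table".to_string();

    // Arrow schema assumed to match the Delta table's schema for this sketch.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("modified", DataType::Utf8, true),
    ]));

    // `try_new` is now async: it loads the DeltaTable instead of only building it.
    let mut writer = JsonWriter::try_new(
        table_uri,
        schema,
        Some(vec!["modified".to_string()]),
        None,
    )
    .await?;

    // Buffer some JSON rows, then flush them to storage as parquet data files.
    writer
        .write(vec![serde_json::json!({"id": 1, "modified": "2024-11-22"})])
        .await?;
    let adds = writer.flush().await?;

    // The writer does not commit on its own: the returned Add actions still need
    // to be committed to the Delta log by the caller.
    println!("wrote {} data file(s), pending commit", adds.len());
    Ok(())
}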