From 3935479f449bc9f3904d570d55f3dfda2ec4fe61 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 9 Jul 2024 20:01:15 +0800 Subject: [PATCH 1/6] feat/copy-to-parquet-parameter: Commit Message: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enhance Parquet Writer with Column-wise Configuration Summary: • Introduced column_wise_config function to customize per-column properties in Parquet writer. --- .../datasource/src/file_format/parquet.rs | 41 +++++++++++++++---- src/operator/src/statement/copy_table_to.rs | 20 +++++---- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 9988a311f51c..03735bab9f9c 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -27,12 +27,14 @@ use datafusion::parquet::file::metadata::ParquetMetaData; use datafusion::parquet::format::FileMetaData; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::SendableRecordBatchStream; +use datatypes::schema::SchemaRef; use futures::future::BoxFuture; use futures::StreamExt; use object_store::{FuturesAsyncReader, ObjectStore}; use parquet::arrow::AsyncArrowWriter; -use parquet::basic::{Compression, ZstdLevel}; -use parquet::file::properties::WriterProperties; +use parquet::basic::{Compression, Encoding, ZstdLevel}; +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; +use parquet::schema::types::ColumnPath; use snafu::ResultExt; use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; @@ -184,14 +186,16 @@ impl ArrowWriterCloser for ArrowWriter { /// Returns number of rows written. pub async fn stream_to_parquet( mut stream: SendableRecordBatchStream, + schema: datatypes::schema::SchemaRef, store: ObjectStore, path: &str, concurrency: usize, ) -> Result { - let write_props = WriterProperties::builder() - .set_compression(Compression::ZSTD(ZstdLevel::default())) - .build(); - let schema = stream.schema(); + let write_props = column_wise_config( + WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())), + schema, + ) + .build(); let inner_writer = store .writer_with(path) .concurrent(concurrency) @@ -200,7 +204,7 @@ pub async fn stream_to_parquet( .map(|w| w.into_futures_async_write().compat_write()) .context(WriteObjectSnafu { path })?; - let mut writer = AsyncArrowWriter::try_new(inner_writer, schema, Some(write_props)) + let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props)) .context(WriteParquetSnafu { path })?; let mut rows_written = 0; @@ -216,6 +220,29 @@ pub async fn stream_to_parquet( Ok(rows_written) } +/// Customizes per-column properties. +fn column_wise_config( + mut props: WriterPropertiesBuilder, + schema: SchemaRef, +) -> WriterPropertiesBuilder { + // Disable dictionary for timestamp column. + if let Some(ts_col) = schema.timestamp_column() { + let path = ColumnPath::new(vec![ts_col.name.clone()]); + props = props.set_column_dictionary_enabled(path, false); + } + + // Use delta binary packed encoding for all numeric columns. 
+ for column in schema.column_schemas() { + let data_type = &column.data_type; + if data_type.is_numeric() { + let path = ColumnPath::new(vec![column.name.clone()]); + props = props.set_column_encoding(path, Encoding::DELTA_BINARY_PACKED); + } + } + + props +} + #[cfg(test)] mod tests { use common_test_util::find_workspace_path; diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs index 0fb6f1137e9c..8a90d1095569 100644 --- a/src/operator/src/statement/copy_table_to.rs +++ b/src/operator/src/statement/copy_table_to.rs @@ -75,14 +75,18 @@ impl StatementExecutor { ) .await .context(error::WriteStreamToFileSnafu { path }), - Format::Parquet(_) => stream_to_parquet( - Box::pin(DfRecordBatchStreamAdapter::new(stream)), - object_store, - path, - WRITE_CONCURRENCY, - ) - .await - .context(error::WriteStreamToFileSnafu { path }), + Format::Parquet(_) => { + let schema = stream.schema(); + stream_to_parquet( + Box::pin(DfRecordBatchStreamAdapter::new(stream)), + schema, + object_store, + path, + WRITE_CONCURRENCY, + ) + .await + .context(error::WriteStreamToFileSnafu { path }) + } _ => error::UnsupportedFormatSnafu { format: *format }.fail(), } } From e3db2c73d7baddbb522c7a76636e984762faadb8 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 9 Jul 2024 20:32:01 +0800 Subject: [PATCH 2/6] feat/copy-to-parquet-parameter: Commit Message: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enhance Parquet File Format Handling for Specific Data Types Summary: • Added ConcreteDataType import to support specific data type handling. --- src/common/datasource/src/file_format/parquet.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 03735bab9f9c..3b1a5789d368 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -27,6 +27,7 @@ use datafusion::parquet::file::metadata::ParquetMetaData; use datafusion::parquet::format::FileMetaData; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::SendableRecordBatchStream; +use datatypes::prelude::ConcreteDataType; use datatypes::schema::SchemaRef; use futures::future::BoxFuture; use futures::StreamExt; @@ -231,10 +232,12 @@ fn column_wise_config( props = props.set_column_dictionary_enabled(path, false); } - // Use delta binary packed encoding for all numeric columns. + // Use delta binary packed encoding for int32/int64 columns. 
for column in schema.column_schemas() { let data_type = &column.data_type; - if data_type.is_numeric() { + if data_type == &ConcreteDataType::int64_datatype() + || data_type == &ConcreteDataType::int32_datatype() + { let path = ColumnPath::new(vec![column.name.clone()]); props = props.set_column_encoding(path, Encoding::DELTA_BINARY_PACKED); } From 19c87a72c0d23f468a9b8d54d5440768ebbbb644 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 9 Jul 2024 21:35:52 +0800 Subject: [PATCH 3/6] feat/copy-to-parquet-parameter: Commit Message: Refactor Parquet file format configuration --- .../datasource/src/file_format/parquet.rs | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 3b1a5789d368..13c08239729b 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -27,7 +27,6 @@ use datafusion::parquet::file::metadata::ParquetMetaData; use datafusion::parquet::format::FileMetaData; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::SendableRecordBatchStream; -use datatypes::prelude::ConcreteDataType; use datatypes::schema::SchemaRef; use futures::future::BoxFuture; use futures::StreamExt; @@ -229,20 +228,10 @@ fn column_wise_config( // Disable dictionary for timestamp column. if let Some(ts_col) = schema.timestamp_column() { let path = ColumnPath::new(vec![ts_col.name.clone()]); - props = props.set_column_dictionary_enabled(path, false); + props = props + .set_column_dictionary_enabled(path.clone(), false) + .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED) } - - // Use delta binary packed encoding for int32/int64 columns. - for column in schema.column_schemas() { - let data_type = &column.data_type; - if data_type == &ConcreteDataType::int64_datatype() - || data_type == &ConcreteDataType::int32_datatype() - { - let path = ColumnPath::new(vec![column.name.clone()]); - props = props.set_column_encoding(path, Encoding::DELTA_BINARY_PACKED); - } - } - props } From 4ac87c27261a3c981a53c3bd827dddf76a3b8ef8 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 10 Jul 2024 10:14:36 +0800 Subject: [PATCH 4/6] feat/copy-to-parquet-parameter: Enhance Parquet file format handling for timestamp columns - Added logic to disable dictionary encoding and set DELTA_BINARY_PACKED encoding for timestamp columns in the Parquet file format configuration. 
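- As an illustrative example of why this helps (figures are hypothetical): for a near-monotonic millisecond timestamp column such as 1720000000000, 1720000000100, 1720000000200, DELTA_BINARY_PACKED stores a base value plus small bit-packed deltas (a constant 100 here), costing only a few bits per value, whereas dictionary encoding keeps one dictionary entry per distinct timestamp, so for mostly-unique timestamps the dictionary page grows roughly as large as the data it replaces.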
--- src/common/datasource/src/file_format/parquet.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 13c08239729b..9175ee9fe053 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -232,6 +232,15 @@ fn column_wise_config( .set_column_dictionary_enabled(path.clone(), false) .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED) } + + for col in schema.column_schemas() { + if col.data_type.is_timestamp() { + let path = ColumnPath::new(vec![col.name.clone()]); + props = props + .set_column_dictionary_enabled(path.clone(), false) + .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED) + } + } props } From 5e515467d2963fd0b42c51d8514fd172956897ec Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 10 Jul 2024 15:07:04 +0800 Subject: [PATCH 5/6] feat/copy-to-parquet-parameter: Disable dictionary encoding for timestamp columns in Parquet writer and update default max_active_window_runs in TwcsOptions - Modified Parquet writer to disable dictionary encoding for timestamp columns to optimize for increasing timestamp data. --- src/common/datasource/src/file_format/parquet.rs | 10 ++-------- src/mito2/src/region/options.rs | 2 +- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 9175ee9fe053..88f21ce9dbf8 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -225,14 +225,8 @@ fn column_wise_config( mut props: WriterPropertiesBuilder, schema: SchemaRef, ) -> WriterPropertiesBuilder { - // Disable dictionary for timestamp column. - if let Some(ts_col) = schema.timestamp_column() { - let path = ColumnPath::new(vec![ts_col.name.clone()]); - props = props - .set_column_dictionary_enabled(path.clone(), false) - .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED) - } - + // Disable dictionary for timestamp column, since for increasing timestamp column, + // the dictionary pages will be larger than data pages. for col in schema.column_schemas() { if col.data_type.is_timestamp() { let path = ColumnPath::new(vec![col.name.clone()]); diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 9e740cff86b0..7a28cee977d6 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -216,7 +216,7 @@ impl TwcsOptions { impl Default for TwcsOptions { fn default() -> Self { Self { - max_active_window_runs: 1, + max_active_window_runs: 4, max_inactive_window_runs: 1, time_window: None, remote_compaction: false, From 76e66aa351cc94e94fc9f9fc81957f15433f700d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 10 Jul 2024 15:29:31 +0800 Subject: [PATCH 6/6] feat/copy-to-parquet-parameter: Update compaction settings in tests - Modified `test_compaction_region` to include new compaction options: `compaction.type`, `compaction.twcs.max_active_window_runs`, and `compaction.twcs.max_inactive_window_runs`. - Updated `test_merge_mode_compaction` to use `compaction.twcs.max_active_window_runs` and `compaction.twcs.max_inactive_window_runs` instead of `max_active_window_files` and `max_inactive_window_files`. 
--- src/mito2/src/engine/compaction_test.rs | 6 +++++- src/mito2/src/engine/merge_mode_test.rs | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index f71b665a252f..9de4a0ddf572 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -112,7 +112,11 @@ async fn test_compaction_region() { let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_runs", "1") + .insert_option("compaction.twcs.max_inactive_window_runs", "1") + .build(); let column_schemas = request .column_metadatas diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs index 1adf51d12f41..0f0be6b8f12b 100644 --- a/src/mito2/src/engine/merge_mode_test.rs +++ b/src/mito2/src/engine/merge_mode_test.rs @@ -101,8 +101,8 @@ async fn test_merge_mode_compaction() { let request = CreateRequestBuilder::new() .field_num(2) .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_files", "2") - .insert_option("compaction.twcs.max_inactive_window_files", "2") + .insert_option("compaction.twcs.max_active_window_runs", "1") + .insert_option("compaction.twcs.max_inactive_window_runs", "1") .insert_option("merge_mode", "last_non_null") .build(); let region_dir = request.region_dir.clone();
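
For reference, a minimal standalone sketch of the writer configuration this series converges on. It assumes only the `parquet` crate APIs already used in the diffs above; the function name `writer_props_for`, the `main` harness, and the column name "ts" are illustrative stand-ins, not part of the patches — in the actual code the timestamp column is looked up from the GreptimeDB `SchemaRef` inside `column_wise_config`.

    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::properties::WriterProperties;
    use parquet::schema::types::ColumnPath;

    /// Builds writer properties mirroring `column_wise_config`: ZSTD compression
    /// for the whole file, and for the timestamp column dictionary encoding
    /// disabled in favor of DELTA_BINARY_PACKED.
    fn writer_props_for(ts_col_name: &str) -> WriterProperties {
        let ts_path = ColumnPath::new(vec![ts_col_name.to_string()]);
        WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            // For an increasing timestamp column, dictionary pages end up larger
            // than the data pages, so dictionary encoding is turned off.
            .set_column_dictionary_enabled(ts_path.clone(), false)
            .set_column_encoding(ts_path, Encoding::DELTA_BINARY_PACKED)
            .build()
    }

    fn main() {
        let props = writer_props_for("ts");
        let ts_path = ColumnPath::new(vec!["ts".to_string()]);
        // Dictionary encoding is now disabled for the timestamp column; these
        // properties would then be handed to `AsyncArrowWriter::try_new`.
        assert!(!props.dictionary_enabled(&ts_path));
    }

In the final patch the same two column-level overrides are applied to each timestamp column found via `schema.column_schemas()`, and the resulting properties are passed to `AsyncArrowWriter::try_new` in `stream_to_parquet`.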