diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 3563348791bc..d9771838ada6 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -119,11 +119,27 @@ impl<W: Write + Send> ArrowWriter<W> {
         writer: W,
         arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
+    ) -> Result<Self> {
+        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
+        Self::try_new_with_options(writer, arrow_schema, options)
+    }
+
+    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
+    ///
+    /// The writer will fail if:
+    /// * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    /// * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new_with_options(
+        writer: W,
+        arrow_schema: SchemaRef,
+        options: ArrowWriterOptions,
     ) -> Result<Self> {
         let schema = arrow_to_parquet_schema(&arrow_schema)?;
-        // add serialized arrow schema
-        let mut props = props.unwrap_or_default();
-        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+        let mut props = options.properties;
+        if !options.skip_arrow_metadata {
+            // add serialized arrow schema
+            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+        }
 
         let max_row_group_size = props.max_row_group_size();
 
@@ -245,6 +261,38 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
+/// Arrow-specific configuration settings for writing parquet files.
+///
+/// See [`ArrowWriter`] for how to configure the writer.
+#[derive(Debug, Clone, Default)]
+pub struct ArrowWriterOptions {
+    properties: WriterProperties,
+    skip_arrow_metadata: bool,
+}
+
+impl ArrowWriterOptions {
+    /// Creates a new [`ArrowWriterOptions`] with the default settings.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sets the [`WriterProperties`] for writing parquet files.
+    pub fn with_properties(self, properties: WriterProperties) -> Self {
+        Self { properties, ..self }
+    }
+
+    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
+    /// by default.
+    ///
+    /// Set `skip_arrow_metadata` to true to skip encoding this.
+    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
+        Self {
+            skip_arrow_metadata,
+            ..self
+        }
+    }
+}
+
 /// A single column chunk produced by [`ArrowColumnWriter`]
 #[derive(Default)]
 struct ArrowColumnChunkData {
@@ -904,6 +952,7 @@ mod tests {
     use std::sync::Arc;
 
     use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+    use crate::arrow::ARROW_SCHEMA_META_KEY;
    use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
     use arrow::error::Result as ArrowResult;
@@ -2882,4 +2931,36 @@ mod tests {
         let b_idx = &column_index[0][1];
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
+
+    #[test]
+    fn test_arrow_writer_skip_metadata() {
+        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
+        let file_schema = Arc::new(batch_schema.clone());
+
+        let batch = RecordBatch::try_new(
+            Arc::new(batch_schema),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
+        )
+        .unwrap();
+        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer =
+            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let bytes = Bytes::from(buf);
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+        assert_eq!(file_schema, *reader_builder.schema());
+        if let Some(key_value_metadata) = reader_builder
+            .metadata()
+            .file_metadata()
+            .key_value_metadata()
+        {
+            assert!(!key_value_metadata
+                .iter()
+                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
+        }
+    }
 }
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 30080c579e8f..3f3da1a5f9b9 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -54,6 +54,7 @@ use std::{io::Write, sync::Arc};
 use std::{io::Write, sync::Arc};
 
 use crate::{
+    arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
     file::properties::WriterProperties,
@@ -97,9 +98,29 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         arrow_schema: SchemaRef,
         buffer_size: usize,
         props: Option<WriterProperties>,
+    ) -> Result<Self> {
+        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
+        Self::try_new_with_options(writer, arrow_schema, buffer_size, options)
+    }
+
+    /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`].
+    ///
+    /// `buffer_size` determines the number of bytes to buffer before flushing
+    /// to the underlying [`AsyncWrite`].
+    ///
+    /// The intermediate buffer will automatically be resized if necessary.
+    ///
+    /// [`Self::write`] will flush this intermediate buffer if it is at least
+    /// half full.
+    pub fn try_new_with_options(
+        writer: W,
+        arrow_schema: SchemaRef,
+        buffer_size: usize,
+        options: ArrowWriterOptions,
     ) -> Result<Self> {
         let shared_buffer = SharedBuffer::new(buffer_size);
-        let sync_writer = ArrowWriter::try_new(shared_buffer.clone(), arrow_schema, props)?;
+        let sync_writer =
+            ArrowWriter::try_new_with_options(shared_buffer.clone(), arrow_schema, options)?;
 
         Ok(Self {
             sync_writer,
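
For context on how the new API fits together, here is a minimal sketch of the synchronous path, mirroring the `test_arrow_writer_skip_metadata` test above. It is written from the perspective of an external consumer of the `parquet` and `arrow` crates; the import paths and the in-memory `Vec<u8>` sink are illustrative, not part of this change:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("int32", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    // Opt out of embedding the serialized Arrow schema under the
    // "ARROW:schema" key in the footer's key-value metadata.
    let options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new_with_options(&mut buf, schema, options).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
}
```

Readers of such a file then infer the Arrow schema from the Parquet schema alone; the test above relies on this when it asserts that the round-tripped schema still matches while no `ARROW_SCHEMA_META_KEY` entry is present.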
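
The async path is exercised the same way. A sketch under the assumption that the `parquet` crate is built with its `async` feature and runs inside a tokio runtime; the output path and the 8 KiB `buffer_size` are arbitrary illustrative values:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::arrow::AsyncArrowWriter;

#[tokio::main]
async fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("int32", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
    let file = tokio::fs::File::create("no_arrow_metadata.parquet")
        .await
        .unwrap();

    // The options are forwarded to the wrapped synchronous ArrowWriter,
    // which writes into the intermediate SharedBuffer.
    let mut writer = AsyncArrowWriter::try_new_with_options(file, schema, 8192, options).unwrap();
    writer.write(&batch).await.unwrap();
    writer.close().await.unwrap();
}
```

This works because `try_new_with_options` simply threads `options` through to `ArrowWriter::try_new_with_options`, as shown in the second hunk of `async_writer/mod.rs`.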