parquet: Add ArrowWriterOptions to skip embedding the arrow metadata (#5299)

* feat(parquet): Add ArrowWriterOptions

* test(parquet): test skip_arrow_metadata

* feat(parquet): Add try_new_with_options to async writer

* refactor: move WriterProperties to ArrowWriterOptions
evenyag authored Jan 22, 2024
1 parent b03613e commit ce58932
Showing 2 changed files with 106 additions and 4 deletions.
87 changes: 84 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -119,11 +119,27 @@ impl<W: Write + Send> ArrowWriter<W> {
         writer: W,
         arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
+    ) -> Result<Self> {
+        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
+        Self::try_new_with_options(writer, arrow_schema, options)
+    }
+
+    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
+    ///
+    /// The writer will fail if:
+    /// * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    /// * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new_with_options(
+        writer: W,
+        arrow_schema: SchemaRef,
+        options: ArrowWriterOptions,
     ) -> Result<Self> {
         let schema = arrow_to_parquet_schema(&arrow_schema)?;
-        // add serialized arrow schema
-        let mut props = props.unwrap_or_default();
-        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+        let mut props = options.properties;
+        if !options.skip_arrow_metadata {
+            // add serialized arrow schema
+            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+        }

         let max_row_group_size = props.max_row_group_size();

@@ -245,6 +261,38 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
     }
 }

+/// Arrow-specific configuration settings for writing parquet files.
+///
+/// See [`ArrowWriter`] for how to configure the writer.
+#[derive(Debug, Clone, Default)]
+pub struct ArrowWriterOptions {
+    properties: WriterProperties,
+    skip_arrow_metadata: bool,
+}
+
+impl ArrowWriterOptions {
+    /// Creates a new [`ArrowWriterOptions`] with the default settings.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sets the [`WriterProperties`] for writing parquet files.
+    pub fn with_properties(self, properties: WriterProperties) -> Self {
+        Self { properties, ..self }
+    }
+
+    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
+    /// by default.
+    ///
+    /// Set `skip_arrow_metadata` to true, to skip encoding this.
+    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
+        Self {
+            skip_arrow_metadata,
+            ..self
+        }
+    }
+}
+
 /// A single column chunk produced by [`ArrowColumnWriter`]
 #[derive(Default)]
 struct ArrowColumnChunkData {
@@ -904,6 +952,7 @@ mod tests {
     use std::sync::Arc;

     use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+    use crate::arrow::ARROW_SCHEMA_META_KEY;
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
     use arrow::error::Result as ArrowResult;
@@ -2882,4 +2931,36 @@ mod tests {
         let b_idx = &column_index[0][1];
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
+
+    #[test]
+    fn test_arrow_writer_skip_metadata() {
+        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
+        let file_schema = Arc::new(batch_schema.clone());
+
+        let batch = RecordBatch::try_new(
+            Arc::new(batch_schema),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
+        )
+        .unwrap();
+        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer =
+            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let bytes = Bytes::from(buf);
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+        assert_eq!(file_schema, *reader_builder.schema());
+        if let Some(key_value_metadata) = reader_builder
+            .metadata()
+            .file_metadata()
+            .key_value_metadata()
+        {
+            assert!(!key_value_metadata
+                .iter()
+                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
+        }
+    }
 }
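For context, here is a minimal usage sketch of the synchronous API added above. It is not part of the commit; the helper name `write_without_arrow_metadata`, the `id` column, and the import paths are illustrative assumptions. It writes one `RecordBatch` through `ArrowWriter::try_new_with_options` with `skip_arrow_metadata` enabled, so the file footer carries no `ARROW:schema` key/value entry.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;

// Hypothetical helper: write one batch without embedding the arrow schema.
fn write_without_arrow_metadata() -> Result<Vec<u8>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Bundle the usual WriterProperties together with the new skip flag.
    let options = ArrowWriterOptions::new()
        .with_properties(WriterProperties::builder().build())
        .with_skip_arrow_metadata(true);

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new_with_options(&mut buf, schema, options)?;
    writer.write(&batch)?;
    writer.close()?; // finalizes the footer; no ARROW:schema entry is written
    Ok(buf)
}
```

Note that a file written this way is read back using a schema derived from the Parquet schema alone, so Arrow-level details that live only in the embedded schema (for example field-level metadata) may not round-trip exactly.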
23 changes: 22 additions & 1 deletion parquet/src/arrow/async_writer/mod.rs
@@ -54,6 +54,7 @@
 use std::{io::Write, sync::Arc};

 use crate::{
+    arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
     file::properties::WriterProperties,
@@ -97,9 +98,29 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         arrow_schema: SchemaRef,
         buffer_size: usize,
         props: Option<WriterProperties>,
+    ) -> Result<Self> {
+        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
+        Self::try_new_with_options(writer, arrow_schema, buffer_size, options)
+    }
+
+    /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`].
+    ///
+    /// `buffer_size` determines the number of bytes to buffer before flushing
+    /// to the underlying [`AsyncWrite`]
+    ///
+    /// The intermediate buffer will automatically be resized if necessary
+    ///
+    /// [`Self::write`] will flush this intermediate buffer if it is at least
+    /// half full
+    pub fn try_new_with_options(
+        writer: W,
+        arrow_schema: SchemaRef,
+        buffer_size: usize,
+        options: ArrowWriterOptions,
+    ) -> Result<Self> {
         let shared_buffer = SharedBuffer::new(buffer_size);
-        let sync_writer = ArrowWriter::try_new(shared_buffer.clone(), arrow_schema, props)?;
+        let sync_writer =
+            ArrowWriter::try_new_with_options(shared_buffer.clone(), arrow_schema, options)?;

         Ok(Self {
             sync_writer,
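The async path accepts the same options type. Below is a hypothetical end-to-end sketch (not part of the commit) assuming a tokio runtime; the `id` column, the 8 KiB buffer size, and the in-memory `Vec<u8>` sink are illustrative, and at this commit the async constructor still takes an explicit `buffer_size`.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::arrow::AsyncArrowWriter;
use parquet::errors::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef],
    )?;

    // Default WriterProperties, but skip the embedded arrow schema.
    let options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

    // Any tokio AsyncWrite sink works; a Vec<u8> keeps the sketch self-contained.
    let mut sink: Vec<u8> = Vec::new();
    let mut writer = AsyncArrowWriter::try_new_with_options(&mut sink, schema, 8192, options)?;
    writer.write(&batch).await?;
    writer.close().await?;

    println!("wrote {} bytes without ARROW:schema metadata", sink.len());
    Ok(())
}
```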
