parquet: Add ArrowWriterOptions to skip embedding the arrow metadata #5299

Merged (4 commits, Jan 22, 2024)
Changes from 3 commits

parquet/src/arrow/arrow_writer/mod.rs (78 changes: 76 additions & 2 deletions)
@@ -119,11 +119,27 @@ impl<W: Write + Send> ArrowWriter<W> {
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
+        Self::try_new_with_options(writer, arrow_schema, props, Default::default())
+    }
+
+    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
+    ///
+    /// The writer will fail if:
+    /// * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    /// * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new_with_options(
+        writer: W,
+        arrow_schema: SchemaRef,
+        props: Option<WriterProperties>,
Review comment (Contributor):

I wonder if we could move this on to ArrowWriterOptions?

Reply (Contributor, PR author):

Sure, I'll try it. The C++ FileWriter API separates these two parameters, so I kept try_new_with_options() similar to theirs:

::arrow::Result<std::unique_ptr<FileWriter>> Open(
    const ::arrow::Schema &schema,
    MemoryPool *pool,
    std::shared_ptr<::arrow::io::OutputStream> sink,
    std::shared_ptr<WriterProperties> properties = default_writer_properties(),
    std::shared_ptr<ArrowWriterProperties> arrow_properties = default_arrow_writer_properties()
)

Review comment (Member):

I do think we should probably move WriterProperties on to ArrowWriterOptions.

This approach looks good to me. We could also consider deprecating the old try_new method, or making a breaking change to remove it, since the two are very similar.
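
A hypothetical sketch of that follow-up, illustrative only and not the API added in this PR, might fold the writer properties into the options struct:

use crate::file::properties::WriterProperties;

/// Sketch only: what ArrowWriterOptions could look like if WriterProperties
/// moved onto it, as suggested in the review above.
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
}

impl ArrowWriterOptions {
    /// Override the default WriterProperties used by the writer.
    pub fn with_properties(mut self, properties: WriterProperties) -> Self {
        self.properties = properties;
        self
    }

    /// Skip embedding the serialized arrow schema in the file metadata.
    pub fn with_skip_arrow_metadata(mut self, skip_arrow_metadata: bool) -> Self {
        self.skip_arrow_metadata = skip_arrow_metadata;
        self
    }
}

With a shape like this, try_new_with_options would only need the writer, the schema, and the options, and deprecating the old try_new (or removing it in a breaking release) would be a natural follow-up.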

+        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let schema = arrow_to_parquet_schema(&arrow_schema)?;
-        // add serialized arrow schema
        let mut props = props.unwrap_or_default();
-        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+        if !options.skip_arrow_metadata {
+            // add serialized arrow schema
+            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+        }

        let max_row_group_size = props.max_row_group_size();

@@ -245,6 +261,30 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    }
}

/// Arrow-specific configuration settings for writing parquet files.
///
/// See [`ArrowWriter`] for how to configure the writer.
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    skip_arrow_metadata: bool,
}

impl ArrowWriterOptions {
    /// Creates a new [`ArrowWriterOptions`] with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
    /// by default.
    ///
    /// Set `skip_arrow_metadata` to true, to skip encoding this.
    pub fn with_skip_arrow_metadata(mut self, skip_arrow_metadata: bool) -> Self {
        self.skip_arrow_metadata = skip_arrow_metadata;
        self
    }
}

/// A single column chunk produced by [`ArrowColumnWriter`]
#[derive(Default)]
struct ArrowColumnChunkData {
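
For context, a minimal usage sketch of the new option (not part of the diff; it assumes the public paths parquet::arrow::ArrowWriter and parquet::arrow::arrow_writer::ArrowWriterOptions):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::arrow::ArrowWriter;

fn write_without_embedded_schema() -> parquet::errors::Result<Vec<u8>> {
    let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    // Build a single-column batch; the schema is derived from the (name, array) pair
    let batch = RecordBatch::try_from_iter([("int32", col)]).expect("valid batch");

    // Opt out of embedding the serialized arrow schema in the parquet key-value metadata
    let options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new_with_options(&mut buf, batch.schema(), None, options)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(buf)
}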
@@ -904,6 +944,7 @@ mod tests {
    use std::sync::Arc;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use crate::arrow::ARROW_SCHEMA_META_KEY;
    use arrow::datatypes::ToByteSlice;
    use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
    use arrow::error::Result as ArrowResult;
@@ -2882,4 +2923,37 @@ mod tests {
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
    }

    #[test]
    fn test_arrow_writer_skip_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Arc::new(batch_schema.clone());

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), None, skip_options)
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(file_schema, *reader_builder.schema());
        if let Some(key_value_metadata) = reader_builder
            .metadata()
            .file_metadata()
            .key_value_metadata()
        {
            assert!(!key_value_metadata
                .iter()
                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
        }
    }
}
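
A complementary sketch, not included in this PR, would assert that the default options still embed the schema under ARROW_SCHEMA_META_KEY (it reuses the imports of the test module above):

#[test]
fn test_arrow_writer_embeds_arrow_metadata_by_default() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(batch_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    // Default options: the serialized arrow schema is written to the key-value metadata
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
    let key_value_metadata = reader_builder
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .expect("arrow schema should be embedded by default");
    assert!(key_value_metadata
        .iter()
        .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
}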

parquet/src/arrow/async_writer/mod.rs (23 changes: 22 additions & 1 deletion)
@@ -54,6 +54,7 @@
use std::{io::Write, sync::Arc};

use crate::{
    arrow::arrow_writer::ArrowWriterOptions,
    arrow::ArrowWriter,
    errors::{ParquetError, Result},
    file::properties::WriterProperties,
@@ -97,9 +98,29 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
        arrow_schema: SchemaRef,
        buffer_size: usize,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
+        Self::try_new_with_options(writer, arrow_schema, buffer_size, props, Default::default())
+    }
+
+    /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`].
+    ///
+    /// `buffer_size` determines the number of bytes to buffer before flushing
+    /// to the underlying [`AsyncWrite`]
+    ///
+    /// The intermediate buffer will automatically be resized if necessary
+    ///
+    /// [`Self::write`] will flush this intermediate buffer if it is at least
+    /// half full
+    pub fn try_new_with_options(
+        writer: W,
+        arrow_schema: SchemaRef,
+        buffer_size: usize,
+        props: Option<WriterProperties>,
+        options: ArrowWriterOptions,
+    ) -> Result<Self> {
        let shared_buffer = SharedBuffer::new(buffer_size);
-        let sync_writer = ArrowWriter::try_new(shared_buffer.clone(), arrow_schema, props)?;
+        let sync_writer =
+            ArrowWriter::try_new_with_options(shared_buffer.clone(), arrow_schema, props, options)?;

        Ok(Self {
            sync_writer,
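
Finally, a minimal async usage sketch (not part of the diff; it assumes a tokio runtime, the parquet crate's async feature, and an arbitrary file name and 1 MiB buffer size):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::arrow::AsyncArrowWriter;

async fn write_async_without_embedded_schema() {
    let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("int32", col)]).expect("valid batch");

    let file = tokio::fs::File::create("example.parquet")
        .await
        .expect("create file");
    let options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

    // Buffer up to 1 MiB before flushing to the underlying AsyncWrite
    let mut writer =
        AsyncArrowWriter::try_new_with_options(file, batch.schema(), 1024 * 1024, None, options)
            .expect("create writer");
    writer.write(&batch).await.expect("write batch");
    writer.close().await.expect("close writer");
}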