Allow overriding the inferred parquet schema root (#5814)
tustvold authored May 29, 2024
1 parent 09e58a4 commit 7fe01bb
Showing 3 changed files with 41 additions and 16 deletions.
19 changes: 17 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -31,7 +31,8 @@ use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{
-add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema, decimal_length_from_precision,
+add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
+arrow_to_parquet_schema_with_root, decimal_length_from_precision,
};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
@@ -160,7 +161,10 @@ impl<W: Write + Send> ArrowWriter<W> {
arrow_schema: SchemaRef,
options: ArrowWriterOptions,
) -> Result<Self> {
-let schema = arrow_to_parquet_schema(&arrow_schema)?;
+let schema = match options.schema_root {
+Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
+None => arrow_to_parquet_schema(&arrow_schema)?,
+};
let mut props = options.properties;
if !options.skip_arrow_metadata {
// add serialized arrow schema
@@ -323,6 +327,7 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
pub struct ArrowWriterOptions {
properties: WriterProperties,
skip_arrow_metadata: bool,
+schema_root: Option<String>,
}

impl ArrowWriterOptions {
@@ -346,6 +351,16 @@ impl ArrowWriterOptions {
..self
}
}

+/// Overrides the name of the root parquet schema element
+///
+/// Defaults to `"arrow_schema"`
+pub fn with_schema_root(self, name: String) -> Self {
+Self {
+schema_root: Some(name),
+..self
+}
+}
}

/// A single column chunk produced by [`ArrowColumnWriter`]
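For readers unfamiliar with the new option, here is a minimal sketch (not part of the commit) of how the override might be used when writing an in-memory batch; the "id" field, the example values, and the root name "my_root" are illustrative assumptions:

use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};

fn write_with_custom_root() -> parquet::errors::Result<Vec<u8>> {
    // Hypothetical single-column batch
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();

    // Override the root schema element name instead of the default "arrow_schema"
    let options = ArrowWriterOptions::new().with_schema_root("my_root".to_string());

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new_with_options(&mut buffer, schema, options)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(buffer)
}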
12 changes: 9 additions & 3 deletions parquet/src/arrow/schema/mod.rs
@@ -223,15 +223,21 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
}

/// Convert arrow schema to parquet schema
+///
+/// The name of the root schema element defaults to `"arrow_schema"`, this can be
+/// overridden with [`arrow_to_parquet_schema_with_root`]
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
+arrow_to_parquet_schema_with_root(schema, "arrow_schema")
+}
+
+/// Convert arrow schema to parquet schema specifying the name of the root schema element
+pub fn arrow_to_parquet_schema_with_root(schema: &Schema, root: &str) -> Result<SchemaDescriptor> {
let fields = schema
.fields()
.iter()
.map(|field| arrow_to_parquet_type(field).map(Arc::new))
.collect::<Result<_>>()?;
let group = Type::group_type_builder("arrow_schema")
.with_fields(fields)
.build()?;
let group = Type::group_type_builder(root).with_fields(fields).build()?;
Ok(SchemaDescriptor::new(Arc::new(group)))
}

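Similarly, a small sketch (again not part of the commit) of calling the new conversion directly; the field and the root name "spark_schema" are placeholder choices, and the import path assumes the function is re-exported from parquet::arrow alongside arrow_to_parquet_schema:

use arrow_schema::{DataType, Field, Schema};
// Assumed re-export path; the function itself lives in parquet/src/arrow/schema/mod.rs
use parquet::arrow::arrow_to_parquet_schema_with_root;

fn main() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

    // The root group of the resulting parquet schema takes the supplied name
    let descriptor = arrow_to_parquet_schema_with_root(&schema, "spark_schema").unwrap();
    assert_eq!(descriptor.name(), "spark_schema");
}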
26 changes: 15 additions & 11 deletions parquet/src/bin/parquet-fromcsv.rs
@@ -81,6 +81,7 @@ use std::{
use arrow_csv::ReaderBuilder;
use arrow_schema::{ArrowError, Schema};
use clap::{Parser, ValueEnum};
+use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::{
arrow::{parquet_to_arrow_schema, ArrowWriter},
basic::Compression,
@@ -333,21 +334,16 @@ fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBui
builder
}

-fn arrow_schema_from_string(schema: &str) -> Result<Arc<Schema>, ParquetFromCsvError> {
-let schema = Arc::new(parse_message_type(schema)?);
-let desc = SchemaDescriptor::new(schema);
-let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);
-Ok(arrow_schema)
-}
-
fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
let schema = read_to_string(args.schema_path()).map_err(|e| {
ParquetFromCsvError::with_context(
e,
&format!("Failed to open schema file {:#?}", args.schema_path()),
)
})?;
-let arrow_schema = arrow_schema_from_string(&schema)?;
+let parquet_schema = Arc::new(parse_message_type(&schema)?);
+let desc = SchemaDescriptor::new(parquet_schema);
+let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);

// create output parquet writer
let parquet_file = File::create(&args.output_file).map_err(|e| {
@@ -357,9 +353,12 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
)
})?;

-let writer_properties = Some(configure_writer_properties(args));
+let options = ArrowWriterOptions::new()
+.with_properties(configure_writer_properties(args))
+.with_schema_root(desc.name().to_string());
+
let mut arrow_writer =
-ArrowWriter::try_new(parquet_file, arrow_schema.clone(), writer_properties)
+ArrowWriter::try_new_with_options(parquet_file, arrow_schema.clone(), options)
.map_err(|e| ParquetFromCsvError::with_context(e, "Failed to create ArrowWriter"))?;

// open input file
@@ -426,6 +425,7 @@ mod tests {
use clap::{CommandFactory, Parser};
use flate2::write::GzEncoder;
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+use parquet::file::reader::{FileReader, SerializedFileReader};
use snap::write::FrameEncoder;
use tempfile::NamedTempFile;

@@ -647,7 +647,7 @@

fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
let schema = NamedTempFile::new().unwrap();
-let schema_text = r"message schema {
+let schema_text = r"message my_amazing_schema {
optional int32 id;
optional binary name (STRING);
}";
@@ -728,6 +728,10 @@
help: None,
};
convert_csv_to_parquet(&args).unwrap();

+let file = SerializedFileReader::new(output_parquet.into_file()).unwrap();
+let schema_name = file.metadata().file_metadata().schema().name();
+assert_eq!(schema_name, "my_amazing_schema");
}

#[test]
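Finally, a rough sketch (not part of the commit) of the parse-then-preserve flow parquet-fromcsv now follows: parse the user-supplied message type, derive the Arrow schema from it, and keep the parsed root name to pass to with_schema_root; the schema text mirrors the one in the test above:

use std::sync::Arc;

use parquet::arrow::parquet_to_arrow_schema;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() {
    // Same shape of schema file the tool accepts; the root name is what gets preserved
    let message = "message my_amazing_schema {
        optional int32 id;
        optional binary name (STRING);
    }";

    let parquet_schema = Arc::new(parse_message_type(message).unwrap());
    let desc = SchemaDescriptor::new(parquet_schema);
    let arrow_schema = parquet_to_arrow_schema(&desc, None).unwrap();

    // desc.name() is what convert_csv_to_parquet hands to with_schema_root
    assert_eq!(desc.name(), "my_amazing_schema");
    assert_eq!(arrow_schema.fields().len(), 2);
}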
