Skip to content

Commit

Permalink
Add ArrowToParquetSchemaConverter, deprecate `arrow_to_parquet_schema` et al
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 5, 2024
1 parent 93ce75c commit 29d2509
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 43 deletions.
23 changes: 13 additions & 10 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
arrow_to_parquet_schema_with_root, decimal_length_from_precision,
};
use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowToParquetSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
Expand Down Expand Up @@ -181,10 +179,12 @@ impl<W: Write + Send> ArrowWriter<W> {
options: ArrowWriterOptions,
) -> Result<Self> {
let mut props = options.properties;
let schema = match options.schema_root {
Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
};
let mut converter = ArrowToParquetSchemaConverter::new(&arrow_schema)
.with_coerce_types(props.coerce_types());
if let Some(s) = &options.schema_root {
converter = converter.schema_root(s);
}
let schema = converter.build()?;
if !options.skip_arrow_metadata {
// add serialized arrow schema
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
Expand Down Expand Up @@ -538,7 +538,7 @@ impl ArrowColumnChunk {
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::arrow_to_parquet_schema;
/// # use parquet::arrow::ArrowToParquetSchemaConverter;
/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::SerializedFileWriter;
Expand All @@ -550,7 +550,10 @@ impl ArrowColumnChunk {
///
/// // Compute the parquet schema
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap();
/// let parquet_schema = ArrowToParquetSchemaConverter::new(schema.as_ref())
/// .with_coerce_types(props.coerce_types())
/// .build()
/// .unwrap();
///
/// // Create writers for each of the leaf columns
/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
Expand Down
8 changes: 6 additions & 2 deletions parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,13 @@ pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
use arrow_schema::{FieldRef, Schema};

// continue to until functions are removed
#[allow(deprecated)]
pub use self::schema::arrow_to_parquet_schema;

pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns, FieldLevels,
parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
ArrowToParquetSchemaConverter, FieldLevels,
};

/// Schema metadata key used to store serialized Arrow IPC schema
Expand Down
135 changes: 115 additions & 20 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,29 +225,121 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
}
}

/// Builder-style converter from an Arrow [`Schema`] to a parquet [`SchemaDescriptor`]
///
/// Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowToParquetSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // an Arrow schema with two nullable fields
/// let arrow_schema = Schema::new(vec![
///   Field::new("a", DataType::Int64, true),
///   Field::new("b", DataType::Date32, true),
/// ]);
///
/// let parquet_schema = ArrowToParquetSchemaConverter::new(&arrow_schema)
///   .build()
///   .unwrap();
///
/// // The converter produces a parquet schema rooted at a group named
/// // "arrow_schema" (the default root name):
/// let expected_parquet_schema = SchemaDescriptor::new(
///   Arc::new(
///     Type::group_type_builder("arrow_schema")
///       .with_fields(vec![
///         Arc::new(
///          Type::primitive_type_builder("a", basic::Type::INT64)
///           .build().unwrap()
///         ),
///         Arc::new(
///          Type::primitive_type_builder("b", basic::Type::INT32)
///           .with_converted_type(basic::ConvertedType::DATE)
///           .with_logical_type(Some(basic::LogicalType::Date))
///           .build().unwrap()
///         ),
///       ])
///       .build().unwrap()
///   )
/// );
///
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowToParquetSchemaConverter<'a> {
    /// The Arrow schema to convert
    schema: &'a Schema,
    /// Name of the root schema element in the generated parquet schema
    /// (defaults to `"arrow_schema"`)
    schema_root: &'a str,
    /// Should Arrow types be coerced into compatible Parquet types?
    ///
    /// See docs on [`Self::with_coerce_types`]
    coerce_types: bool
}

impl<'a> ArrowToParquetSchemaConverter<'a> {
    /// Create a new converter with the default options: root element named
    /// `"arrow_schema"` and no coercion to parquet native types.
    pub fn new(schema: &'a Schema) -> Self {
        Self {
            schema,
            schema_root: "arrow_schema",
            coerce_types: false,
        }
    }

    /// Should arrow types be coerced into parquet native types (default false).
    ///
    /// Setting this option to `true` will result in parquet files that can be
    /// read by more readers, but may lose precision for arrow types such as
    /// [`DataType::Date64`] which have no direct corresponding Parquet type.
    ///
    /// # Discussion
    ///
    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
    /// corresponding Parquet logical type. Thus, they can not be losslessly
    /// round-tripped when stored using the appropriate Parquet logical type.
    /// For example, some Date64 values may be truncated when stored with
    /// parquet's native 32 bit date type.
    ///
    /// By default the arrow writer does not coerce to native parquet types;
    /// the data round-trips losslessly, but downstream readers must be aware
    /// of (and correctly interpret) the embedded Arrow schema.
    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
        self.coerce_types = coerce_types;
        self
    }

    /// Set the root schema element name (defaults to `"arrow_schema"`).
    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
        self.schema_root = schema_root;
        self
    }

    /// Build the desired parquet [`SchemaDescriptor`]
    ///
    /// Converts each top-level Arrow field and wraps the results in a single
    /// root group type.
    pub fn build(self) -> Result<SchemaDescriptor> {
        // Convert every Arrow field, propagating the first conversion error.
        let mut parquet_fields = Vec::with_capacity(self.schema.fields().len());
        for field in self.schema.fields() {
            let parquet_type = arrow_to_parquet_type(field, self.coerce_types)?;
            parquet_fields.push(Arc::new(parquet_type));
        }
        let root = Type::group_type_builder(self.schema_root)
            .with_fields(parquet_fields)
            .build()?;
        Ok(SchemaDescriptor::new(Arc::new(root)))
    }
}

/// Convert arrow schema to parquet schema
///
/// The name of the root schema element defaults to `"arrow_schema"`, this can be
/// overridden with [`arrow_to_parquet_schema_with_root`]
pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result<SchemaDescriptor> {
arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
}
#[deprecated(since = "54.0.0", note = "Use `ArrowToParquetSchemaConverter` instead")]
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {

/// Convert arrow schema to parquet schema specifying the name of the root schema element
pub fn arrow_to_parquet_schema_with_root(
schema: &Schema,
root: &str,
coerce_types: bool,
) -> Result<SchemaDescriptor> {
let fields = schema
.fields()
.iter()
.map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
.collect::<Result<_>>()?;
let group = Type::group_type_builder(root).with_fields(fields).build()?;
Ok(SchemaDescriptor::new(Arc::new(group)))
ArrowToParquetSchemaConverter::new(schema).build()
}


fn parse_key_value_metadata(
key_value_metadata: Option<&Vec<KeyValue>>,
) -> Option<HashMap<String, String>> {
Expand Down Expand Up @@ -1569,7 +1661,7 @@ mod tests {
Field::new("decimal256", DataType::Decimal256(39, 2), false),
];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
let converted_arrow_schema = ArrowToParquetSchemaConverter::new(&arrow_schema).build().unwrap();

assert_eq!(
parquet_schema.columns().len(),
Expand Down Expand Up @@ -1606,9 +1698,10 @@ mod tests {
false,
)];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true);
let converted_arrow_schema = ArrowToParquetSchemaConverter::new(&arrow_schema)
.with_coerce_types(true)
.build();

assert!(converted_arrow_schema.is_err());
converted_arrow_schema.unwrap();
}

Expand Down Expand Up @@ -1878,7 +1971,9 @@ mod tests {
// don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
let parq_schema_descr = crate::arrow::ArrowToParquetSchemaConverter::new(&arrow_schema)
.with_coerce_types(true)
.build()?;
let parq_fields = parq_schema_descr.root_schema().get_fields();
assert_eq!(parq_fields.len(), 2);
assert_eq!(parq_fields[0].get_basic_info().id(), 1);
Expand Down
19 changes: 8 additions & 11 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
// under the License.

//! Configuration via [`WriterProperties`] and [`ReaderProperties`]
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

use crate::basic::{Compression, Encoding};
use crate::compression::{CodecOptions, CodecOptionsBuilder};
use crate::file::metadata::KeyValue;
use crate::format::SortingColumn;
use crate::schema::types::ColumnPath;
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

/// Default value for [`WriterProperties::data_page_size_limit`]
pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
Expand Down Expand Up @@ -287,15 +286,13 @@ impl WriterProperties {
self.statistics_truncate_length
}

/// Returns `coerce_types` boolean
/// Should the writer coerce types to parquet native types.
///
/// Setting this option to `true` will result in parquet files that can be
/// read by more readers, but may lose precision for arrow types such as
/// [`DataType::Date64`] which have no direct corresponding Parquet type.
///
/// Some Arrow types do not have a corresponding Parquet logical type.
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
/// Writers have the option to coerce these into native Parquet types. Type
/// coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema. However, type
/// coercion also prevents the data from being losslessly round-tripped. This method
/// returns `true` if type coercion enabled.
/// See [`ArrowToParquetSchemaConverter::with_coerce_types`] for more details
pub fn coerce_types(&self) -> bool {
self.coerce_types
}
Expand Down

0 comments on commit 29d2509

Please sign in to comment.