diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index 072130833903..324dbe21e1a3 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -162,10 +162,11 @@ impl ArrayReader for FixedLenByteArrayReader {
     fn consume_batch(&mut self) -> Result<ArrayRef> {
         let record_data = self.record_reader.consume_record_data();

-        let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
-            .len(self.record_reader.num_values())
-            .add_buffer(record_data)
-            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
+        let array_data =
+            ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
+                .len(self.record_reader.num_values())
+                .add_buffer(record_data)
+                .null_bit_buffer(self.record_reader.consume_bitmap_buffer());

         let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });

@@ -196,13 +197,19 @@ impl ArrayReader for FixedLenByteArrayReader {
                     IntervalUnit::YearMonth => Arc::new(
                         binary
                             .iter()
-                            .map(|o| o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap())))
+                            .map(|o| {
+                                o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))
+                            })
                             .collect::<IntervalYearMonthArray>(),
                     ) as ArrayRef,
                     IntervalUnit::DayTime => Arc::new(
                         binary
                             .iter()
-                            .map(|o| o.map(|b| i64::from_le_bytes(b[4..12].try_into().unwrap())))
+                            .map(|o| {
+                                o.map(|b| {
+                                    i64::from_le_bytes(b[4..12].try_into().unwrap())
+                                })
+                            })
                             .collect::<IntervalDayTimeArray>(),
                     ) as ArrayRef,
                     IntervalUnit::MonthDayNano => {
@@ -286,7 +293,9 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
         let slice = self.buffer.as_slice_mut();

         let values_range = read_offset..read_offset + values_read;
-        for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
+        for (value_pos, level_pos) in
+            values_range.rev().zip(iter_set_bits_rev(valid_mask))
+        {
             debug_assert!(level_pos >= value_pos);
             if level_pos <= value_pos {
                 break;
@@ -382,7 +391,8 @@ impl ColumnValueDecoder for ValueDecoder {
         let len = range.end - range.start;
         match self.decoder.as_mut().unwrap() {
             Decoder::Plain { offset, buf } => {
-                let to_read = (len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
+                let to_read =
+                    (len * self.byte_length).min(buf.len() - *offset) / self.byte_length;
                 let end_offset = *offset + to_read * self.byte_length;
                 out.buffer
                     .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
@@ -475,12 +485,15 @@ mod tests {
             .build()
             .unwrap();

-        let written =
-            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
-                .unwrap();
+        let written = RecordBatch::try_from_iter([(
+            "list",
+            Arc::new(ListArray::from(data)) as ArrayRef,
+        )])
+        .unwrap();

         let mut buffer = Vec::with_capacity(1024);
-        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
         writer.write(&written).unwrap();
         writer.close().unwrap();

diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 1b44c0123089..4c350c4b1d8c 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -32,7 +32,8 @@ use arrow_ipc::writer;
 use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

 use crate::basic::{
-    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit,
+    Type as PhysicalType,
 };
 use crate::errors::{ParquetError, Result};
 use crate::file::{metadata::KeyValue, properties::WriterProperties};
@@ -54,7 +55,11 @@ pub fn parquet_to_arrow_schema(
     parquet_schema: &SchemaDescriptor,
     key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Result<Schema> {
-    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
+    parquet_to_arrow_schema_by_columns(
+        parquet_schema,
+        ProjectionMask::all(),
+        key_value_metadata,
+    )
 }

 /// Convert parquet schema to arrow schema including optional metadata,
@@ -194,7 +199,10 @@ fn encode_arrow_schema(schema: &Schema) -> String {

 /// Mutates writer metadata by storing the encoded Arrow schema.
 /// If there is an existing Arrow schema metadata, it is replaced.
-pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
+pub(crate) fn add_encoded_arrow_schema_to_metadata(
+    schema: &Schema,
+    props: &mut WriterProperties,
+) {
     let encoded = encode_arrow_schema(schema);

     let schema_kv = KeyValue {
@@ -262,15 +270,16 @@ fn parse_key_value_metadata(
 /// Convert parquet column schema to arrow field.
 pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
     let field = complex::convert_type(&parquet_column.self_type_ptr())?;
-    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
+    let mut ret = Field::new(
+        parquet_column.name(),
+        field.arrow_type,
+        field.nullable,
+    );

     let basic_info = parquet_column.self_type().get_basic_info();
     if basic_info.has_id() {
         let mut meta = HashMap::with_capacity(1);
-        meta.insert(
-            PARQUET_FIELD_ID_META_KEY.to_string(),
-            basic_info.id().to_string(),
-        );
+        meta.insert(PARQUET_FIELD_ID_META_KEY.to_string(), basic_info.id().to_string());
         ret.set_metadata(meta);
     }

@@ -392,9 +401,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                     is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                     unit: match time_unit {
                         TimeUnit::Second => unreachable!(),
-                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
-                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
-                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
+                        TimeUnit::Millisecond => {
+                            ParquetTimeUnit::MILLIS(Default::default())
+                        }
+                        TimeUnit::Microsecond => {
+                            ParquetTimeUnit::MICROS(Default::default())
+                        }
+                        TimeUnit::Nanosecond => {
+                            ParquetTimeUnit::NANOS(Default::default())
+                        }
                     },
                 }))
                 .with_repetition(repetition)
@@ -442,7 +457,9 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             .with_repetition(repetition)
             .with_id(id)
             .build(),
-        DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)),
+        DataType::Duration(_) => {
+            Err(arrow_err!("Converting Duration to parquet not supported",))
+        }
         DataType::Interval(_) => {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_converted_type(ConvertedType::INTERVAL)
@@ -464,7 +481,8 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 .with_length(*length)
                 .build()
         }
-        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
+        DataType::Decimal128(precision, scale)
+        | DataType::Decimal256(precision, scale) => {
             // Decimal precision determines the Parquet physical type to use.
             // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
             let (physical_type, length) = if *precision > 1 && *precision <= 9 {
@@ -511,7 +529,9 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         }
         DataType::Struct(fields) => {
             if fields.is_empty() {
-                return Err(arrow_err!("Parquet does not support writing empty structs",));
+                return Err(
+                    arrow_err!("Parquet does not support writing empty structs",),
+                );
             }
             // recursively convert children to types/nodes
             let fields = fields
@@ -601,7 +621,8 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();

         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();

         let arrow_fields = Fields::from(vec![
             Field::new("boolean", DataType::Boolean, false),
@@ -639,7 +660,8 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();

         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();

         let arrow_fields = Fields::from(vec![
             Field::new("decimal1", DataType::Decimal128(4, 2), false),
@@ -665,7 +687,8 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();

         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();

         let arrow_fields = Fields::from(vec![
             Field::new("binary", DataType::Binary, false),
@@ -686,7 +709,8 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();

         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();

         let arrow_fields = Fields::from(vec![
             Field::new("boolean", DataType::Boolean, false),
@@ -694,9 +718,12 @@ mod tests {
         ]);
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());

-        let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
-                .unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema_by_columns(
+            &parquet_schema,
+            ProjectionMask::all(),
+            None,
+        )
+        .unwrap();
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());
     }

@@ -894,7 +921,8 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();

         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();

         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -972,7 +1000,8 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();

         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();

         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1066,7 +1095,8 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();

         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();

         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1083,7 +1113,8 @@ mod tests {
             Field::new("leaf1", DataType::Boolean, false),
             Field::new("leaf2", DataType::Int32, false),
         ]);
-        let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
+        let group1_struct =
+            Field::new("group1", DataType::Struct(group1_fields), false);
         arrow_fields.push(group1_struct);

         let leaf3_field = Field::new("leaf3", DataType::Int64, false);
@@ -1102,7 +1133,8 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();

         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();

         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1255,7 +1287,8 @@ mod tests {
         let parquet_group_type = parse_message_type(message_type).unwrap();

         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
+        let converted_arrow_schema =
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();

         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1480,11 +1513,20 @@ mod tests {
                 vec![
                     Field::new("bools", DataType::Boolean, false),
                     Field::new("uint32", DataType::UInt32, false),
-                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
+                    Field::new_list(
+                        "int32",
+                        Field::new("element", DataType::Int32, true),
+                        false,
+                    ),
                 ],
                 false,
             ),
-            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
+            Field::new_dictionary(
+                "dictionary_strings",
+                DataType::Int32,
+                DataType::Utf8,
+                false,
+            ),
             Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
             Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
             Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
@@ -1569,8 +1611,10 @@ mod tests {

         let schema = Schema::new_with_metadata(
             vec![
-                Field::new("c1", DataType::Utf8, false)
-                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
+                Field::new("c1", DataType::Utf8, false).with_metadata(meta(&[
+                    ("Key", "Foo"),
+                    (PARQUET_FIELD_ID_META_KEY, "2"),
+                ])),
                 Field::new("c2", DataType::Binary, false),
                 Field::new("c3", DataType::FixedSizeBinary(3), false),
                 Field::new("c4", DataType::Boolean, false),
@@ -1588,7 +1632,10 @@ mod tests {
             ),
             Field::new(
                 "c17",
-                DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
+                DataType::Timestamp(
+                    TimeUnit::Microsecond,
+                    Some("Africa/Johannesburg".into()),
+                ),
                 false,
             ),
             Field::new(
@@ -1600,8 +1647,10 @@ mod tests {
             Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
             Field::new_list(
                 "c21",
-                Field::new("item", DataType::Boolean, true)
-                    .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
+                Field::new("item", DataType::Boolean, true).with_metadata(meta(&[
+                    ("Key", "Bar"),
+                    (PARQUET_FIELD_ID_META_KEY, "5"),
+                ])),
                 false,
             )
             .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
@@ -1651,7 +1700,10 @@ mod tests {
             // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
             Field::new_dict(
                 "c31",
-                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+                DataType::Dictionary(
+                    Box::new(DataType::Int32),
+                    Box::new(DataType::Utf8),
+                ),
                 true,
                 123,
                 true,
@@ -1686,7 +1738,11 @@ mod tests {
                 "c39",
                 "key_value",
                 Field::new("key", DataType::Utf8, false),
-                Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
+                Field::new_list(
+                    "value",
+                    Field::new("element", DataType::Utf8, true),
+                    true,
+                ),
                 false, // fails to roundtrip keys_sorted
                 true,
             ),
@@ -1725,8 +1781,11 @@ mod tests {

         // write to an empty parquet file so that schema is serialized
         let file = tempfile::tempfile().unwrap();
-        let writer =
-            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
+        let writer = ArrowWriter::try_new(
+            file.try_clone().unwrap(),
+            Arc::new(schema.clone()),
+            None,
+        )?;
         writer.close()?;

         // read file back
@@ -1785,23 +1844,33 @@ mod tests {
         };
         let schema = Schema::new_with_metadata(
             vec![
-                Field::new("c1", DataType::Utf8, true)
-                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
-                Field::new("c2", DataType::Utf8, true)
-                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
+                Field::new("c1", DataType::Utf8, true).with_metadata(meta(&[
+                    (PARQUET_FIELD_ID_META_KEY, "1"),
+                ])),
+                Field::new("c2", DataType::Utf8, true).with_metadata(meta(&[
+                    (PARQUET_FIELD_ID_META_KEY, "2"),
+                ])),
             ],
             HashMap::new(),
         );

-        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
+        let writer = ArrowWriter::try_new(
+            vec![],
+            Arc::new(schema.clone()),
+            None,
+        )?;
         let parquet_bytes = writer.into_inner()?;

-        let reader =
-            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
+        let reader = crate::file::reader::SerializedFileReader::new(
+            bytes::Bytes::from(parquet_bytes),
+        )?;
         let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();

         // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
-        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
+        let arrow_schema = crate::arrow::parquet_to_arrow_schema(
+            &schema_descriptor,
+            None,
+        )?;

         let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
         let parq_fields = parq_schema_descr.root_schema().get_fields();
@@ -1814,14 +1883,19 @@ mod tests {

     #[test]
     fn test_arrow_schema_roundtrip_lists() -> Result<()> {
-        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
-            .iter()
-            .cloned()
-            .collect();
+        let metadata: HashMap<String, String> =
+            [("Key".to_string(), "Value".to_string())]
+                .iter()
+                .cloned()
+                .collect();

         let schema = Schema::new_with_metadata(
             vec![
-                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
+                Field::new_list(
+                    "c21",
+                    Field::new("array", DataType::Boolean, true),
+                    false,
+                ),
                 Field::new(
                     "c22",
                     DataType::FixedSizeList(
@@ -1852,8 +1926,11 @@ mod tests {

         // write to an empty parquet file so that schema is serialized
         let file = tempfile::tempfile().unwrap();
-        let writer =
-            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
+        let writer = ArrowWriter::try_new(
+            file.try_clone().unwrap(),
+            Arc::new(schema.clone()),
+            None,
+        )?;
         writer.close()?;

         // read file back
diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs
index 447fe5fc3ab4..fdc744831a25 100644
--- a/parquet/src/arrow/schema/primitive.rs
+++ b/parquet/src/arrow/schema/primitive.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.

-use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType};
+use crate::basic::{
+    ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::{BasicTypeInfo, Type};
 use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION};
@@ -156,7 +158,9 @@ fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
            (32, false) => Ok(DataType::UInt32),
            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
        },
-        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_128_type(scale, precision)
+        }
         (Some(LogicalType::Date), _) => Ok(DataType::Date32),
         (Some(LogicalType::Time { unit, .. }), _) => match unit {
             ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
@@ -233,7 +237,9 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
-        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_128_type(scale, precision)
+        }
         (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
         (logical, converted) => Err(arrow_err!(
             "Unable to convert parquet INT64 logical type {:?} or converted type {}",