From d941ff1c3741ba4e18022d8be8edfbbca8b0af17 Mon Sep 17 00:00:00 2001 From: Letian Jiang Date: Mon, 2 Oct 2023 22:14:23 +0800 Subject: [PATCH] Support parquet bloom filter length (#4885) * Support parquet bloom filter length Signed-off-by: Letian Jiang * update Signed-off-by: Letian Jiang --------- Signed-off-by: Letian Jiang --- parquet/src/bloom_filter/mod.rs | 30 +- parquet/src/file/metadata.rs | 17 ++ parquet/src/file/writer.rs | 9 +- parquet/src/format.rs | 505 ++++++++++---------------------- parquet/src/schema/printer.rs | 5 + 5 files changed, 195 insertions(+), 371 deletions(-) diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index 4d2040b7f258..c893d492b52a 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -135,13 +135,12 @@ pub struct Sbbf(Vec<Block>); const SBBF_HEADER_SIZE_ESTIMATE: usize = 20; -/// given an initial offset, and a [ChunkReader], try to read out a bloom filter header and return +/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return /// both the header and the offset after it (for bitset). -fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>( +fn chunk_read_bloom_filter_header_and_offset( offset: u64, - reader: Arc<R>, + buffer: Bytes, ) -> Result<(BloomFilterHeader, u64), ParquetError> { - let buffer = reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE)?; let (header, length) = read_bloom_filter_header_and_length(buffer)?; Ok((header, offset + length)) } @@ -271,8 +270,13 @@ impl Sbbf { return Ok(None); }; + let buffer = match column_metadata.bloom_filter_length() { + Some(length) => reader.get_bytes(offset, length as usize), + None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE), + }?; + let (header, bitset_offset) = - chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?; + chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?; match header.algorithm { BloomFilterAlgorithm::BLOCK(_) => { @@ -289,11 +293,17 @@ impl Sbbf { // this match exists to future proof the singleton hash enum } } - // length in bytes - let length: usize = header.num_bytes.try_into().map_err(|_| { - ParquetError::General("Bloom filter length is invalid".to_string()) - })?; - let bitset = reader.get_bytes(bitset_offset, length)?; + + let bitset = match column_metadata.bloom_filter_length() { + Some(_) => buffer.slice((bitset_offset - offset) as usize..), + None => { + let bitset_length: usize = header.num_bytes.try_into().map_err(|_| { + ParquetError::General("Bloom filter length is invalid".to_string()) + })?; + reader.get_bytes(bitset_offset, bitset_length)? + } + }; + Ok(Some(Self::new(&bitset))) } diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index aaa3d28e206a..1f46c8105ebc 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -474,6 +474,7 @@ pub struct ColumnChunkMetaData { statistics: Option<Statistics>, encoding_stats: Option<Vec<PageEncodingStats>>, bloom_filter_offset: Option<i64>, + bloom_filter_length: Option<i32>, offset_index_offset: Option<i64>, offset_index_length: Option<i32>, column_index_offset: Option<i64>, @@ -591,6 +592,11 @@ impl ColumnChunkMetaData { self.bloom_filter_offset } + /// Returns the length of the bloom filter in bytes, if present. + pub fn bloom_filter_length(&self) -> Option<i32> { + self.bloom_filter_length + } +
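// --- Illustration (not part of the patch) ---------------------------------------
// A sketch of the two read paths the new field enables, using the names from the
// hunks above (`ChunkReader::get_bytes`, `SBBF_HEADER_SIZE_ESTIMATE`, and the
// `bloom_filter_offset`/`bloom_filter_length` getters). When `bloom_filter_length`
// is present, the header and bitset arrive in a single ranged read; older files
// without the field still need a header-sized probe plus a second read for the
// bitset.
fn bloom_filter_bytes<R: ChunkReader>(
    reader: &R,
    column_metadata: &ColumnChunkMetaData,
) -> Result<Option<Bytes>, ParquetError> {
    let Some(offset) = column_metadata.bloom_filter_offset() else {
        // This column chunk carries no bloom filter at all.
        return Ok(None);
    };
    let bytes = match column_metadata.bloom_filter_length() {
        // Newer writers record the total serialized size (header + bitset).
        Some(length) => reader.get_bytes(offset as u64, length as usize)?,
        // Older files: fetch an estimated header now; the bitset needs a second
        // read once the header's `num_bytes` is known.
        None => reader.get_bytes(offset as u64, SBBF_HEADER_SIZE_ESTIMATE)?,
    };
    Ok(Some(bytes))
}
// ---------------------------------------------------------------------------------
 /// Returns the offset for the column index.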
pub fn column_index_offset(&self) -> Option { self.column_index_offset @@ -657,6 +663,7 @@ impl ColumnChunkMetaData { }) .transpose()?; let bloom_filter_offset = col_metadata.bloom_filter_offset; + let bloom_filter_length = col_metadata.bloom_filter_length; let offset_index_offset = cc.offset_index_offset; let offset_index_length = cc.offset_index_length; let column_index_offset = cc.column_index_offset; @@ -677,6 +684,7 @@ impl ColumnChunkMetaData { statistics, encoding_stats, bloom_filter_offset, + bloom_filter_length, offset_index_offset, offset_index_length, column_index_offset, @@ -722,6 +730,7 @@ impl ColumnChunkMetaData { .as_ref() .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()), bloom_filter_offset: self.bloom_filter_offset, + bloom_filter_length: self.bloom_filter_length, } } @@ -752,6 +761,7 @@ impl ColumnChunkMetaDataBuilder { statistics: None, encoding_stats: None, bloom_filter_offset: None, + bloom_filter_length: None, offset_index_offset: None, offset_index_length: None, column_index_offset: None, @@ -837,6 +847,12 @@ impl ColumnChunkMetaDataBuilder { self } + /// Sets optional bloom filter length in bytes. + pub fn set_bloom_filter_length(mut self, value: Option) -> Self { + self.0.bloom_filter_length = value; + self + } + /// Sets optional offset index offset in bytes. pub fn set_offset_index_offset(mut self, value: Option) -> Self { self.0.offset_index_offset = value; @@ -1053,6 +1069,7 @@ mod tests { }, ]) .set_bloom_filter_offset(Some(6000)) + .set_bloom_filter_length(Some(25)) .set_offset_index_offset(Some(7000)) .set_offset_index_length(Some(25)) .set_column_index_offset(Some(8000)) diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index cafb1761352d..af25cc9689c1 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -267,12 +267,15 @@ impl SerializedFileWriter { Some(bloom_filter) => { let start_offset = self.buf.bytes_written(); bloom_filter.write(&mut self.buf)?; + let end_offset = self.buf.bytes_written(); // set offset and index for bloom filter - column_chunk + let column_chunk_meta = column_chunk .meta_data .as_mut() - .expect("can't have bloom filter without column metadata") - .bloom_filter_offset = Some(start_offset as i64); + .expect("can't have bloom filter without column metadata"); + column_chunk_meta.bloom_filter_offset = Some(start_offset as i64); + column_chunk_meta.bloom_filter_length = + Some((end_offset - start_offset) as i32); } None => {} } diff --git a/parquet/src/format.rs b/parquet/src/format.rs index 0851b2287fba..12c572c23cf5 100644 --- a/parquet/src/format.rs +++ b/parquet/src/format.rs @@ -1,9 +1,10 @@ -// Autogenerated by Thrift Compiler (0.17.0) +// Autogenerated by Thrift Compiler (0.19.0) // DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +#![allow(dead_code)] #![allow(unused_imports)] #![allow(unused_extern_crates)] -#![allow(clippy::too_many_arguments, clippy::type_complexity, clippy::vec_box)] +#![allow(clippy::too_many_arguments, clippy::type_complexity, clippy::vec_box, clippy::wrong_self_convention)] #![cfg_attr(rustfmt, rustfmt_skip)] use std::cell::RefCell; @@ -99,7 +100,7 @@ impl From<&Type> for i32 { /// DEPRECATED: Common types used by frameworks(e.g. hive, pig) using parquet. /// ConvertedType is superseded by LogicalType. This enum should not be extended. -/// +/// /// See LogicalTypes.md for conversion between ConvertedType and LogicalType. 
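// --- Illustration (not part of the patch) ---------------------------------------
// Round-trip of the new builder setter and getter, mirroring the metadata test
// added above. `column_descr` is a hypothetical `ColumnDescPtr` for some leaf
// column and the offset/length values are made up; in the writer the length is
// simply `end_offset - start_offset`, i.e. the serialized BloomFilterHeader plus
// its bitset.
fn bloom_filter_metadata_round_trip(column_descr: ColumnDescPtr) -> Result<(), ParquetError> {
    let built = ColumnChunkMetaData::builder(column_descr)
        .set_bloom_filter_offset(Some(6000))
        .set_bloom_filter_length(Some(25))
        .build()?;
    assert_eq!(built.bloom_filter_offset(), Some(6000));
    assert_eq!(built.bloom_filter_length(), Some(25));
    Ok(())
}
// ---------------------------------------------------------------------------------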
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct ConvertedType(pub i32); @@ -117,12 +118,12 @@ impl ConvertedType { /// an enum is converted into a binary field pub const ENUM: ConvertedType = ConvertedType(4); /// A decimal value. - /// + /// /// This may be used to annotate binary or fixed primitive types. The /// underlying byte array stores the unscaled value encoded as two's /// complement using big-endian byte order (the most significant byte is the /// zeroth element). The value of the decimal is the value * 10^{-scale}. - /// + /// /// This must be accompanied by a (maximum) precision and a scale in the /// SchemaElement. The precision specifies the number of digits in the decimal /// and the scale stores the location of the decimal point. For example 1.23 @@ -130,62 +131,62 @@ impl ConvertedType { /// 2 digits over). pub const DECIMAL: ConvertedType = ConvertedType(5); /// A Date - /// + /// /// Stored as days since Unix epoch, encoded as the INT32 physical type. - /// + /// pub const DATE: ConvertedType = ConvertedType(6); /// A time - /// + /// /// The total number of milliseconds since midnight. The value is stored /// as an INT32 physical type. pub const TIME_MILLIS: ConvertedType = ConvertedType(7); /// A time. - /// + /// /// The total number of microseconds since midnight. The value is stored as /// an INT64 physical type. pub const TIME_MICROS: ConvertedType = ConvertedType(8); /// A date/time combination - /// + /// /// Date and time recorded as milliseconds since the Unix epoch. Recorded as /// a physical type of INT64. pub const TIMESTAMP_MILLIS: ConvertedType = ConvertedType(9); /// A date/time combination - /// + /// /// Date and time recorded as microseconds since the Unix epoch. The value is /// stored as an INT64 physical type. pub const TIMESTAMP_MICROS: ConvertedType = ConvertedType(10); /// An unsigned integer value. - /// + /// /// The number describes the maximum number of meaningful data bits in /// the stored value. 8, 16 and 32 bit values are stored using the /// INT32 physical type. 64 bit values are stored using the INT64 /// physical type. - /// + /// pub const UINT_8: ConvertedType = ConvertedType(11); pub const UINT_16: ConvertedType = ConvertedType(12); pub const UINT_32: ConvertedType = ConvertedType(13); pub const UINT_64: ConvertedType = ConvertedType(14); /// A signed integer value. - /// + /// /// The number describes the maximum number of meaningful data bits in /// the stored value. 8, 16 and 32 bit values are stored using the /// INT32 physical type. 64 bit values are stored using the INT64 /// physical type. - /// + /// pub const INT_8: ConvertedType = ConvertedType(15); pub const INT_16: ConvertedType = ConvertedType(16); pub const INT_32: ConvertedType = ConvertedType(17); pub const INT_64: ConvertedType = ConvertedType(18); /// An embedded JSON document - /// + /// /// A JSON document embedded within a single UTF8 column. pub const JSON: ConvertedType = ConvertedType(19); /// An embedded BSON document - /// + /// /// A BSON document embedded within a single BINARY column. pub const BSON: ConvertedType = ConvertedType(20); /// An interval of time - /// + /// /// This type annotates data stored as a FIXED_LEN_BYTE_ARRAY of length 12 /// This data is composed of three separate little endian unsigned /// integers. Each stores a component of a duration of time. The first @@ -443,11 +444,11 @@ impl From<&Encoding> for i32 { } /// Supported compression algorithms. 
-/// +/// /// Codecs added in format version X.Y can be read by readers based on X.Y and later. /// Codec support may vary between readers based on the format version and /// libraries available at runtime. -/// +/// /// See Compression.md for a detailed specification of these algorithms. #[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct CompressionCodec(pub i32); @@ -637,17 +638,17 @@ impl From<&BoundaryOrder> for i32 { /// Statistics per row group and per page /// All fields are optional. -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct Statistics { /// DEPRECATED: min and max value of the column. Use min_value and max_value. - /// + /// /// Values are encoded using PLAIN encoding, except that variable-length byte /// arrays do not include a length prefix. - /// + /// /// These fields encode min and max values determined by signed comparison /// only. New files should use the correct order for a column's logical type /// and store the values in the min_value and max_value fields. - /// + /// /// To support older readers, these may be set when the column order is /// signed. pub max: Option>, @@ -657,7 +658,7 @@ pub struct Statistics { /// count of distinct values occurring pub distinct_count: Option, /// Min and max values for the column, determined by its ColumnOrder. - /// + /// /// Values are encoded using PLAIN encoding, except that variable-length byte /// arrays do not include a length prefix. pub max_value: Option>, @@ -772,25 +773,12 @@ impl TSerializable for Statistics { } } -impl Default for Statistics { - fn default() -> Self { - Statistics{ - max: Some(Vec::new()), - min: Some(Vec::new()), - null_count: Some(0), - distinct_count: Some(0), - max_value: Some(Vec::new()), - min_value: Some(Vec::new()), - } - } -} - // // StringType // /// Empty structs to use as logical type annotations -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct StringType { } @@ -808,12 +796,7 @@ impl TSerializable for StringType { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -828,17 +811,11 @@ impl TSerializable for StringType { } } -impl Default for StringType { - fn default() -> Self { - StringType{} - } -} - // // UUIDType // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct UUIDType { } @@ -856,12 +833,7 @@ impl TSerializable for UUIDType { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -876,17 +848,11 @@ impl TSerializable for UUIDType { } } -impl Default for UUIDType { - fn default() -> Self { - UUIDType{} - } -} - // // MapType // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct MapType { } @@ -904,12 +870,7 @@ impl TSerializable for MapType { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match 
field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -924,17 +885,11 @@ impl TSerializable for MapType { } } -impl Default for MapType { - fn default() -> Self { - MapType{} - } -} - // // ListType // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct ListType { } @@ -952,12 +907,7 @@ impl TSerializable for ListType { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -972,17 +922,11 @@ impl TSerializable for ListType { } } -impl Default for ListType { - fn default() -> Self { - ListType{} - } -} - // // EnumType // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct EnumType { } @@ -1000,12 +944,7 @@ impl TSerializable for EnumType { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -1020,17 +959,11 @@ impl TSerializable for EnumType { } } -impl Default for EnumType { - fn default() -> Self { - EnumType{} - } -} - // // DateType // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DateType { } @@ -1048,12 +981,7 @@ impl TSerializable for DateType { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -1068,22 +996,16 @@ impl TSerializable for DateType { } } -impl Default for DateType { - fn default() -> Self { - DateType{} - } -} - // // NullType // /// Logical type to annotate a column that is always null. -/// +/// /// Sometimes when discovering the schema of existing data, values are always /// null and the physical type can't be determined. This annotation signals /// the case where the physical type was guessed from all null values. -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct NullType { } @@ -1101,12 +1023,7 @@ impl TSerializable for NullType { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -1121,21 +1038,18 @@ impl TSerializable for NullType { } } -impl Default for NullType { - fn default() -> Self { - NullType{} - } -} - // // DecimalType // /// Decimal logical type annotation -/// +/// +/// Scale must be zero or a positive integer less than or equal to the precision. +/// Precision must be a non-zero positive integer. +/// /// To maintain forward-compatibility in v1, implementations using this logical /// type must also set scale and precision on the annotated SchemaElement. 
-/// +/// /// Allowed for physical types: INT32, INT64, FIXED, and BINARY #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DecimalType { @@ -1206,7 +1120,7 @@ impl TSerializable for DecimalType { // /// Time units for logical types -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct MilliSeconds { } @@ -1224,12 +1138,7 @@ impl TSerializable for MilliSeconds { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -1244,17 +1153,11 @@ impl TSerializable for MilliSeconds { } } -impl Default for MilliSeconds { - fn default() -> Self { - MilliSeconds{} - } -} - // // MicroSeconds // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct MicroSeconds { } @@ -1272,12 +1175,7 @@ impl TSerializable for MicroSeconds { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -1292,17 +1190,11 @@ impl TSerializable for MicroSeconds { } } -impl Default for MicroSeconds { - fn default() -> Self { - MicroSeconds{} - } -} - // // NanoSeconds // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct NanoSeconds { } @@ -1320,12 +1212,7 @@ impl TSerializable for NanoSeconds { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -1340,12 +1227,6 @@ impl TSerializable for NanoSeconds { } } -impl Default for NanoSeconds { - fn default() -> Self { - NanoSeconds{} - } -} - // // TimeUnit // @@ -1450,7 +1331,7 @@ impl TSerializable for TimeUnit { // /// Timestamp logical type annotation -/// +/// /// Allowed for physical types: INT64 #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct TimestampType { @@ -1521,7 +1402,7 @@ impl TSerializable for TimestampType { // /// Time logical type annotation -/// +/// /// Allowed for physical types: INT32 (millis), INT64 (micros, nanos) #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct TimeType { @@ -1592,9 +1473,9 @@ impl TSerializable for TimeType { // /// Integer logical type annotation -/// +/// /// bitWidth must be 8, 16, 32, or 64. 
-/// +/// /// Allowed for physical types: INT32, INT64 #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct IntType { @@ -1665,9 +1546,9 @@ impl TSerializable for IntType { // /// Embedded JSON logical type annotation -/// +/// /// Allowed for physical types: BINARY -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct JsonType { } @@ -1685,12 +1566,7 @@ impl TSerializable for JsonType { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -1705,20 +1581,14 @@ impl TSerializable for JsonType { } } -impl Default for JsonType { - fn default() -> Self { - JsonType{} - } -} - // // BsonType // /// Embedded BSON logical type annotation -/// +/// /// Allowed for physical types: BINARY -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct BsonType { } @@ -1736,12 +1606,7 @@ impl TSerializable for BsonType { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -1756,12 +1621,6 @@ impl TSerializable for BsonType { } } -impl Default for BsonType { - fn default() -> Self { - BsonType{} - } -} - // // LogicalType // @@ -2003,7 +1862,7 @@ impl TSerializable for LogicalType { pub struct SchemaElement { /// Data type for this field. Not set if the current element is a non-leaf node pub type_: Option, - /// If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the vales. + /// If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values. /// Otherwise, if specified, this is the maximum bit length to store any of the values. /// (e.g. a low cardinality INT col could have this set to 3). Note that this is /// in the schema, and therefore fixed for the entire file. @@ -2020,12 +1879,12 @@ pub struct SchemaElement { pub num_children: Option, /// DEPRECATED: When the schema is the result of a conversion from another model. /// Used to record the original type to help with cross conversion. - /// + /// /// This is superseded by logicalType. pub converted_type: Option, /// DEPRECATED: Used when this column contains decimal data. /// See the DECIMAL converted type for more details. - /// + /// /// This is superseded by using the DecimalType annotation in logicalType. pub scale: Option, pub precision: Option, @@ -2033,7 +1892,7 @@ pub struct SchemaElement { /// original field id in the parquet schema pub field_id: Option, /// The logical type of this SchemaElement - /// + /// /// LogicalType replaces ConvertedType, but ConvertedType is still required /// for some logical types to ensure forward-compatibility in format v1. 
pub logical_type: Option, @@ -2309,7 +2168,7 @@ impl TSerializable for DataPageHeader { // IndexPageHeader // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct IndexPageHeader { } @@ -2327,12 +2186,7 @@ impl TSerializable for IndexPageHeader { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -2347,16 +2201,14 @@ impl TSerializable for IndexPageHeader { } } -impl Default for IndexPageHeader { - fn default() -> Self { - IndexPageHeader{} - } -} - // // DictionaryPageHeader // +/// The dictionary page must be placed at the first position of the column chunk +/// if it is partly or completely dictionary encoded. At most one dictionary page +/// can be placed in a column chunk. +/// #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DictionaryPageHeader { /// Number of values in the dictionary * @@ -2444,7 +2296,7 @@ impl TSerializable for DictionaryPageHeader { /// New page format allowing reading levels without decompressing the data /// Repetition and definition levels are uncompressed /// The remaining section containing the data is compressed if is_compressed is true -/// +/// #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DataPageHeaderV2 { /// Number of values, including NULLs, in this data page. * @@ -2601,7 +2453,7 @@ impl TSerializable for DataPageHeaderV2 { // /// Block-based algorithm type annotation. * -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct SplitBlockAlgorithm { } @@ -2619,12 +2471,7 @@ impl TSerializable for SplitBlockAlgorithm { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -2639,12 +2486,6 @@ impl TSerializable for SplitBlockAlgorithm { } } -impl Default for SplitBlockAlgorithm { - fn default() -> Self { - SplitBlockAlgorithm{} - } -} - // // BloomFilterAlgorithm // @@ -2724,8 +2565,8 @@ impl TSerializable for BloomFilterAlgorithm { /// Hash strategy type annotation. xxHash is an extremely fast non-cryptographic hash /// algorithm. It uses 64 bits version of xxHash. -/// -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +/// +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct XxHash { } @@ -2743,12 +2584,7 @@ impl TSerializable for XxHash { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -2763,12 +2599,6 @@ impl TSerializable for XxHash { } } -impl Default for XxHash { - fn default() -> Self { - XxHash{} - } -} - // // BloomFilterHash // @@ -2847,8 +2677,8 @@ impl TSerializable for BloomFilterHash { // /// The compression used in the Bloom filter. 
-/// -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +/// +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct Uncompressed { } @@ -2866,12 +2696,7 @@ impl TSerializable for Uncompressed { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -2886,12 +2711,6 @@ impl TSerializable for Uncompressed { } } -impl Default for Uncompressed { - fn default() -> Self { - Uncompressed{} - } -} - // // BloomFilterCompression // @@ -2971,7 +2790,7 @@ impl TSerializable for BloomFilterCompression { /// Bloom filter header is stored at beginning of Bloom filter data of each column /// and followed by its bitset. -/// +/// #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct BloomFilterHeader { /// The size of bitset in bytes * @@ -3076,32 +2895,22 @@ pub struct PageHeader { pub uncompressed_page_size: i32, /// Compressed (and potentially encrypted) page size in bytes, not including this header * pub compressed_page_size: i32, - /// The 32bit CRC for the page, to be be calculated as follows: - /// - Using the standard CRC32 algorithm - /// - On the data only, i.e. this header should not be included. 'Data' - /// hereby refers to the concatenation of the repetition levels, the - /// definition levels and the column value, in this exact order. - /// - On the encoded versions of the repetition levels, definition levels and - /// column values - /// - On the compressed versions of the repetition levels, definition levels - /// and column values where possible; - /// - For v1 data pages, the repetition levels, definition levels and column - /// values are always compressed together. If a compression scheme is - /// specified, the CRC shall be calculated on the compressed version of - /// this concatenation. If no compression scheme is specified, the CRC - /// shall be calculated on the uncompressed version of this concatenation. - /// - For v2 data pages, the repetition levels and definition levels are - /// handled separately from the data and are never compressed (only - /// encoded). If a compression scheme is specified, the CRC shall be - /// calculated on the concatenation of the uncompressed repetition levels, - /// uncompressed definition levels and the compressed column values. - /// If no compression scheme is specified, the CRC shall be calculated on - /// the uncompressed concatenation. - /// - In encrypted columns, CRC is calculated after page encryption; the - /// encryption itself is performed after page compression (if compressed) + /// The 32-bit CRC checksum for the page, to be be calculated as follows: + /// + /// - The standard CRC32 algorithm is used (with polynomial 0x04C11DB7, + /// the same as in e.g. GZip). + /// - All page types can have a CRC (v1 and v2 data pages, dictionary pages, + /// etc.). + /// - The CRC is computed on the serialization binary representation of the page + /// (as written to disk), excluding the page header. For example, for v1 + /// data pages, the CRC is computed on the concatenation of repetition levels, + /// definition levels and column values (optionally compressed, optionally + /// encrypted). + /// - The CRC computation therefore takes place after any compression + /// and encryption steps, if any. 
+ /// /// If enabled, this allows for disabling checksumming in HDFS if only a few /// pages need to be read. - /// pub crc: Option, pub data_page_header: Option, pub index_page_header: Option, @@ -3516,10 +3325,16 @@ pub struct ColumnMetaData { pub encoding_stats: Option>, /// Byte offset from beginning of file to Bloom filter data. * pub bloom_filter_offset: Option, + /// Size of Bloom filter data including the serialized header, in bytes. + /// Added in 2.10 so readers may not read this field from old files and + /// it can be obtained after the BloomFilterHeader has been deserialized. + /// Writers should write this field so readers can read the bloom filter + /// in a single I/O. + pub bloom_filter_length: Option, } impl ColumnMetaData { - pub fn new(type_: Type, encodings: Vec, path_in_schema: Vec, codec: CompressionCodec, num_values: i64, total_uncompressed_size: i64, total_compressed_size: i64, key_value_metadata: F8, data_page_offset: i64, index_page_offset: F10, dictionary_page_offset: F11, statistics: F12, encoding_stats: F13, bloom_filter_offset: F14) -> ColumnMetaData where F8: Into>>, F10: Into>, F11: Into>, F12: Into>, F13: Into>>, F14: Into> { + pub fn new(type_: Type, encodings: Vec, path_in_schema: Vec, codec: CompressionCodec, num_values: i64, total_uncompressed_size: i64, total_compressed_size: i64, key_value_metadata: F8, data_page_offset: i64, index_page_offset: F10, dictionary_page_offset: F11, statistics: F12, encoding_stats: F13, bloom_filter_offset: F14, bloom_filter_length: F15) -> ColumnMetaData where F8: Into>>, F10: Into>, F11: Into>, F12: Into>, F13: Into>>, F14: Into>, F15: Into> { ColumnMetaData { type_, encodings, @@ -3535,6 +3350,7 @@ impl ColumnMetaData { statistics: statistics.into(), encoding_stats: encoding_stats.into(), bloom_filter_offset: bloom_filter_offset.into(), + bloom_filter_length: bloom_filter_length.into(), } } } @@ -3556,6 +3372,7 @@ impl TSerializable for ColumnMetaData { let mut f_12: Option = None; let mut f_13: Option> = None; let mut f_14: Option = None; + let mut f_15: Option = None; loop { let field_ident = i_prot.read_field_begin()?; if field_ident.field_type == TType::Stop { @@ -3643,6 +3460,10 @@ impl TSerializable for ColumnMetaData { let val = i_prot.read_i64()?; f_14 = Some(val); }, + 15 => { + let val = i_prot.read_i32()?; + f_15 = Some(val); + }, _ => { i_prot.skip(field_ident.field_type)?; }, @@ -3673,6 +3494,7 @@ impl TSerializable for ColumnMetaData { statistics: f_12, encoding_stats: f_13, bloom_filter_offset: f_14, + bloom_filter_length: f_15, }; Ok(ret) } @@ -3749,6 +3571,11 @@ impl TSerializable for ColumnMetaData { o_prot.write_i64(fld_var)?; o_prot.write_field_end()? } + if let Some(fld_var) = self.bloom_filter_length { + o_prot.write_field_begin(&TFieldIdentifier::new("bloom_filter_length", TType::I32, 15))?; + o_prot.write_i32(fld_var)?; + o_prot.write_field_end()? 
+ } o_prot.write_field_stop()?; o_prot.write_struct_end() } @@ -3758,7 +3585,7 @@ impl TSerializable for ColumnMetaData { // EncryptionWithFooterKey // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct EncryptionWithFooterKey { } @@ -3776,12 +3603,7 @@ impl TSerializable for EncryptionWithFooterKey { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -3796,12 +3618,6 @@ impl TSerializable for EncryptionWithFooterKey { } } -impl Default for EncryptionWithFooterKey { - fn default() -> Self { - EncryptionWithFooterKey{} - } -} - // // EncryptionWithColumnKey // @@ -3977,14 +3793,14 @@ impl TSerializable for ColumnCryptoMetaData { pub struct ColumnChunk { /// File where column data is stored. If not set, assumed to be same file as /// metadata. This path is relative to the current file. - /// + /// pub file_path: Option, /// Byte offset in file_path to the ColumnMetaData * pub file_offset: i64, /// Column metadata for this chunk. This is the same content as what is at /// file_path/file_offset. Having it here has it replicated in the file /// metadata. - /// + /// pub meta_data: Option, /// File offset of ColumnChunk's OffsetIndex * pub offset_index_offset: Option, @@ -4151,7 +3967,7 @@ impl TSerializable for ColumnChunk { pub struct RowGroup { /// Metadata for each column chunk in this row group. /// This list must have the same order as the SchemaElement list in FileMetaData. - /// + /// pub columns: Vec, /// Total byte size of all the uncompressed column data in this row group * pub total_byte_size: i64, @@ -4312,7 +4128,7 @@ impl TSerializable for RowGroup { // /// Empty struct to signal the order defined by the physical or logical type -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct TypeDefinedOrder { } @@ -4330,12 +4146,7 @@ impl TSerializable for TypeDefinedOrder { if field_ident.field_type == TType::Stop { break; } - let field_id = field_id(&field_ident)?; - match field_id { - _ => { - i_prot.skip(field_ident.field_type)?; - }, - }; + i_prot.skip(field_ident.field_type)?; i_prot.read_field_end()?; } i_prot.read_struct_end()?; @@ -4350,12 +4161,6 @@ impl TSerializable for TypeDefinedOrder { } } -impl Default for TypeDefinedOrder { - fn default() -> Self { - TypeDefinedOrder{} - } -} - // // ColumnOrder // @@ -4596,13 +4401,14 @@ pub struct ColumnIndex { /// byte\[0\], so that all lists have the same length. If false, the /// corresponding entries in min_values and max_values must be valid. pub null_pages: Vec, - /// Two lists containing lower and upper bounds for the values of each page. - /// These may be the actual minimum and maximum values found on a page, but - /// can also be (more compact) values that do not exist on a page. For - /// example, instead of storing ""Blart Versenwald III", a writer may set - /// min_values\[i\]="B", max_values\[i\]="C". Such more compact values must still - /// be valid values within the column's logical type. Readers must make sure - /// that list entries are populated before using them by inspecting null_pages. + /// Two lists containing lower and upper bounds for the values of each page + /// determined by the ColumnOrder of the column. 
These may be the actual + /// minimum and maximum values found on a page, but can also be (more compact) + /// values that do not exist on a page. For example, instead of storing ""Blart + /// Versenwald III", a writer may set min_values\[i\]="B", max_values\[i\]="C". + /// Such more compact values must still be valid values within the column's + /// logical type. Readers must make sure that list entries are populated before + /// using them by inspecting null_pages. pub min_values: Vec>, pub max_values: Vec>, /// Stores whether both min_values and max_values are ordered and if so, in @@ -4750,7 +4556,7 @@ impl TSerializable for ColumnIndex { // AesGcmV1 // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct AesGcmV1 { /// AAD prefix * pub aad_prefix: Option>, @@ -4833,21 +4639,11 @@ impl TSerializable for AesGcmV1 { } } -impl Default for AesGcmV1 { - fn default() -> Self { - AesGcmV1{ - aad_prefix: Some(Vec::new()), - aad_file_unique: Some(Vec::new()), - supply_aad_prefix: Some(false), - } - } -} - // // AesGcmCtrV1 // -#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct AesGcmCtrV1 { /// AAD prefix * pub aad_prefix: Option>, @@ -4930,16 +4726,6 @@ impl TSerializable for AesGcmCtrV1 { } } -impl Default for AesGcmCtrV1 { - fn default() -> Self { - AesGcmCtrV1{ - aad_prefix: Some(Vec::new()), - aad_file_unique: Some(Vec::new()), - supply_aad_prefix: Some(false), - } - } -} - // // EncryptionAlgorithm // @@ -5051,19 +4837,22 @@ pub struct FileMetaData { /// String for application that wrote this file. This should be in the format /// `` version `` (build ``). /// e.g. impala version 1.0 (build 6cf94d29b2b7115df4de2c06e2ab4326d721eb55) - /// + /// pub created_by: Option, - /// Sort order used for the min_value and max_value fields of each column in - /// this file. Sort orders are listed in the order matching the columns in the - /// schema. The indexes are not necessary the same though, because only leaf - /// nodes of the schema are represented in the list of sort orders. - /// - /// Without column_orders, the meaning of the min_value and max_value fields is - /// undefined. To ensure well-defined behaviour, if min_value and max_value are - /// written to a Parquet file, column_orders must be written as well. - /// - /// The obsolete min and max fields are always sorted by signed comparison - /// regardless of column_orders. + /// Sort order used for the min_value and max_value fields in the Statistics + /// objects and the min_values and max_values fields in the ColumnIndex + /// objects of each column in this file. Sort orders are listed in the order + /// matching the columns in the schema. The indexes are not necessary the same + /// though, because only leaf nodes of the schema are represented in the list + /// of sort orders. + /// + /// Without column_orders, the meaning of the min_value and max_value fields + /// in the Statistics object and the ColumnIndex object is undefined. To ensure + /// well-defined behaviour, if these fields are written to a Parquet file, + /// column_orders must be written as well. + /// + /// The obsolete min and max fields in the Statistics object are always sorted + /// by signed comparison regardless of column_orders. pub column_orders: Option>, /// Encryption algorithm. This field is set only in encrypted files /// with plaintext footer. 
Files with encrypted footer store algorithm id diff --git a/parquet/src/schema/printer.rs b/parquet/src/schema/printer.rs index 0c90c5405a2b..fe63e758b251 100644 --- a/parquet/src/schema/printer.rs +++ b/parquet/src/schema/printer.rs @@ -167,6 +167,11 @@ fn print_column_chunk_metadata( Some(bfo) => bfo.to_string(), }; writeln!(out, "bloom filter offset: {bloom_filter_offset_str}"); + let bloom_filter_length_str = match cc_metadata.bloom_filter_length() { + None => "N/A".to_owned(), + Some(bfl) => bfl.to_string(), + }; + writeln!(out, "bloom filter length: {bloom_filter_length_str}");
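A minimal end-to-end sketch of the behaviour this patch enables (an illustration, not part of the patch; it assumes the crate's standard low-level writer and reader APIs, bloom filters switched on via `WriterProperties::set_bloom_filter_enabled`, and a made-up output file name): write one INT32 column, reopen the file, and observe that the footer now carries both `bloom_filter_offset` and the new `bloom_filter_length`, which the printer change above reports as "bloom filter offset"/"bloom filter length".

use std::{fs::File, sync::Arc};
use parquet::data_type::Int32Type;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;
use parquet::schema::printer::print_parquet_metadata;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One required INT32 column; enable bloom filters for the whole file.
    let schema = Arc::new(parse_message_type("message demo { required int32 id; }")?);
    let props = Arc::new(
        WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build(),
    );

    let path = "bloom_length_demo.parquet"; // hypothetical output path
    let mut writer = SerializedFileWriter::new(File::create(path)?, schema, props)?;
    let mut rg = writer.next_row_group()?;
    if let Some(mut col) = rg.next_column()? {
        col.typed::<Int32Type>().write_batch(&[1, 2, 3, 4], None, None)?;
        col.close()?;
    }
    rg.close()?;
    writer.close()?;

    // Re-open the file: with this patch the writer populated both fields, so the
    // bloom filter can later be loaded with a single ranged read.
    let reader = SerializedFileReader::new(File::open(path)?)?;
    let column = reader.metadata().row_group(0).column(0);
    assert!(column.bloom_filter_offset().is_some());
    assert!(column.bloom_filter_length().is_some());
    print_parquet_metadata(&mut std::io::stdout(), reader.metadata());
    Ok(())
}

 let offset_index_offset_str = match cc_metadata.offset_index_offset() { None => "N/A".to_owned(), Some(oio) => oio.to_string(),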