From e9ff64b5e1a4fa5b3b8408910fd68b6abe30f9c8 Mon Sep 17 00:00:00 2001
From: Yijun Zhao
Date: Wed, 6 Mar 2024 14:32:35 +0800
Subject: [PATCH] update arrow-format

---
 arrow-array/src/array/byte_view_array.rs      |   1 -
 .../src/builder/generic_bytes_view_builder.rs |   1 +
 arrow-ipc/src/convert.rs                      |   2 +-
 arrow-ipc/src/gen/Message.rs                  |  51 ++
 arrow-ipc/src/gen/Schema.rs                   | 475 +++++++++++++++++-
 format/Message.fbs                            |  18 +-
 format/Schema.fbs                             |  46 +-
 parquet/src/arrow/schema/mod.rs               | 194 +++----
 8 files changed, 644 insertions(+), 144 deletions(-)

diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs
index a3b8a5dcb803..9f3a6809d9d0 100644
--- a/arrow-array/src/array/byte_view_array.rs
+++ b/arrow-array/src/array/byte_view_array.rs
@@ -93,7 +93,6 @@ use std::sync::Arc;
 /// └───┘
 /// ```
 /// [`GenericByteArray`]: crate::array::GenericByteArray
-
 pub struct GenericByteViewArray<T: ByteViewType + ?Sized> {
     data_type: DataType,
     views: ScalarBuffer<u128>,
diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs
index 29de7feb0ec1..9accb932ae20 100644
--- a/arrow-array/src/builder/generic_bytes_view_builder.rs
+++ b/arrow-array/src/builder/generic_bytes_view_builder.rs
@@ -20,6 +20,7 @@ use crate::types::{BinaryViewType, ByteViewType, StringViewType};
 use crate::{ArrayRef, GenericByteViewArray};
 use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
 use arrow_data::ByteView;
+
 use std::any::Any;
 use std::marker::PhantomData;
 use std::sync::Arc;
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index a821008d89ab..b2e580241adc 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -543,7 +543,7 @@ pub(crate) fn get_fb_field_type<'a>(
             .as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
-        BinaryView | Utf8View => unimplemented!("unimplemented"),
+        BinaryView | Utf8View => unimplemented!("BinaryView/Utf8View not implemented"),
         Utf8 => FBFieldType {
             type_type: crate::Type::Utf8,
             type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
diff --git a/arrow-ipc/src/gen/Message.rs b/arrow-ipc/src/gen/Message.rs
index a546b54d9170..1f49f1d9428b 100644
--- a/arrow-ipc/src/gen/Message.rs
+++ b/arrow-ipc/src/gen/Message.rs
@@ -25,6 +25,8 @@ use flatbuffers::EndianScalar;
 use std::{cmp::Ordering, mem};
 // automatically generated by the FlatBuffers compiler, do not modify
 
+// @generated
+
 #[deprecated(
     since = "2.0.0",
     note = "Use associated constants instead. This will no longer be generated in 2021."
@@ -636,6 +638,7 @@ impl<'a> RecordBatch<'a> {
     pub const VT_NODES: flatbuffers::VOffsetT = 6;
     pub const VT_BUFFERS: flatbuffers::VOffsetT = 8;
     pub const VT_COMPRESSION: flatbuffers::VOffsetT = 10;
+    pub const VT_VARIADICBUFFERCOUNTS: flatbuffers::VOffsetT = 12;
 
     #[inline]
     pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
@@ -648,6 +651,9 @@ impl<'a> RecordBatch<'a> {
     ) -> flatbuffers::WIPOffset<RecordBatch<'bldr>> {
         let mut builder = RecordBatchBuilder::new(_fbb);
         builder.add_length(args.length);
+        if let Some(x) = args.variadicBufferCounts {
+            builder.add_variadicBufferCounts(x);
+        }
         if let Some(x) = args.compression {
             builder.add_compression(x);
         }
@@ -720,6 +726,33 @@ impl<'a> RecordBatch<'a> {
             )
         }
     }
+    /// Some types such as Utf8View are represented using a variable number of buffers.
+    /// For each such Field in the pre-ordered flattened logical schema, there will be
+    /// an entry in variadicBufferCounts to indicate the number of variadic
+    /// buffers which belong to that Field in the current RecordBatch.
+    ///
+    /// For example, the schema
+    ///     col1: Struct<alpha: Int32, beta: BinaryView>
+    ///     col2: Utf8View
+    /// contains two Fields with variadic buffers so variadicBufferCounts will have
+    /// two entries, the first counting the variadic buffers of `col1.beta` and the
+    /// second counting `col2`'s.
+    ///
+    /// This field may be omitted if and only if the schema contains no Fields with
+    /// a variable number of buffers, such as BinaryView and Utf8View.
+    #[inline]
+    pub fn variadicBufferCounts(&self) -> Option<flatbuffers::Vector<'a, i64>> {
+        // Safety:
+        // Created from valid Table for this object
+        // which contains a valid value in this slot
+        unsafe {
+            self._tab
+                .get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, i64>>>(
+                    RecordBatch::VT_VARIADICBUFFERCOUNTS,
+                    None,
+                )
+        }
+    }
 }
 
 impl flatbuffers::Verifiable for RecordBatch<'_> {
@@ -746,6 +779,11 @@ impl flatbuffers::Verifiable for RecordBatch<'_> {
                 Self::VT_COMPRESSION,
                 false,
             )?
+            .visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, i64>>>(
+                "variadicBufferCounts",
+                Self::VT_VARIADICBUFFERCOUNTS,
+                false,
+            )?
             .finish();
         Ok(())
     }
@@ -755,6 +793,7 @@ pub struct RecordBatchArgs<'a> {
     pub nodes: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, FieldNode>>>,
     pub buffers: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, Buffer>>>,
     pub compression: Option<flatbuffers::WIPOffset<BodyCompression<'a>>>,
+    pub variadicBufferCounts: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, i64>>>,
 }
 impl<'a> Default for RecordBatchArgs<'a> {
     #[inline]
@@ -764,6 +803,7 @@ impl<'a> Default for RecordBatchArgs<'a> {
             nodes: None,
             buffers: None,
             compression: None,
+            variadicBufferCounts: None,
         }
     }
 }
@@ -800,6 +840,16 @@ impl<'a: 'b, 'b> RecordBatchBuilder<'a, 'b> {
         );
     }
     #[inline]
+    pub fn add_variadicBufferCounts(
+        &mut self,
+        variadicBufferCounts: flatbuffers::WIPOffset<flatbuffers::Vector<'b, i64>>,
+    ) {
+        self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(
+            RecordBatch::VT_VARIADICBUFFERCOUNTS,
+            variadicBufferCounts,
+        );
+    }
+    #[inline]
     pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> RecordBatchBuilder<'a, 'b> {
         let start = _fbb.start_table();
         RecordBatchBuilder {
@@ -821,6 +871,7 @@ impl core::fmt::Debug for RecordBatch<'_> {
         ds.field("nodes", &self.nodes());
         ds.field("buffers", &self.buffers());
         ds.field("compression", &self.compression());
+        ds.field("variadicBufferCounts", &self.variadicBufferCounts());
         ds.finish()
     }
 }
diff --git a/arrow-ipc/src/gen/Schema.rs b/arrow-ipc/src/gen/Schema.rs
index 0dc5dccd39e7..ed9dbaa249f0 100644
--- a/arrow-ipc/src/gen/Schema.rs
+++ b/arrow-ipc/src/gen/Schema.rs
@@ -22,6 +22,8 @@ use flatbuffers::EndianScalar;
 use std::{cmp::Ordering, mem};
 // automatically generated by the FlatBuffers compiler, do not modify
 
+// @generated
+
 #[deprecated(
     since = "2.0.0",
     note = "Use associated constants instead. This will no longer be generated in 2021."
@@ -58,7 +60,7 @@ impl MetadataVersion {
     pub const V3: Self = Self(2);
     /// >= 0.8.0 (December 2017). Non-backwards compatible with V3.
     pub const V4: Self = Self(3);
-    /// >= 1.0.0 (July 2020. Backwards compatible with V4 (V5 readers can read V4
+    /// >= 1.0.0 (July 2020). Backwards compatible with V4 (V5 readers can read V4
     /// metadata and IPC messages). Implementations are recommended to provide a
     /// V4 compatibility mode with V5 format changes disabled.
     ///
@@ -734,13 +736,13 @@ pub const ENUM_MIN_TYPE: u8 = 0;
     since = "2.0.0",
     note = "Use associated constants instead. This will no longer be generated in 2021."
 )]
-pub const ENUM_MAX_TYPE: u8 = 22;
+pub const ENUM_MAX_TYPE: u8 = 26;
 #[deprecated(
     since = "2.0.0",
     note = "Use associated constants instead. This will no longer be generated in 2021."
)] #[allow(non_camel_case_types)] -pub const ENUM_VALUES_TYPE: [Type; 23] = [ +pub const ENUM_VALUES_TYPE: [Type; 27] = [ Type::NONE, Type::Null, Type::Int, @@ -764,6 +766,10 @@ pub const ENUM_VALUES_TYPE: [Type; 23] = [ Type::LargeUtf8, Type::LargeList, Type::RunEndEncoded, + Type::BinaryView, + Type::Utf8View, + Type::ListView, + Type::LargeListView, ]; /// ---------------------------------------------------------------------- @@ -797,9 +803,13 @@ impl Type { pub const LargeUtf8: Self = Self(20); pub const LargeList: Self = Self(21); pub const RunEndEncoded: Self = Self(22); + pub const BinaryView: Self = Self(23); + pub const Utf8View: Self = Self(24); + pub const ListView: Self = Self(25); + pub const LargeListView: Self = Self(26); pub const ENUM_MIN: u8 = 0; - pub const ENUM_MAX: u8 = 22; + pub const ENUM_MAX: u8 = 26; pub const ENUM_VALUES: &'static [Self] = &[ Self::NONE, Self::Null, @@ -824,6 +834,10 @@ impl Type { Self::LargeUtf8, Self::LargeList, Self::RunEndEncoded, + Self::BinaryView, + Self::Utf8View, + Self::ListView, + Self::LargeListView, ]; /// Returns the variant's name or "" if unknown. pub fn variant_name(self) -> Option<&'static str> { @@ -851,6 +865,10 @@ impl Type { Self::LargeUtf8 => Some("LargeUtf8"), Self::LargeList => Some("LargeList"), Self::RunEndEncoded => Some("RunEndEncoded"), + Self::BinaryView => Some("BinaryView"), + Self::Utf8View => Some("Utf8View"), + Self::ListView => Some("ListView"), + Self::LargeListView => Some("LargeListView"), _ => None, } } @@ -1545,6 +1563,165 @@ impl core::fmt::Debug for LargeList<'_> { ds.finish() } } +pub enum ListViewOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// Represents the same logical types that List can, but contains offsets and +/// sizes allowing for writes in any order and sharing of child values among +/// list values. 
+pub struct ListView<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for ListView<'a> { + type Inner = ListView<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table::new(buf, loc), + } + } +} + +impl<'a> ListView<'a> { + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + ListView { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + _args: &'args ListViewArgs, + ) -> flatbuffers::WIPOffset> { + let mut builder = ListViewBuilder::new(_fbb); + builder.finish() + } +} + +impl flatbuffers::Verifiable for ListView<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, + pos: usize, + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use flatbuffers::Verifiable; + v.visit_table(pos)?.finish(); + Ok(()) + } +} +pub struct ListViewArgs {} +impl<'a> Default for ListViewArgs { + #[inline] + fn default() -> Self { + ListViewArgs {} + } +} + +pub struct ListViewBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> ListViewBuilder<'a, 'b> { + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> ListViewBuilder<'a, 'b> { + let start = _fbb.start_table(); + ListViewBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for ListView<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("ListView"); + ds.finish() + } +} +pub enum LargeListViewOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// Same as ListView, but with 64-bit offsets and sizes, allowing to represent +/// extremely large data values. 
+pub struct LargeListView<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for LargeListView<'a> { + type Inner = LargeListView<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table::new(buf, loc), + } + } +} + +impl<'a> LargeListView<'a> { + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + LargeListView { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + _args: &'args LargeListViewArgs, + ) -> flatbuffers::WIPOffset> { + let mut builder = LargeListViewBuilder::new(_fbb); + builder.finish() + } +} + +impl flatbuffers::Verifiable for LargeListView<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, + pos: usize, + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use flatbuffers::Verifiable; + v.visit_table(pos)?.finish(); + Ok(()) + } +} +pub struct LargeListViewArgs {} +impl<'a> Default for LargeListViewArgs { + #[inline] + fn default() -> Self { + LargeListViewArgs {} + } +} + +pub struct LargeListViewBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> LargeListViewBuilder<'a, 'b> { + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> LargeListViewBuilder<'a, 'b> { + let start = _fbb.start_table(); + LargeListViewBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for LargeListView<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("LargeListView"); + ds.finish() + } +} pub enum FixedSizeListOffset {} #[derive(Copy, Clone, PartialEq)] @@ -2453,6 +2630,174 @@ impl core::fmt::Debug for LargeBinary<'_> { ds.finish() } } +pub enum Utf8ViewOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// Logically the same as Utf8, but the internal representation uses a view +/// struct that contains the string length and either the string's entire data +/// inline (for small strings) or an inlined prefix, an index of another buffer, +/// and an offset pointing to a slice in that buffer (for non-small strings). +/// +/// Since it uses a variable number of data buffers, each Field with this type +/// must have a corresponding entry in `variadicBufferCounts`. 
+pub struct Utf8View<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for Utf8View<'a> { + type Inner = Utf8View<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table::new(buf, loc), + } + } +} + +impl<'a> Utf8View<'a> { + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + Utf8View { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + _args: &'args Utf8ViewArgs, + ) -> flatbuffers::WIPOffset> { + let mut builder = Utf8ViewBuilder::new(_fbb); + builder.finish() + } +} + +impl flatbuffers::Verifiable for Utf8View<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, + pos: usize, + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use flatbuffers::Verifiable; + v.visit_table(pos)?.finish(); + Ok(()) + } +} +pub struct Utf8ViewArgs {} +impl<'a> Default for Utf8ViewArgs { + #[inline] + fn default() -> Self { + Utf8ViewArgs {} + } +} + +pub struct Utf8ViewBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> Utf8ViewBuilder<'a, 'b> { + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> Utf8ViewBuilder<'a, 'b> { + let start = _fbb.start_table(); + Utf8ViewBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for Utf8View<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("Utf8View"); + ds.finish() + } +} +pub enum BinaryViewOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// Logically the same as Binary, but the internal representation uses a view +/// struct that contains the string length and either the string's entire data +/// inline (for small strings) or an inlined prefix, an index of another buffer, +/// and an offset pointing to a slice in that buffer (for non-small strings). +/// +/// Since it uses a variable number of data buffers, each Field with this type +/// must have a corresponding entry in `variadicBufferCounts`. 
+pub struct BinaryView<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for BinaryView<'a> { + type Inner = BinaryView<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table::new(buf, loc), + } + } +} + +impl<'a> BinaryView<'a> { + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + BinaryView { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + _args: &'args BinaryViewArgs, + ) -> flatbuffers::WIPOffset> { + let mut builder = BinaryViewBuilder::new(_fbb); + builder.finish() + } +} + +impl flatbuffers::Verifiable for BinaryView<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, + pos: usize, + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use flatbuffers::Verifiable; + v.visit_table(pos)?.finish(); + Ok(()) + } +} +pub struct BinaryViewArgs {} +impl<'a> Default for BinaryViewArgs { + #[inline] + fn default() -> Self { + BinaryViewArgs {} + } +} + +pub struct BinaryViewBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> BinaryViewBuilder<'a, 'b> { + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> BinaryViewBuilder<'a, 'b> { + let start = _fbb.start_table(); + BinaryViewBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for BinaryView<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("BinaryView"); + ds.finish() + } +} pub enum FixedSizeBinaryOffset {} #[derive(Copy, Clone, PartialEq)] @@ -3213,7 +3558,7 @@ pub enum TimestampOffset {} /// no indication of how to map this information to a physical point in time. /// Naive date-times must be handled with care because of this missing /// information, and also because daylight saving time (DST) may make -/// some values ambiguous or non-existent. A naive date-time may be +/// some values ambiguous or nonexistent. A naive date-time may be /// stored as a struct with Date and Time fields. However, it may also be /// encoded into a Timestamp column with an empty timezone. 
The timestamp
 /// values should be computed "as if" the timezone of the date-time values
@@ -4365,6 +4710,66 @@ impl<'a> Field<'a> {
             None
         }
     }
+
+    #[inline]
+    #[allow(non_snake_case)]
+    pub fn type_as_binary_view(&self) -> Option<BinaryView<'a>> {
+        if self.type_type() == Type::BinaryView {
+            self.type_().map(|t| {
+                // Safety:
+                // Created from a valid Table for this object
+                // Which contains a valid union in this slot
+                unsafe { BinaryView::init_from_table(t) }
+            })
+        } else {
+            None
+        }
+    }
+
+    #[inline]
+    #[allow(non_snake_case)]
+    pub fn type_as_utf_8_view(&self) -> Option<Utf8View<'a>> {
+        if self.type_type() == Type::Utf8View {
+            self.type_().map(|t| {
+                // Safety:
+                // Created from a valid Table for this object
+                // Which contains a valid union in this slot
+                unsafe { Utf8View::init_from_table(t) }
+            })
+        } else {
+            None
+        }
+    }
+
+    #[inline]
+    #[allow(non_snake_case)]
+    pub fn type_as_list_view(&self) -> Option<ListView<'a>> {
+        if self.type_type() == Type::ListView {
+            self.type_().map(|t| {
+                // Safety:
+                // Created from a valid Table for this object
+                // Which contains a valid union in this slot
+                unsafe { ListView::init_from_table(t) }
+            })
+        } else {
+            None
+        }
+    }
+
+    #[inline]
+    #[allow(non_snake_case)]
+    pub fn type_as_large_list_view(&self) -> Option<LargeListView<'a>> {
+        if self.type_type() == Type::LargeListView {
+            self.type_().map(|t| {
+                // Safety:
+                // Created from a valid Table for this object
+                // Which contains a valid union in this slot
+                unsafe { LargeListView::init_from_table(t) }
+            })
+        } else {
+            None
+        }
+    }
 }
 
 impl flatbuffers::Verifiable for Field<'_> {
@@ -4484,6 +4889,26 @@ impl flatbuffers::Verifiable for Field<'_> {
                     Type::RunEndEncoded => v
                         .verify_union_variant::<flatbuffers::ForwardsUOffset<RunEndEncoded>>(
                             "Type::RunEndEncoded",
                             pos,
                         ),
+                    Type::BinaryView => v
+                        .verify_union_variant::<flatbuffers::ForwardsUOffset<BinaryView>>(
+                            "Type::BinaryView",
+                            pos,
+                        ),
+                    Type::Utf8View => v
+                        .verify_union_variant::<flatbuffers::ForwardsUOffset<Utf8View>>(
+                            "Type::Utf8View",
+                            pos,
+                        ),
+                    Type::ListView => v
+                        .verify_union_variant::<flatbuffers::ForwardsUOffset<ListView>>(
+                            "Type::ListView",
+                            pos,
+                        ),
+                    Type::LargeListView => v
+                        .verify_union_variant::<flatbuffers::ForwardsUOffset<LargeListView>>(
+                            "Type::LargeListView",
+                            pos,
+                        ),
                     _ => Ok(()),
                 },
             )?
@@ -4827,6 +5252,46 @@ impl core::fmt::Debug for Field<'_> {
                     )
                 }
             }
+            Type::BinaryView => {
+                if let Some(x) = self.type_as_binary_view() {
+                    ds.field("type_", &x)
+                } else {
+                    ds.field(
+                        "type_",
+                        &"InvalidFlatbuffer: Union discriminant does not match value.",
+                    )
+                }
+            }
+            Type::Utf8View => {
+                if let Some(x) = self.type_as_utf_8_view() {
+                    ds.field("type_", &x)
+                } else {
+                    ds.field(
+                        "type_",
+                        &"InvalidFlatbuffer: Union discriminant does not match value.",
+                    )
+                }
+            }
+            Type::ListView => {
+                if let Some(x) = self.type_as_list_view() {
+                    ds.field("type_", &x)
+                } else {
+                    ds.field(
+                        "type_",
+                        &"InvalidFlatbuffer: Union discriminant does not match value.",
+                    )
+                }
+            }
+            Type::LargeListView => {
+                if let Some(x) = self.type_as_large_list_view() {
+                    ds.field("type_", &x)
+                } else {
+                    ds.field(
+                        "type_",
+                        &"InvalidFlatbuffer: Union discriminant does not match value.",
+                    )
+                }
+            }
             _ => {
                 let x: Option<()> = None;
                 ds.field("type_", &x)
diff --git a/format/Message.fbs b/format/Message.fbs
index 170ea8fbced8..c8c9b4b82cbf 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -99,6 +99,22 @@ table RecordBatch {
 
   /// Optional compression of the message body
   compression: BodyCompression;
+
+  /// Some types such as Utf8View are represented using a variable number of buffers.
+  /// For each such Field in the pre-ordered flattened logical schema, there will be
+  /// an entry in variadicBufferCounts to indicate the number of variadic
+  /// buffers which belong to that Field in the current RecordBatch.
+  ///
+  /// For example, the schema
+  ///     col1: Struct<alpha: Int32, beta: BinaryView>
+  ///     col2: Utf8View
+  /// contains two Fields with variadic buffers so variadicBufferCounts will have
+  /// two entries, the first counting the variadic buffers of `col1.beta` and the
+  /// second counting `col2`'s.
+  ///
+  /// This field may be omitted if and only if the schema contains no Fields with
+  /// a variable number of buffers, such as BinaryView and Utf8View.
+  variadicBufferCounts: [long];
 }
 
 /// For sending dictionary encoding information. Any Field can be
@@ -138,4 +154,4 @@ table Message {
   custom_metadata: [ KeyValue ];
 }
 
-root_type Message;
+root_type Message;
\ No newline at end of file
diff --git a/format/Schema.fbs b/format/Schema.fbs
index 6337f72ec9de..ab726903d19f 100644
--- a/format/Schema.fbs
+++ b/format/Schema.fbs
@@ -20,8 +20,10 @@
 /// Format Version History.
 /// Version 1.0 - Forward and backwards compatibility guaranteed.
 /// Version 1.1 - Add Decimal256.
-/// Version 1.2 - Add Interval MONTH_DAY_NANO
+/// Version 1.2 - Add Interval MONTH_DAY_NANO.
 /// Version 1.3 - Add Run-End Encoded.
+/// Version 1.4 - Add BinaryView, Utf8View, variadicBufferCounts, ListView, and
+/// LargeListView.
 
 namespace org.apache.arrow.flatbuf;
 
@@ -38,7 +40,7 @@ enum MetadataVersion:short {
   /// >= 0.8.0 (December 2017). Non-backwards compatible with V3.
   V4,
 
-  /// >= 1.0.0 (July 2020. Backwards compatible with V4 (V5 readers can read V4
+  /// >= 1.0.0 (July 2020). Backwards compatible with V4 (V5 readers can read V4
   /// metadata and IPC messages). Implementations are recommended to provide a
   /// V4 compatibility mode with V5 format changes disabled.
   ///
@@ -96,6 +98,17 @@ table List {
 table LargeList {
 }
 
+/// Represents the same logical types that List can, but contains offsets and
+/// sizes allowing for writes in any order and sharing of child values among
+/// list values.
+table ListView {
+}
+
+/// Same as ListView, but with 64-bit offsets and sizes, allowing to represent
+/// extremely large data values.
+table LargeListView {
+}
+
 table FixedSizeList {
   /// Number of list items per value
   listSize: int;
@@ -171,6 +184,27 @@ table LargeUtf8 {
 table LargeBinary {
 }
 
+/// Logically the same as Utf8, but the internal representation uses a view
+/// struct that contains the string length and either the string's entire data
+/// inline (for small strings) or an inlined prefix, an index of another buffer,
+/// and an offset pointing to a slice in that buffer (for non-small strings).
+///
+/// Since it uses a variable number of data buffers, each Field with this type
+/// must have a corresponding entry in `variadicBufferCounts`.
+table Utf8View {
+}
+
+/// Logically the same as Binary, but the internal representation uses a view
+/// struct that contains the string length and either the string's entire data
+/// inline (for small strings) or an inlined prefix, an index of another buffer,
+/// and an offset pointing to a slice in that buffer (for non-small strings).
+///
+/// Since it uses a variable number of data buffers, each Field with this type
+/// must have a corresponding entry in `variadicBufferCounts`.
+table BinaryView {
+}
+
+
 table FixedSizeBinary {
   /// Number of bytes per value
   byteWidth: int;
@@ -338,7 +372,7 @@ table Time {
 /// no indication of how to map this information to a physical point in time.
 /// Naive date-times must be handled with care because of this missing
 /// information, and also because daylight saving time (DST) may make
-/// some values ambiguous or non-existent.
A naive date-time may be +/// some values ambiguous or nonexistent. A naive date-time may be /// stored as a struct with Date and Time fields. However, it may also be /// encoded into a Timestamp column with an empty timezone. The timestamp /// values should be computed "as if" the timezone of the date-time values @@ -427,6 +461,10 @@ union Type { LargeUtf8, LargeList, RunEndEncoded, + BinaryView, + Utf8View, + ListView, + LargeListView, } /// ---------------------------------------------------------------------- @@ -529,4 +567,4 @@ table Schema { features : [ Feature ]; } -root_type Schema; +root_type Schema; \ No newline at end of file diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 4a78db05ed2d..9316f4fb8d1f 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -32,8 +32,7 @@ use arrow_ipc::writer; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; use crate::basic::{ - ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, - Type as PhysicalType, + ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType, }; use crate::errors::{ParquetError, Result}; use crate::file::{metadata::KeyValue, properties::WriterProperties}; @@ -55,11 +54,7 @@ pub fn parquet_to_arrow_schema( parquet_schema: &SchemaDescriptor, key_value_metadata: Option<&Vec>, ) -> Result { - parquet_to_arrow_schema_by_columns( - parquet_schema, - ProjectionMask::all(), - key_value_metadata, - ) + parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata) } /// Convert parquet schema to arrow schema including optional metadata, @@ -199,10 +194,7 @@ fn encode_arrow_schema(schema: &Schema) -> String { /// Mutates writer metadata by storing the encoded Arrow schema. /// If there is an existing Arrow schema metadata, it is replaced. -pub(crate) fn add_encoded_arrow_schema_to_metadata( - schema: &Schema, - props: &mut WriterProperties, -) { +pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) { let encoded = encode_arrow_schema(schema); let schema_kv = KeyValue { @@ -270,16 +262,15 @@ fn parse_key_value_metadata( /// Convert parquet column schema to arrow field. 
pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result { let field = complex::convert_type(&parquet_column.self_type_ptr())?; - let mut ret = Field::new( - parquet_column.name(), - field.arrow_type, - field.nullable, - ); + let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable); let basic_info = parquet_column.self_type().get_basic_info(); if basic_info.has_id() { let mut meta = HashMap::with_capacity(1); - meta.insert(PARQUET_FIELD_ID_META_KEY.to_string(), basic_info.id().to_string()); + meta.insert( + PARQUET_FIELD_ID_META_KEY.to_string(), + basic_info.id().to_string(), + ); ret.set_metadata(meta); } @@ -401,15 +392,9 @@ fn arrow_to_parquet_type(field: &Field) -> Result { is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()), unit: match time_unit { TimeUnit::Second => unreachable!(), - TimeUnit::Millisecond => { - ParquetTimeUnit::MILLIS(Default::default()) - } - TimeUnit::Microsecond => { - ParquetTimeUnit::MICROS(Default::default()) - } - TimeUnit::Nanosecond => { - ParquetTimeUnit::NANOS(Default::default()) - } + TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()), + TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()), + TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()), }, })) .with_repetition(repetition) @@ -457,9 +442,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_repetition(repetition) .with_id(id) .build(), - DataType::Duration(_) => { - Err(arrow_err!("Converting Duration to parquet not supported",)) - } + DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)), DataType::Interval(_) => { Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_converted_type(ConvertedType::INTERVAL) @@ -481,9 +464,13 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_length(*length) .build() } - DataType::BinaryView | DataType::Utf8View => unimplemented!("BinaryView/Utf8View not implemented"), - DataType::Decimal128(precision, scale) - | DataType::Decimal256(precision, scale) => { + DataType::BinaryView | DataType::Utf8View => { + Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) + .with_repetition(repetition) + .with_id(id) + .build() + } + DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => { // Decimal precision determines the Parquet physical type to use. 
// Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal let (physical_type, length) = if *precision > 1 && *precision <= 9 { @@ -528,12 +515,12 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_id(id) .build() } - DataType::ListView(_) | DataType::LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"), + DataType::ListView(_) | DataType::LargeListView(_) => { + unimplemented!("ListView/LargeListView not implemented") + } DataType::Struct(fields) => { if fields.is_empty() { - return Err( - arrow_err!("Parquet does not support writing empty structs",), - ); + return Err(arrow_err!("Parquet does not support writing empty structs",)); } // recursively convert children to types/nodes let fields = fields @@ -623,8 +610,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("boolean", DataType::Boolean, false), @@ -662,8 +648,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("decimal1", DataType::Decimal128(4, 2), false), @@ -689,8 +674,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("binary", DataType::Binary, false), @@ -711,8 +695,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("boolean", DataType::Boolean, false), @@ -720,12 +703,9 @@ mod tests { ]); assert_eq!(&arrow_fields, converted_arrow_schema.fields()); - let converted_arrow_schema = parquet_to_arrow_schema_by_columns( - &parquet_schema, - ProjectionMask::all(), - None, - ) - .unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None) + .unwrap(); assert_eq!(&arrow_fields, converted_arrow_schema.fields()); } @@ -923,8 +903,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1002,8 +981,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = 
SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1097,8 +1075,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1115,8 +1092,7 @@ mod tests { Field::new("leaf1", DataType::Boolean, false), Field::new("leaf2", DataType::Int32, false), ]); - let group1_struct = - Field::new("group1", DataType::Struct(group1_fields), false); + let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false); arrow_fields.push(group1_struct); let leaf3_field = Field::new("leaf3", DataType::Int64, false); @@ -1135,8 +1111,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1289,8 +1264,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1515,20 +1489,11 @@ mod tests { vec![ Field::new("bools", DataType::Boolean, false), Field::new("uint32", DataType::UInt32, false), - Field::new_list( - "int32", - Field::new("element", DataType::Int32, true), - false, - ), + Field::new_list("int32", Field::new("element", DataType::Int32, true), false), ], false, ), - Field::new_dictionary( - "dictionary_strings", - DataType::Int32, - DataType::Utf8, - false, - ), + Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false), Field::new("decimal_int32", DataType::Decimal128(8, 2), false), Field::new("decimal_int64", DataType::Decimal128(16, 2), false), Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false), @@ -1613,10 +1578,8 @@ mod tests { let schema = Schema::new_with_metadata( vec![ - Field::new("c1", DataType::Utf8, false).with_metadata(meta(&[ - ("Key", "Foo"), - (PARQUET_FIELD_ID_META_KEY, "2"), - ])), + Field::new("c1", DataType::Utf8, false) + .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])), Field::new("c2", DataType::Binary, false), Field::new("c3", DataType::FixedSizeBinary(3), false), Field::new("c4", DataType::Boolean, false), @@ -1634,10 +1597,7 @@ mod tests { ), Field::new( "c17", - DataType::Timestamp( - TimeUnit::Microsecond, - Some("Africa/Johannesburg".into()), - ), + DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())), false, ), 
Field::new( @@ -1649,10 +1609,8 @@ mod tests { Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false), Field::new_list( "c21", - Field::new("item", DataType::Boolean, true).with_metadata(meta(&[ - ("Key", "Bar"), - (PARQUET_FIELD_ID_META_KEY, "5"), - ])), + Field::new("item", DataType::Boolean, true) + .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])), false, ) .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])), @@ -1702,10 +1660,7 @@ mod tests { // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false), Field::new_dict( "c31", - DataType::Dictionary( - Box::new(DataType::Int32), - Box::new(DataType::Utf8), - ), + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true, 123, true, @@ -1740,11 +1695,7 @@ mod tests { "c39", "key_value", Field::new("key", DataType::Utf8, false), - Field::new_list( - "value", - Field::new("element", DataType::Utf8, true), - true, - ), + Field::new_list("value", Field::new("element", DataType::Utf8, true), true), false, // fails to roundtrip keys_sorted true, ), @@ -1783,11 +1734,8 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - let writer = ArrowWriter::try_new( - file.try_clone().unwrap(), - Arc::new(schema.clone()), - None, - )?; + let writer = + ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?; writer.close()?; // read file back @@ -1846,33 +1794,23 @@ mod tests { }; let schema = Schema::new_with_metadata( vec![ - Field::new("c1", DataType::Utf8, true).with_metadata(meta(&[ - (PARQUET_FIELD_ID_META_KEY, "1"), - ])), - Field::new("c2", DataType::Utf8, true).with_metadata(meta(&[ - (PARQUET_FIELD_ID_META_KEY, "2"), - ])), + Field::new("c1", DataType::Utf8, true) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])), + Field::new("c2", DataType::Utf8, true) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])), ], HashMap::new(), ); - let writer = ArrowWriter::try_new( - vec![], - Arc::new(schema.clone()), - None, - )?; + let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?; let parquet_bytes = writer.into_inner()?; - let reader = crate::file::reader::SerializedFileReader::new( - bytes::Bytes::from(parquet_bytes), - )?; + let reader = + crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?; let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr(); // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema - let arrow_schema = crate::arrow::parquet_to_arrow_schema( - &schema_descriptor, - None, - )?; + let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?; let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?; let parq_fields = parq_schema_descr.root_schema().get_fields(); @@ -1885,19 +1823,14 @@ mod tests { #[test] fn test_arrow_schema_roundtrip_lists() -> Result<()> { - let metadata: HashMap = - [("Key".to_string(), "Value".to_string())] - .iter() - .cloned() - .collect(); + let metadata: HashMap = [("Key".to_string(), "Value".to_string())] + .iter() + .cloned() + .collect(); let schema = Schema::new_with_metadata( vec![ - Field::new_list( - "c21", - Field::new("array", DataType::Boolean, true), - false, - ), + Field::new_list("c21", Field::new("array", DataType::Boolean, true), false), Field::new( "c22", DataType::FixedSizeList( @@ -1928,11 +1861,8 @@ mod tests { // write to an empty parquet file so that 
schema is serialized let file = tempfile::tempfile().unwrap(); - let writer = ArrowWriter::try_new( - file.try_clone().unwrap(), - Arc::new(schema.clone()), - None, - )?; + let writer = + ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?; writer.close()?; // read file back
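
For context on the new variadicBufferCounts field: in arrow-array, a view column keeps strings of at most 12 bytes inline in the view itself and spills longer values into shared data buffers, and an IPC writer records one count per view column. A minimal sketch, assuming `StringViewArray::from_iter_values` and the `data_buffers` accessor on `GenericByteViewArray` as found in recent arrow-array releases:

    use arrow_array::StringViewArray;

    fn main() {
        // Strings of at most 12 bytes are stored entirely inline in the
        // 16-byte view; longer strings spill into shared data buffers.
        let array = StringViewArray::from_iter_values([
            "short",                              // inlined in the view itself
            "a string comfortably over 12 bytes", // stored in a data buffer
        ]);

        // An IPC writer would record this length as the column's entry in
        // variadicBufferCounts.
        println!("variadic buffers: {}", array.data_buffers().len());
    }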
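
The view struct described by the new Utf8View/BinaryView tables is mirrored by `arrow_data::ByteView`, which the builder change above already imports. A sketch of the non-inlined ("non-small") form; the field values here are illustrative only:

    use arrow_data::ByteView;

    fn main() {
        // A view occupies 16 bytes: length, 4-byte prefix, buffer index, and
        // offset into that buffer. This form is used when length > 12.
        let view = ByteView {
            length: 29,
            prefix: u32::from_le_bytes(*b"a st"),
            buffer_index: 0,
            offset: 0,
        };

        // Views are stored packed as u128 values in the views buffer and
        // round-trip losslessly.
        let packed: u128 = view.as_u128();
        assert_eq!(ByteView::from(packed).length, 29);
    }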
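
The ListView/LargeListView tables added to Schema.fbs pair each offset with an explicit size, so offsets need not be monotonic and lists may overlap and share child values. A standalone sketch of the indexing rule in plain Rust (not arrow-rs API):

    fn main() {
        // List i is child[offsets[i] .. offsets[i] + sizes[i]].
        let child = [1, 2, 3, 4];
        let offsets = [2, 0, 0]; // out-of-order and overlapping is allowed
        let sizes = [2, 3, 4];

        for i in 0..offsets.len() {
            let (o, s) = (offsets[i], sizes[i]);
            println!("list {i}: {:?}", &child[o..o + s]);
        }
    }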
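
The parquet side of this change can be exercised through `arrow_to_parquet_schema`, the same public entry point the tests above call: view columns now map to BYTE_ARRAY instead of panicking. A short sketch; the decimal column is included to show the precision-driven choice of physical type described in the diff:

    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::arrow_to_parquet_schema;

    fn main() -> parquet::errors::Result<()> {
        let schema = Schema::new(vec![
            Field::new("s", DataType::Utf8View, false),
            Field::new("b", DataType::BinaryView, false),
            Field::new("d", DataType::Decimal128(30, 2), false), // precision > 18
        ]);

        let descr = arrow_to_parquet_schema(&schema)?;
        println!("{:?}", descr.column(0).physical_type()); // BYTE_ARRAY
        println!("{:?}", descr.column(1).physical_type()); // BYTE_ARRAY
        println!("{:?}", descr.column(2).physical_type()); // FIXED_LEN_BYTE_ARRAY
        Ok(())
    }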