From 89be0248084414263a9fd461d2c945cd778cd13b Mon Sep 17 00:00:00 2001
From: Jefffrey <22608443+Jefffrey@users.noreply.github.com>
Date: Tue, 28 Nov 2023 23:42:23 +1100
Subject: [PATCH] Add FileReaderBuilder for arrow-ipc

---
 arrow-ipc/src/reader.rs | 183 ++++++++++++++++++++++++++++------------
 1 file changed, 128 insertions(+), 55 deletions(-)

diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 6f2cb30a1629..211247e11390 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -20,7 +20,7 @@
 //! The `FileReader` and `StreamReader` have similar interfaces,
 //! however the `FileReader` expects a reader that supports `Seek`ing
 
-use flatbuffers::VectorIter;
+use flatbuffers::{VectorIter, VerifierOptions};
 use std::collections::HashMap;
 use std::fmt;
 use std::io::{BufReader, Read, Seek, SeekFrom};
@@ -498,61 +498,39 @@ pub fn read_dictionary(
     Ok(())
 }
 
-/// Arrow File reader
-pub struct FileReader<R: Read + Seek> {
-    /// Buffered file reader that supports reading and seeking
-    reader: BufReader<R>,
-
-    /// The schema that is read from the file header
-    schema: SchemaRef,
-
-    /// The blocks in the file
-    ///
-    /// A block indicates the regions in the file to read to get data
-    blocks: Vec<crate::Block>,
-
-    /// A counter to keep track of the current block that should be read
-    current_block: usize,
-
-    /// The total number of blocks, which may contain record batches and other types
-    total_blocks: usize,
+/// Build an Arrow [`FileReader`] with custom options.
+#[derive(Debug, Default)]
+pub struct FileReaderBuilder {
+    /// Optional projection for which columns to load (zero-based column indices)
+    projection: Option<Vec<usize>>,
+    /// Flatbuffers options for parsing footer
+    verifier_options: VerifierOptions,
+}
 
-    /// Optional dictionaries for each schema field.
+impl FileReaderBuilder {
+    /// Options for creating a new [`FileReader`].
     ///
-    /// Dictionaries may be appended to in the streaming format.
-    dictionaries_by_id: HashMap<i64, ArrayRef>,
-
-    /// Metadata version
-    metadata_version: crate::MetadataVersion,
-
-    /// User defined metadata
-    custom_metadata: HashMap<String, String>,
+    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
+    pub fn new() -> Self {
+        Self::default()
+    }
 
-    /// Optional projection and projected_schema
-    projection: Option<(Vec<usize>, Schema)>,
-}
+    /// Optional projection for which columns to load (zero-based column indices).
+    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
 
-impl<R: Read + Seek> fmt::Debug for FileReader<R> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
-        f.debug_struct("FileReader")
-            .field("reader", &"BufReader<..>")
-            .field("schema", &self.schema)
-            .field("blocks", &self.blocks)
-            .field("current_block", &self.current_block)
-            .field("total_blocks", &self.total_blocks)
-            .field("dictionaries_by_id", &self.dictionaries_by_id)
-            .field("metadata_version", &self.metadata_version)
-            .field("projection", &self.projection)
-            .finish()
+    /// Flatbuffers options for parsing footer. Useful if needing to parse a file containing
+    /// millions of columns, in which case can up the value for `max_tables` to accommodate parsing
+    /// such a file.
+    pub fn with_verifier_options(mut self, verifier_options: VerifierOptions) -> Self {
+        self.verifier_options = verifier_options;
+        self
     }
-}
 
-impl<R: Read + Seek> FileReader<R> {
-    /// Try to create a new file reader
-    ///
-    /// Returns errors if the file does not meet the Arrow Format header and footer
-    /// requirements
-    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
+    /// Build [`FileReader`] with given reader.
+    pub fn build<R: Read + Seek>(self, reader: R) -> Result<FileReader<R>, ArrowError> {
         let mut reader = BufReader::new(reader);
         // check if header and footer contain correct magic bytes
         let mut magic_buffer: [u8; 6] = [0; 6];
@@ -580,9 +558,10 @@ impl<R: Read + Seek> FileReader<R> {
         reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
         reader.read_exact(&mut footer_data)?;
 
-        let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
-            ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
-        })?;
+        let footer = crate::root_as_footer_with_opts(&self.verifier_options, &footer_data[..])
+            .map_err(|err| {
+                ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
+            })?;
 
         let blocks = footer.recordBatches().ok_or_else(|| {
             ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
@@ -650,7 +629,7 @@ impl<R: Read + Seek> FileReader<R> {
                 }
             }
         }
-        let projection = match projection {
+        let projection = match self.projection {
             Some(projection_indices) => {
                 let schema = schema.project(&projection_indices)?;
                 Some((projection_indices, schema))
@@ -658,7 +637,7 @@ impl<R: Read + Seek> FileReader<R> {
             _ => None,
         };
 
-        Ok(Self {
+        Ok(FileReader {
             reader,
             schema: Arc::new(schema),
             blocks: blocks.iter().copied().collect(),
@@ -670,6 +649,69 @@ impl<R: Read + Seek> FileReader<R> {
             projection,
         })
     }
+}
+
+/// Arrow File reader
+pub struct FileReader<R: Read + Seek> {
+    /// Buffered file reader that supports reading and seeking
+    reader: BufReader<R>,
+
+    /// The schema that is read from the file header
+    schema: SchemaRef,
+
+    /// The blocks in the file
+    ///
+    /// A block indicates the regions in the file to read to get data
+    blocks: Vec<crate::Block>,
+
+    /// A counter to keep track of the current block that should be read
+    current_block: usize,
+
+    /// The total number of blocks, which may contain record batches and other types
+    total_blocks: usize,
+
+    /// Optional dictionaries for each schema field.
+    ///
+    /// Dictionaries may be appended to in the streaming format.
+    dictionaries_by_id: HashMap<i64, ArrayRef>,
+
+    /// Metadata version
+    metadata_version: crate::MetadataVersion,
+
+    /// User defined metadata
+    custom_metadata: HashMap<String, String>,
+
+    /// Optional projection and projected_schema
+    projection: Option<(Vec<usize>, Schema)>,
+}
+
+impl<R: Read + Seek> fmt::Debug for FileReader<R> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
+        f.debug_struct("FileReader")
+            .field("reader", &"BufReader<..>")
+            .field("schema", &self.schema)
+            .field("blocks", &self.blocks)
+            .field("current_block", &self.current_block)
+            .field("total_blocks", &self.total_blocks)
+            .field("dictionaries_by_id", &self.dictionaries_by_id)
+            .field("metadata_version", &self.metadata_version)
+            .field("projection", &self.projection)
+            .finish()
+    }
+}
+
+impl<R: Read + Seek> FileReader<R> {
+    /// Try to create a new file reader
+    ///
+    /// Returns errors if the file does not meet the Arrow Format header and footer
+    /// requirements
+    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
+        let builder = FileReaderBuilder {
+            projection,
+            ..Default::default()
+        };
+        builder.build(reader)
+    }
 
     /// Return user defined customized metadata
     pub fn custom_metadata(&self) -> &HashMap<String, String> {
@@ -1647,4 +1689,35 @@ mod tests {
             .unwrap();
         assert_eq!(batch, roundtrip);
     }
+
+    #[test]
+    fn test_file_with_millions_of_columns() {
+        let limit = 1_500_000;
+
+        let fields = (0..limit)
+            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
+            .collect::<Vec<_>>();
+        let schema = Arc::new(Schema::new(fields));
+        let arrays = (0..limit)
+            .map(|_| Arc::new(BooleanArray::from(vec![false])) as ArrayRef)
+            .collect();
+        let batch = RecordBatch::try_new(schema, arrays).unwrap();
+
+        let mut buf = Vec::new();
+        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
+        writer.write(&batch).unwrap();
+        writer.finish().unwrap();
+        drop(writer);
+
+        let mut reader = FileReaderBuilder::new()
+            .with_verifier_options(VerifierOptions {
+                max_tables: 3_500_000,
+                ..Default::default()
+            })
+            .build(std::io::Cursor::new(buf))
+            .unwrap();
+        let roundtrip_batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch, roundtrip_batch);
+    }
 }