Skip to content

Commit

Permalink
Add FileReaderBuilder for arrow-ipc
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Nov 28, 2023
1 parent 34a816d commit 89be024
Showing 1 changed file with 128 additions and 55 deletions.
183 changes: 128 additions & 55 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! The `FileReader` and `StreamReader` have similar interfaces,
//! however the `FileReader` expects a reader that supports `Seek`ing
use flatbuffers::VectorIter;
use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::HashMap;
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
Expand Down Expand Up @@ -498,61 +498,39 @@ pub fn read_dictionary(
Ok(())
}

/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
/// Buffered file reader that supports reading and seeking
reader: BufReader<R>,

/// The schema that is read from the file header
schema: SchemaRef,

/// The blocks in the file
///
/// A block indicates the regions in the file to read to get data
blocks: Vec<crate::Block>,

/// A counter to keep track of the current block that should be read
current_block: usize,

/// The total number of blocks, which may contain record batches and other types
total_blocks: usize,
/// Build an Arrow [`FileReader`] with custom options.
#[derive(Debug, Default)]
pub struct FileReaderBuilder {
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// Flatbuffers options for parsing footer
verifier_options: VerifierOptions,
}

/// Optional dictionaries for each schema field.
impl FileReaderBuilder {
/// Options for creating a new [`FileReader`].
///
/// Dictionaries may be appended to in the streaming format.
dictionaries_by_id: HashMap<i64, ArrayRef>,

/// Metadata version
metadata_version: crate::MetadataVersion,

/// User defined metadata
custom_metadata: HashMap<String, String>,
/// To convert a builder into a reader, call [`FileReaderBuilder::build`].
pub fn new() -> Self {
Self::default()
}

/// Optional projection and projected_schema
projection: Option<(Vec<usize>, Schema)>,
}
/// Optional projection for which columns to load (zero-based column indices).
pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
self.projection = Some(projection);
self
}

impl<R: Read + Seek> fmt::Debug for FileReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
f.debug_struct("FileReader<R>")
.field("reader", &"BufReader<..>")
.field("schema", &self.schema)
.field("blocks", &self.blocks)
.field("current_block", &self.current_block)
.field("total_blocks", &self.total_blocks)
.field("dictionaries_by_id", &self.dictionaries_by_id)
.field("metadata_version", &self.metadata_version)
.field("projection", &self.projection)
.finish()
/// Flatbuffers options for parsing footer. Useful if needing to parse a file containing
/// millions of columns, in which case can up the value for `max_tables` to accommodate parsing
/// such a file.
pub fn with_verifier_options(mut self, verifier_options: VerifierOptions) -> Self {
self.verifier_options = verifier_options;
self
}
}

impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
///
/// Returns errors if the file does not meet the Arrow Format header and footer
/// requirements
pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
/// Build [`FileReader`] with given reader.
pub fn build<R: Read + Seek>(self, reader: R) -> Result<FileReader<R>, ArrowError> {
let mut reader = BufReader::new(reader);
// check if header and footer contain correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
Expand Down Expand Up @@ -580,9 +558,10 @@ impl<R: Read + Seek> FileReader<R> {
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut footer_data)?;

let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
})?;
let footer = crate::root_as_footer_with_opts(&self.verifier_options, &footer_data[..])
.map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
})?;

let blocks = footer.recordBatches().ok_or_else(|| {
ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
Expand Down Expand Up @@ -650,15 +629,15 @@ impl<R: Read + Seek> FileReader<R> {
}
}
}
let projection = match projection {
let projection = match self.projection {
Some(projection_indices) => {
let schema = schema.project(&projection_indices)?;
Some((projection_indices, schema))
}
_ => None,
};

Ok(Self {
Ok(FileReader {
reader,
schema: Arc::new(schema),
blocks: blocks.iter().copied().collect(),
Expand All @@ -670,6 +649,69 @@ impl<R: Read + Seek> FileReader<R> {
projection,
})
}
}

/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
/// Buffered file reader that supports reading and seeking
reader: BufReader<R>,

/// The schema that is read from the file header
schema: SchemaRef,

/// The blocks in the file
///
/// A block indicates the regions in the file to read to get data
blocks: Vec<crate::Block>,

/// A counter to keep track of the current block that should be read
current_block: usize,

/// The total number of blocks, which may contain record batches and other types
total_blocks: usize,

/// Optional dictionaries for each schema field.
///
/// Dictionaries may be appended to in the streaming format.
dictionaries_by_id: HashMap<i64, ArrayRef>,

/// Metadata version
metadata_version: crate::MetadataVersion,

/// User defined metadata
custom_metadata: HashMap<String, String>,

/// Optional projection and projected_schema
projection: Option<(Vec<usize>, Schema)>,
}

impl<R: Read + Seek> fmt::Debug for FileReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
f.debug_struct("FileReader<R>")
.field("reader", &"BufReader<..>")
.field("schema", &self.schema)
.field("blocks", &self.blocks)
.field("current_block", &self.current_block)
.field("total_blocks", &self.total_blocks)
.field("dictionaries_by_id", &self.dictionaries_by_id)
.field("metadata_version", &self.metadata_version)
.field("projection", &self.projection)
.finish()
}
}

impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
///
/// Returns errors if the file does not meet the Arrow Format header and footer
/// requirements
pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
let builder = FileReaderBuilder {
projection,
..Default::default()
};
builder.build(reader)
}

/// Return user defined customized metadata
pub fn custom_metadata(&self) -> &HashMap<String, String> {
Expand Down Expand Up @@ -1647,4 +1689,35 @@ mod tests {
.unwrap();
assert_eq!(batch, roundtrip);
}

#[test]
fn test_file_with_millions_of_columns() {
let limit = 1_500_000;

let fields = (0..limit)
.map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
let arrays = (0..limit)
.map(|_| Arc::new(BooleanArray::from(vec![false])) as ArrayRef)
.collect();
let batch = RecordBatch::try_new(schema, arrays).unwrap();

let mut buf = Vec::new();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
drop(writer);

let mut reader = FileReaderBuilder::new()
.with_verifier_options(VerifierOptions {
max_tables: 3_500_000,
..Default::default()
})
.build(std::io::Cursor::new(buf))
.unwrap();
let roundtrip_batch = reader.next().unwrap().unwrap();

assert_eq!(batch, roundtrip_batch);
}
}

0 comments on commit 89be024

Please sign in to comment.