From cbb0c523afd57a960a7425b5585cea13e09b1ccd Mon Sep 17 00:00:00 2001 From: Sven Cattell Date: Tue, 5 Dec 2023 19:14:16 -0500 Subject: [PATCH] This uses a struct wrapping Buffer and implementing AsRef<[u8]> instead of a breaking trait. --- arrow-ipc/src/reader.rs | 179 +++++++++++++++++----------------------- 1 file changed, 78 insertions(+), 101 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 46ec171a1f61..c923047cbd6c 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -22,9 +22,9 @@ use flatbuffers::VectorIter; use std::collections::HashMap; -use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::fmt; +use std::io::{BufReader, Cursor, Read, Seek, SeekFrom}; use std::sync::Arc; -use std::{fmt, io}; use arrow_array::*; use arrow_buffer::{Buffer, MutableBuffer}; @@ -498,67 +498,24 @@ pub fn read_dictionary( Ok(()) } -pub trait BufferRead { - fn read_buffer(&mut self, len: usize) -> Result; -} - -impl BufferRead for BufReader { - fn read_buffer(&mut self, len: usize) -> Result { - let mut buf = MutableBuffer::from_len_zeroed(len); - self.read_exact(&mut buf)?; - Ok(buf.into()) - } -} - +#[derive(Clone, Debug)] struct BufferReader { - buffer: Buffer, - position: u64, + inter: Buffer, } -impl Seek for BufferReader { - fn seek(&mut self, style: SeekFrom) -> io::Result { - let (base_pos, offset) = match style { - SeekFrom::Start(n) => { - self.position = n; - return Ok(n); - } - SeekFrom::End(n) => (self.buffer.as_ref().len() as u64, n), - SeekFrom::Current(n) => (self.position, n), - }; - match base_pos.checked_add_signed(offset) { - Some(n) => { - self.position = n; - Ok(self.position) - } - None => Err(io::Error::new( - std::io::ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )), - } - } -} - -impl Read for BufferReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let mut remaining_slice = &self.buffer[self.position as usize..]; - let n = Read::read(&mut remaining_slice, buf)?; - self.position += n as u64; - Ok(n) - } -} - -impl BufferRead for BufferReader { - fn read_buffer(&mut self, len: usize) -> Result { - let buf = self.buffer.slice_with_length(self.position as usize, len); - self.position += len as u64; - Ok(buf) +impl AsRef<[u8]> for BufferReader { + fn as_ref(&self) -> &[u8] { + &self.inter } } /// Arrow File reader -pub struct FileReader { +pub struct FileReader { /// Buffered file reader that supports reading and seeking - reader: R, + reader: BufReader, + + /// Optional Buffer for when we want to map the underlying data without copying + buffer: Option, /// The schema that is read from the file header schema: SchemaRef, @@ -589,7 +546,7 @@ pub struct FileReader { projection: Option<(Vec, Schema)>, } -impl fmt::Debug for FileReader { +impl fmt::Debug for FileReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { f.debug_struct("FileReader") .field("reader", &"BufReader<..>") @@ -605,52 +562,35 @@ impl fmt::Debug for FileReader { } } -impl FileReader> { +impl FileReader> { + pub fn try_new_map(buffer: Buffer, projection: Option>) -> Result { + let buffer = BufferReader { inter: buffer }; + let cursor = Cursor::new(buffer.clone()); + Self::try_new_inter(cursor, projection, Some(buffer)) + } +} + +impl FileReader { /// Try to create a new file reader /// /// Returns errors if the file does not meet the Arrow Format header and footer /// requirements pub fn try_new(reader: R, projection: Option>) -> Result { - Self::try_new_inter(BufReader::new(reader), projection) - } - - /// Gets a reference to the underlying reader. - /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_ref(&self) -> &R { - self.reader.get_ref() - } - - /// Gets a mutable reference to the underlying reader. - /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_mut(&mut self) -> &mut R { - self.reader.get_mut() - } -} - -impl FileReader { - pub fn try_new_map(buffer: Buffer, projection: Option>) -> Result { - let reader = BufferReader { - buffer, - position: 0, - }; - Self::try_new_inter(reader, projection) + Self::try_new_inter(reader, projection, None) } - pub fn get_buf(&self) -> &Buffer { - &self.reader.buffer - } -} - -impl FileReader { /// Try to create a new file reader /// /// Returns errors if the file does not meet the Arrow Format header and footer /// requirements /// /// If a buffer is provided, then the reader needs to be a io::Cursor wrapped around the buffer - fn try_new_inter(mut reader: R, projection: Option>) -> Result { + fn try_new_inter( + reader: R, + projection: Option>, + buffer: Option, + ) -> Result { + let mut reader = BufReader::new(reader); // check if header and footer contain correct magic bytes // check if header and footer contain correct magic bytes let mut magic_buffer: [u8; 6] = [0; 6]; @@ -723,12 +663,24 @@ impl FileReader { match message.header_type() { crate::MessageHeader::DictionaryBatch => { - // read the block that makes up the dictionary batch into a buffer let batch = message.header_as_dictionary_batch().unwrap(); - reader.seek(SeekFrom::Start( - block.offset() as u64 + block.metaDataLength() as u64, - ))?; - let buf = reader.read_buffer(message.bodyLength() as usize)?; + + let buf = if let Some(buffer) = &buffer { + // Don't need to check compression as dictionaries aren't compressed. + buffer.inter.slice_with_length( + block.offset() as usize + block.metaDataLength() as usize, + message.bodyLength() as usize, + ) + } else { + // read the block that makes up the dictionary batch into a buffer + let mut buf = + MutableBuffer::from_len_zeroed(message.bodyLength() as usize); + reader.seek(SeekFrom::Start( + block.offset() as u64 + block.metaDataLength() as u64, + ))?; + reader.read_exact(&mut buf)?; + buf.into() + }; read_dictionary( &buf, @@ -756,6 +708,7 @@ impl FileReader { Ok(Self { reader, + buffer, schema: Arc::new(schema), blocks: blocks.iter().copied().collect(), current_block: 0, @@ -835,11 +788,21 @@ impl FileReader { ArrowError::IpcError("Unable to read IPC message as record batch".to_string()) })?; - // read the block that makes up the dictionary batch into a buffer - self.reader.seek(SeekFrom::Start( - block.offset() as u64 + block.metaDataLength() as u64, - ))?; - let buf = self.reader.read_buffer(message.bodyLength() as usize)?; + let buf = if let Some(buffer) = &self.buffer { + // Don't need to check compression as if it is compressed it will be read into memory. + buffer.inter.slice_with_length( + block.offset() as usize + block.metaDataLength() as usize, + message.bodyLength() as usize, + ) + } else { + // read the block that makes up the batch into a buffer + let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); + self.reader.seek(SeekFrom::Start( + block.offset() as u64 + block.metaDataLength() as u64, + ))?; + self.reader.read_exact(&mut buf)?; + buf.into() + }; read_record_batch( &buf, @@ -857,9 +820,23 @@ impl FileReader { ))), } } + + /// Gets a reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_ref(&self) -> &R { + self.reader.get_ref() + } + + /// Gets a mutable reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_mut(&mut self) -> &mut R { + self.reader.get_mut() + } } -impl Iterator for FileReader { +impl Iterator for FileReader { type Item = Result; fn next(&mut self) -> Option { @@ -872,7 +849,7 @@ impl Iterator for FileReader { } } -impl RecordBatchReader for FileReader { +impl RecordBatchReader for FileReader { fn schema(&self) -> SchemaRef { self.schema.clone() }