diff --git a/benches/arrow_reader.rs b/benches/arrow_reader.rs index d0f935ff..fc517d61 100644 --- a/benches/arrow_reader.rs +++ b/benches/arrow_reader.rs @@ -18,7 +18,7 @@ use std::fs::File; use criterion::{criterion_group, criterion_main, Criterion}; -use datafusion_orc::{ArrowReader, ArrowStreamReader, Cursor, Reader}; +use datafusion_orc::{ArrowReader, ArrowStreamReader, Cursor}; use futures_util::TryStreamExt; fn basic_path(path: &str) -> String { @@ -44,8 +44,7 @@ async fn async_read_all() { let file_path = basic_path(file); let f = tokio::fs::File::open(file_path).await.unwrap(); - let reader = Reader::new_async(f).await.unwrap(); - let cursor = Cursor::root(reader).unwrap(); + let cursor = Cursor::root_async(f).await.unwrap(); ArrowStreamReader::new(cursor, None) .try_collect::>() @@ -58,8 +57,7 @@ fn sync_read_all() { let file_path = basic_path(file); let f = File::open(file_path).unwrap(); - let reader = Reader::new(f).unwrap(); - let cursor = Cursor::root(reader).unwrap(); + let cursor = Cursor::root(f).unwrap(); ArrowReader::new(cursor, None) .collect::, _>>() diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index 34388221..94eb1b69 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -1,7 +1,7 @@ pub mod column; use std::collections::HashMap; -use std::io::{Read, Seek}; +use std::io::Read; use std::sync::Arc; use arrow::array::{ @@ -20,6 +20,7 @@ use arrow::datatypes::{Field, TimeUnit}; use arrow::error::ArrowError; use arrow::record_batch::{RecordBatch, RecordBatchReader}; use bytes::Bytes; +use prost::Message; use snafu::{OptionExt, ResultExt}; use self::column::list::{new_list_iter, ListDecoder}; @@ -36,15 +37,16 @@ use crate::arrow_reader::column::struct_column::StructDecoder; use crate::arrow_reader::column::timestamp::new_timestamp_iter; use crate::arrow_reader::column::NullableIterator; use crate::builder::BoxedArrayBuilder; -use crate::error::{self, InvalidColumnSnafu, Result}; +use crate::error::{self, InvalidColumnSnafu, IoSnafu, Result}; use crate::proto::stream::Kind; use crate::proto::StripeFooter; use crate::reader::decompress::{Compression, Decompressor}; -use crate::reader::Reader; +use crate::reader::metadata::{read_metadata, read_metadata_async, FileMetadata}; +use crate::reader::{AsyncChunkReader, ChunkReader}; use crate::schema::{DataType, RootDataType}; use crate::stripe::StripeMetadata; -pub struct ArrowReader { +pub struct ArrowReader { cursor: Cursor, schema_ref: SchemaRef, current_stripe: Option>>>, @@ -53,7 +55,7 @@ pub struct ArrowReader { pub const DEFAULT_BATCH_SIZE: usize = 8192; -impl ArrowReader { +impl ArrowReader { pub fn new(cursor: Cursor, batch_size: Option) -> Self { let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); let schema = Arc::new(create_arrow_schema(&cursor)); @@ -66,11 +68,11 @@ impl ArrowReader { } pub fn total_row_count(&self) -> u64 { - self.cursor.reader.metadata().number_of_rows() + self.cursor.file_metadata.number_of_rows() } } -impl ArrowReader { +impl ArrowReader { fn try_advance_stripe(&mut self) -> Option> { match self .cursor @@ -96,22 +98,21 @@ impl ArrowReader { pub fn create_arrow_schema(cursor: &Cursor) -> Schema { let metadata = cursor - .reader - .metadata() + .file_metadata .user_custom_metadata() .iter() .map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string())) .collect::>(); - cursor.root_data_type.create_arrow_schema(&metadata) + cursor.projected_data_type.create_arrow_schema(&metadata) } -impl RecordBatchReader for ArrowReader { +impl RecordBatchReader for ArrowReader { fn schema(&self) -> SchemaRef { self.schema_ref.clone() } } -impl Iterator for ArrowReader { +impl Iterator for ArrowReader { type Item = std::result::Result; fn next(&mut self) -> Option { @@ -806,45 +807,78 @@ impl NaiveStripeDecoder { } pub struct Cursor { - pub(crate) reader: Reader, - pub(crate) root_data_type: RootDataType, + pub(crate) reader: R, + pub(crate) file_metadata: Arc, + pub(crate) projected_data_type: RootDataType, pub(crate) stripe_offset: usize, } -impl Cursor { - pub fn new>(r: Reader, fields: &[T]) -> Result { - let projected_data_type = r.metadata().root_data_type().project(fields); +impl Cursor { + pub fn new>(mut reader: R, fields: &[T]) -> Result { + let file_metadata = Arc::new(read_metadata(&mut reader)?); + let projected_data_type = file_metadata.root_data_type().project(fields); Ok(Self { - reader: r, - root_data_type: projected_data_type, + reader, + file_metadata, + projected_data_type, stripe_offset: 0, }) } - pub fn root(r: Reader) -> Result { - let data_type = r.metadata().root_data_type().clone(); + pub fn root(mut reader: R) -> Result { + let file_metadata = Arc::new(read_metadata(&mut reader)?); + let data_type = file_metadata.root_data_type().clone(); Ok(Self { - reader: r, - root_data_type: data_type, + reader, + file_metadata, + projected_data_type: data_type, stripe_offset: 0, }) } } -impl Iterator for Cursor { +impl Cursor { + pub async fn new_async>(mut reader: R, fields: &[T]) -> Result { + let file_metadata = Arc::new(read_metadata_async(&mut reader).await?); + let projected_data_type = file_metadata.root_data_type().project(fields); + Ok(Self { + reader, + file_metadata, + projected_data_type, + stripe_offset: 0, + }) + } + + pub async fn root_async(mut reader: R) -> Result { + let file_metadata = Arc::new(read_metadata_async(&mut reader).await?); + let data_type = file_metadata.root_data_type().clone(); + Ok(Self { + reader, + file_metadata, + projected_data_type: data_type, + stripe_offset: 0, + }) + } +} + +impl Iterator for Cursor { type Item = Result; fn next(&mut self) -> Option { - if let Some(info) = self.reader.stripe(self.stripe_offset).cloned() { + if let Some(info) = self + .file_metadata + .stripe_metadatas() + .get(self.stripe_offset) + .cloned() + { let stripe = Stripe::new( &mut self.reader, - self.root_data_type.clone(), + &self.file_metadata, + &self.projected_data_type.clone(), self.stripe_offset, &info, ); - self.stripe_offset += 1; - Some(stripe) } else { None @@ -861,41 +895,23 @@ pub struct Stripe { pub(crate) stream_map: Arc, } -#[derive(Debug)] -pub struct StreamMap { - pub inner: HashMap<(u32, Kind), Bytes>, - pub compression: Option, -} - -impl StreamMap { - pub fn get(&self, column: &Column, kind: Kind) -> Result { - self.get_opt(column, kind).context(InvalidColumnSnafu { - name: column.name(), - }) - } - - pub fn get_opt(&self, column: &Column, kind: Kind) -> Option { - let column_id = column.column_id(); - - self.inner - .get(&(column_id, kind)) - .cloned() - .map(|data| Decompressor::new(data, self.compression, vec![])) - } -} - impl Stripe { - pub fn new( - r: &mut Reader, - root_data_type: RootDataType, + pub fn new( + reader: &mut R, + file_metadata: &Arc, + projected_data_type: &RootDataType, stripe: usize, info: &StripeMetadata, ) -> Result { - let footer = Arc::new(r.stripe_footer(stripe).clone()); + let compression = file_metadata.compression(); + + let footer = reader + .get_bytes(info.footer_offset(), info.footer_length()) + .context(IoSnafu)?; + let footer = Arc::new(deserialize_stripe_footer(&footer, compression)?); - let compression = r.metadata().compression(); //TODO(weny): add tz - let columns = root_data_type + let columns = projected_data_type .children() .iter() .map(|(name, data_type)| Column::new(name, data_type, &footer, info.number_of_rows())) @@ -907,7 +923,7 @@ impl Stripe { let length = stream.length(); let column_id = stream.column(); let kind = stream.kind(); - let data = Column::read_stream(r, stream_offset, length as usize)?; + let data = Column::read_stream(reader, stream_offset, length)?; // TODO(weny): filter out unused streams. stream_map.insert((column_id, kind), data); @@ -934,3 +950,38 @@ impl Stripe { self.stripe_offset } } + +#[derive(Debug)] +pub struct StreamMap { + pub inner: HashMap<(u32, Kind), Bytes>, + pub compression: Option, +} + +impl StreamMap { + pub fn get(&self, column: &Column, kind: Kind) -> Result { + self.get_opt(column, kind).context(InvalidColumnSnafu { + name: column.name(), + }) + } + + pub fn get_opt(&self, column: &Column, kind: Kind) -> Option { + let column_id = column.column_id(); + + self.inner + .get(&(column_id, kind)) + .cloned() + .map(|data| Decompressor::new(data, self.compression, vec![])) + } +} + +pub(crate) fn deserialize_stripe_footer( + bytes: &[u8], + compression: Option, +) -> Result { + let mut buffer = vec![]; + // TODO: refactor to not need Bytes::copy_from_slice + Decompressor::new(Bytes::copy_from_slice(bytes), compression, vec![]) + .read_to_end(&mut buffer) + .context(error::IoSnafu)?; + StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu) +} diff --git a/src/arrow_reader/column.rs b/src/arrow_reader/column.rs index 89bed071..f08e4525 100644 --- a/src/arrow_reader/column.rs +++ b/src/arrow_reader/column.rs @@ -1,14 +1,12 @@ -use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; use arrow::datatypes::Field; use bytes::Bytes; use snafu::ResultExt; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; -use crate::error::{self, Result}; +use crate::error::{IoSnafu, Result}; use crate::proto::{ColumnEncoding, StripeFooter}; -use crate::reader::Reader; +use crate::reader::{AsyncChunkReader, ChunkReader}; use crate::schema::DataType; pub mod binary; @@ -45,24 +43,6 @@ impl From<&Column> for Field { } } -macro_rules! impl_read_stream { - ($reader:ident,$start:ident,$length:ident $($_await:tt)*) => {{ - $reader - .inner - .seek(SeekFrom::Start($start))$($_await)* - .context(error::IoSnafu)?; - - let mut scratch = vec![0; $length]; - - $reader - .inner - .read_exact(&mut scratch)$($_await)* - .context(error::IoSnafu)?; - - Ok(Bytes::from(scratch)) - }}; -} - impl Column { pub fn new( name: &str, @@ -172,20 +152,16 @@ impl Column { } } - pub fn read_stream( - reader: &mut Reader, - start: u64, - length: usize, - ) -> Result { - impl_read_stream!(reader, start, length) + pub fn read_stream(reader: &mut R, start: u64, length: u64) -> Result { + reader.get_bytes(start, length).context(IoSnafu) } - pub async fn read_stream_async( - reader: &mut Reader, + pub async fn read_stream_async( + reader: &mut R, start: u64, - length: usize, + length: u64, ) -> Result { - impl_read_stream!(reader, start, length.await) + reader.get_bytes(start, length).await.context(IoSnafu) } } diff --git a/src/async_arrow_reader.rs b/src/async_arrow_reader.rs index 37450695..6d254d63 100644 --- a/src/async_arrow_reader.rs +++ b/src/async_arrow_reader.rs @@ -10,14 +10,16 @@ use arrow::record_batch::RecordBatch; use futures::future::BoxFuture; use futures::{ready, Stream}; use futures_util::FutureExt; -use tokio::io::{AsyncRead, AsyncSeek}; +use snafu::ResultExt; use crate::arrow_reader::column::Column; use crate::arrow_reader::{ - create_arrow_schema, Cursor, NaiveStripeDecoder, StreamMap, Stripe, DEFAULT_BATCH_SIZE, + create_arrow_schema, deserialize_stripe_footer, Cursor, NaiveStripeDecoder, StreamMap, Stripe, + DEFAULT_BATCH_SIZE, }; -use crate::error::Result; -use crate::reader::Reader; +use crate::error::{IoSnafu, Result}; +use crate::reader::metadata::FileMetadata; +use crate::reader::AsyncChunkReader; use crate::schema::RootDataType; use crate::stripe::StripeMetadata; @@ -59,26 +61,37 @@ pub struct StripeFactory { is_end: bool, } -pub struct ArrowStreamReader { - factory: Option>, +pub struct ArrowStreamReader { + factory: Option>>, batch_size: usize, schema_ref: SchemaRef, state: StreamState, } -impl StripeFactory { +impl StripeFactory { pub async fn read_next_stripe_inner(&mut self, info: &StripeMetadata) -> Result { let inner = &mut self.inner; - let root_data_type = inner.root_data_type.clone(); let stripe_offset = inner.stripe_offset; inner.stripe_offset += 1; - Stripe::new_async(&mut inner.reader, root_data_type, stripe_offset, info).await + Stripe::new_async( + &mut inner.reader, + &inner.file_metadata, + &inner.projected_data_type, + stripe_offset, + info, + ) + .await } pub async fn read_next_stripe(mut self) -> Result<(Self, Option)> { - let info = self.inner.reader.stripe(self.inner.stripe_offset).cloned(); + let info = self + .inner + .file_metadata + .stripe_metadatas() + .get(self.inner.stripe_offset) + .cloned(); if let Some(info) = info { match self.read_next_stripe_inner(&info).await { @@ -96,12 +109,12 @@ impl StripeFactory { } } -impl ArrowStreamReader { +impl ArrowStreamReader { pub fn new(c: Cursor, batch_size: Option) -> Self { let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); let schema = Arc::new(create_arrow_schema(&c)); Self { - factory: Some(c.into()), + factory: Some(Box::new(c.into())), batch_size, schema_ref: schema, state: StreamState::Init, @@ -140,7 +153,7 @@ impl ArrowStreamReader { } StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) { Ok((factory, Some(stripe))) => { - self.factory = Some(factory); + self.factory = Some(Box::new(factory)); match NaiveStripeDecoder::new( stripe, self.schema_ref.clone(), @@ -156,7 +169,7 @@ impl ArrowStreamReader { } } Ok((factory, None)) => { - self.factory = Some(factory); + self.factory = Some(Box::new(factory)); // All rows skipped, read next row group self.state = StreamState::Init; } @@ -171,7 +184,7 @@ impl ArrowStreamReader { } } -impl Stream for ArrowStreamReader { +impl Stream for ArrowStreamReader { type Item = std::result::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -182,17 +195,23 @@ impl Stream for ArrowStreamRe impl Stripe { // TODO: reduce duplication with sync version in arrow_reader.rs - pub async fn new_async( - r: &mut Reader, - root_data_type: RootDataType, + pub async fn new_async( + reader: &mut R, + file_metadata: &Arc, + projected_data_type: &RootDataType, stripe: usize, info: &StripeMetadata, ) -> Result { - let footer = Arc::new(r.stripe_footer(stripe).clone()); + let compression = file_metadata.compression(); + + let footer = reader + .get_bytes(info.footer_offset(), info.footer_length()) + .await + .context(IoSnafu)?; + let footer = Arc::new(deserialize_stripe_footer(&footer, compression)?); - let compression = r.metadata().compression(); //TODO(weny): add tz - let columns = root_data_type + let columns = projected_data_type .children() .iter() .map(|(name, data_type)| Column::new(name, data_type, &footer, info.number_of_rows())) @@ -204,7 +223,7 @@ impl Stripe { let length = stream.length(); let column_id = stream.column(); let kind = stream.kind(); - let data = Column::read_stream_async(r, stream_offset, length as usize).await?; + let data = Column::read_stream_async(reader, stream_offset, length).await?; // TODO(weny): filter out unused streams. stream_map.insert((column_id, kind), data); diff --git a/src/error.rs b/src/error.rs index 6332e93b..45b91407 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,10 +23,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Empty file"))] + EmptyFile { location: Location }, + #[snafu(display("Invalid input, message: {}", msg))] InvalidInput { msg: String, location: Location }, - #[snafu(display("Out of sepc, message: {}", msg))] + #[snafu(display("Out of spec, message: {}", msg))] OutOfSpec { msg: String, location: Location }, #[snafu(display("Error from map builder: {}", source))] diff --git a/src/lib.rs b/src/lib.rs index 915ae156..07f2134f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,4 +10,3 @@ pub mod stripe; pub use arrow_reader::{ArrowReader, Cursor}; pub use async_arrow_reader::ArrowStreamReader; -pub use reader::Reader; diff --git a/src/reader.rs b/src/reader.rs index 23722964..e8986f58 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -5,55 +5,13 @@ pub mod metadata; use std::fs::File; use std::io::{BufReader, Read, Seek, SeekFrom}; -use tokio::io::{AsyncRead, AsyncSeek}; - -use self::metadata::{read_metadata, FileMetadata}; -use crate::error::Result; -use crate::proto::StripeFooter; -use crate::reader::metadata::read_metadata_async; -use crate::schema::RootDataType; -use crate::stripe::StripeMetadata; - -pub struct Reader { - pub(crate) inner: R, - metadata: Box, -} - -impl Reader { - pub fn new(mut r: R) -> Result { - let metadata = Box::new(read_metadata(&mut r)?); - Ok(Self { inner: r, metadata }) - } -} - -impl Reader { - pub fn metadata(&self) -> &FileMetadata { - &self.metadata - } - - pub fn schema(&self) -> &RootDataType { - self.metadata.root_data_type() - } - - pub fn stripe(&self, index: usize) -> Option<&StripeMetadata> { - self.metadata.stripe_metadatas().get(index) - } - - pub fn stripe_footer(&mut self, stripe: usize) -> &StripeFooter { - &self.metadata.stripe_footers()[stripe] - } -} - -impl Reader { - pub async fn new_async(mut r: R) -> Result { - let metadata = Box::new(read_metadata_async(&mut r).await?); - Ok(Self { inner: r, metadata }) - } -} +use bytes::Bytes; +use futures_util::future::BoxFuture; +use futures_util::FutureExt; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; /// Primary source used for reading required bytes for operations. #[allow(clippy::len_without_is_empty)] -// TODO: async version pub trait ChunkReader { type T: Read; @@ -65,18 +23,19 @@ pub trait ChunkReader { fn get_read(&self, offset_from_start: u64) -> std::io::Result; /// Read bytes from an offset with specific length. - fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result> { + fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result { let mut bytes = vec![0; length as usize]; self.get_read(offset_from_start)? .take(length) .read_exact(&mut bytes)?; - Ok(bytes) + Ok(bytes.into()) } } impl ChunkReader for File { type T = BufReader; + // TODO: this is only used for file tail, so replace with load_metadata? fn len(&self) -> u64 { self.metadata().map(|m| m.len()).unwrap_or(0u64) } @@ -91,3 +50,49 @@ impl ChunkReader for File { Ok(BufReader::new(self.try_clone()?)) } } + +#[allow(clippy::len_without_is_empty)] +pub trait AsyncChunkReader: Send { + // TODO: this is only used for file tail, so replace with load_metadata? + fn len(&mut self) -> BoxFuture<'_, std::io::Result>; + + fn get_bytes( + &mut self, + offset_from_start: u64, + length: u64, + ) -> BoxFuture<'_, std::io::Result>; +} + +impl AsyncChunkReader for T { + fn len(&mut self) -> BoxFuture<'_, std::io::Result> { + async move { self.seek(SeekFrom::End(0)).await }.boxed() + } + + fn get_bytes( + &mut self, + offset_from_start: u64, + length: u64, + ) -> BoxFuture<'_, std::io::Result> { + async move { + self.seek(SeekFrom::Start(offset_from_start)).await?; + let mut buffer = vec![0; length as usize]; + self.read_exact(&mut buffer).await?; + Ok(buffer.into()) + } + .boxed() + } +} + +impl AsyncChunkReader for Box { + fn len(&mut self) -> BoxFuture<'_, std::io::Result> { + self.as_mut().len() + } + + fn get_bytes( + &mut self, + offset_from_start: u64, + length: u64, + ) -> BoxFuture<'_, std::io::Result> { + self.as_mut().get_bytes(offset_from_start, length) + } +} diff --git a/src/reader/metadata.rs b/src/reader/metadata.rs index f07ad543..88881e22 100644 --- a/src/reader/metadata.rs +++ b/src/reader/metadata.rs @@ -23,22 +23,21 @@ //! compressed lengths. use std::collections::HashMap; -use std::io::{Read, SeekFrom}; +use std::io::Read; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use prost::Message; use snafu::{OptionExt, ResultExt}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; -use crate::error::{self, Result}; -use crate::proto::{self, Footer, Metadata, PostScript, StripeFooter}; +use crate::error::{self, EmptyFileSnafu, OutOfSpecSnafu, Result}; +use crate::proto::{self, Footer, Metadata, PostScript}; use crate::reader::decompress::Decompressor; use crate::schema::RootDataType; use crate::statistics::ColumnStatistics; use crate::stripe::StripeMetadata; use super::decompress::Compression; -use super::ChunkReader; +use super::{AsyncChunkReader, ChunkReader}; const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024; @@ -52,9 +51,6 @@ pub struct FileMetadata { column_statistics: Vec, stripes: Vec, user_custom_metadata: HashMap>, - // TODO: for now keeping this, but ideally won't want all stripe footers here - // since don't want to require parsing all stripe footers in file unless actually required - stripe_footers: Vec, } impl FileMetadata { @@ -62,7 +58,6 @@ impl FileMetadata { postscript: &proto::PostScript, footer: &proto::Footer, metadata: &proto::Metadata, - stripe_footers: Vec, ) -> Result { let compression = Compression::from_proto(postscript.compression(), postscript.compression_block_size); @@ -92,7 +87,6 @@ impl FileMetadata { column_statistics, stripes, user_custom_metadata, - stripe_footers, }) } @@ -116,21 +110,16 @@ impl FileMetadata { &self.stripes } - pub fn stripe_footers(&self) -> &[StripeFooter] { - &self.stripe_footers - } - pub fn user_custom_metadata(&self) -> &HashMap> { &self.user_custom_metadata } } -pub fn read_metadata(reader: &mut R) -> Result -where - R: ChunkReader, -{ +pub fn read_metadata(reader: &mut R) -> Result { let file_len = reader.len(); - // TODO: return error if empty + if file_len == 0 { + return EmptyFileSnafu.fail(); + } // Initial read of the file tail // Use a default size for first read in hopes of capturing all sections with one read @@ -145,7 +134,12 @@ where let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64; tail_bytes.truncate(tail_bytes.len() - 1); - // TODO: slice here could panic if file too small + if tail_bytes.len() < postscript_len as usize { + return OutOfSpecSnafu { + msg: "File too small for given postscript length", + } + .fail(); + } let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..]) .context(error::DecodeProtoSnafu)?; let compression = @@ -165,11 +159,13 @@ where // -1 is the postscript length byte let offset = file_len - 1 - postscript_len - footer_length - metadata_length; let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64; - let mut prepend_bytes = reader + let prepend_bytes = reader .get_bytes(offset, bytes_to_read) .context(error::IoSnafu)?; - prepend_bytes.extend(tail_bytes); - prepend_bytes + let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len()); + all_bytes.extend_from_slice(&prepend_bytes); + all_bytes.extend_from_slice(&tail_bytes); + all_bytes.into() } else { tail_bytes }; @@ -185,136 +181,78 @@ where compression, )?; - let mut stripe_footers = Vec::with_capacity(footer.stripes.len()); - - // clippy read_zero_byte_vec lint causing issues so init to non-zero length - let mut scratch = vec![0]; - for stripe in &footer.stripes { - let offset = stripe.offset() + stripe.index_length() + stripe.data_length(); - let len = stripe.footer_length(); - - let mut read = reader.get_read(offset).context(error::IoSnafu)?; - scratch.resize(len as usize, 0); - read.read_exact(&mut scratch).context(error::IoSnafu)?; - stripe_footers.push(deserialize_stripe_footer(&scratch, compression)?); - } - - FileMetadata::from_proto(&postscript, &footer, &metadata, stripe_footers) + FileMetadata::from_proto(&postscript, &footer, &metadata) } -// TODO: refactor like for sync -pub async fn read_metadata_async(reader: &mut R) -> Result -where - R: AsyncRead + AsyncSeek + Unpin + Send, -{ - let file_len = { - let old_pos = reader.stream_position().await.context(error::SeekSnafu)?; - let len = reader - .seek(SeekFrom::End(0)) - .await - .context(error::SeekSnafu)?; - - // Avoid seeking a third time when we were already at the end of the - // stream. The branch is usually way cheaper than a seek operation. - if old_pos != len { - reader - .seek(SeekFrom::Start(old_pos)) - .await - .context(error::SeekSnafu)?; - } - len - }; - - // initial read of the footer - let assume_footer_len = if file_len < DEFAULT_FOOTER_SIZE { - file_len - } else { - DEFAULT_FOOTER_SIZE - }; +pub async fn read_metadata_async(reader: &mut R) -> Result { + let file_len = reader.len().await.context(error::IoSnafu)?; + if file_len == 0 { + return EmptyFileSnafu.fail(); + } - reader - .seek(SeekFrom::End(-(assume_footer_len as i64))) - .await - .context(error::SeekSnafu)?; - let mut tail_bytes = Vec::with_capacity(assume_footer_len as usize); - reader - .take(assume_footer_len) - .read_to_end(&mut tail_bytes) + // Initial read of the file tail + // Use a default size for first read in hopes of capturing all sections with one read + // At worst need two reads to get all necessary bytes + let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE); + let mut tail_bytes = reader + .get_bytes(file_len - assume_footer_len, assume_footer_len) .await .context(error::IoSnafu)?; // The final byte of the file contains the serialized length of the Postscript, // which must be less than 256 bytes. - let postscript_len = tail_bytes[tail_bytes.len() - 1] as usize; + let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64; tail_bytes.truncate(tail_bytes.len() - 1); - // next is the postscript - let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len..]) + if tail_bytes.len() < postscript_len as usize { + return OutOfSpecSnafu { + msg: "File too small for given postscript length", + } + .fail(); + } + let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..]) .context(error::DecodeProtoSnafu)?; let compression = Compression::from_proto(postscript.compression(), postscript.compression_block_size); - tail_bytes.truncate(tail_bytes.len() - postscript_len); + tail_bytes.truncate(tail_bytes.len() - postscript_len as usize); - // next is the footer let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu { msg: "Footer length is empty", - })? as usize; // todo: throw error - - let footer_offset = file_len - footer_length as u64 - postscript_len as u64 - 1; - - reader - .seek(SeekFrom::Start(footer_offset)) - .await - .context(error::SeekSnafu)?; - let mut footer = vec![0; footer_length]; - reader - .read_exact(&mut footer) - .await - .context(error::SeekSnafu)?; - let footer = deserialize_footer(&footer, compression)?; - - // finally the metadata + })?; let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu { msg: "Metadata length is empty", - })? as usize; - let metadata_offset = - file_len - metadata_length as u64 - footer_length as u64 - postscript_len as u64 - 1; - - reader - .seek(SeekFrom::Start(metadata_offset)) - .await - .context(error::SeekSnafu)?; - let mut metadata = vec![0; metadata_length]; - reader - .read_exact(&mut metadata) - .await - .context(error::IoSnafu)?; - - let metadata = deserialize_footer_metadata(&metadata, compression)?; - - let mut stripe_footers = Vec::with_capacity(footer.stripes.len()); - - let mut scratch = Vec::::new(); - - for stripe in &footer.stripes { - let start = stripe.offset() + stripe.index_length() + stripe.data_length(); - let len = stripe.footer_length(); - reader - .seek(SeekFrom::Start(start)) - .await - .context(error::SeekSnafu)?; + })?; - scratch.clear(); - scratch.reserve(len as usize); - reader - .take(len) - .read_to_end(&mut scratch) + // Ensure we have enough bytes for Footer and Metadata + let mut tail_bytes = if footer_length + metadata_length > tail_bytes.len() as u64 { + // Need second read + // -1 is the postscript length byte + let offset = file_len - 1 - postscript_len - footer_length - metadata_length; + let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64; + let prepend_bytes = reader + .get_bytes(offset, bytes_to_read) .await .context(error::IoSnafu)?; - stripe_footers.push(deserialize_stripe_footer(&scratch, compression)?); - } + let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len()); + all_bytes.extend_from_slice(&prepend_bytes); + all_bytes.extend_from_slice(&tail_bytes); + all_bytes.into() + } else { + tail_bytes + }; - FileMetadata::from_proto(&postscript, &footer, &metadata, stripe_footers) + let footer = deserialize_footer( + &tail_bytes[tail_bytes.len() - footer_length as usize..], + compression, + )?; + tail_bytes.truncate(tail_bytes.len() - footer_length as usize); + + let metadata = deserialize_footer_metadata( + &tail_bytes[tail_bytes.len() - metadata_length as usize..], + compression, + )?; + + FileMetadata::from_proto(&postscript, &footer, &metadata) } fn deserialize_footer(bytes: &[u8], compression: Option) -> Result