Refactor synchronous parsing of file tail metadata #43
Conversation
Let me know if this looks like a good direction to head in, or if you have other thoughts 👍
src/reader.rs (Outdated)

```rust
/// Primary source used for reading required bytes for operations.
#[allow(clippy::len_without_is_empty)]
// TODO: async version
pub trait ChunkReader {
    type T: Read;

    /// Get total length of bytes. Useful for parsing the metadata located at
    /// the end of the file.
    fn len(&self) -> u64;

    /// Get a reader starting at a specific offset.
    fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T>;

    /// Read bytes from an offset with specific length.
    fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result<Vec<u8>> {
        let mut bytes = vec![0; length as usize];
        self.get_read(offset_from_start)?
            .take(length)
            .read_exact(&mut bytes)?;
        Ok(bytes)
    }
}

impl ChunkReader for File {
    type T = BufReader<File>;

    fn len(&self) -> u64 {
        self.metadata().map(|m| m.len()).unwrap_or(0u64)
    }

    /// Care needs to be taken when using this simultaneously, as the underlying
    /// file descriptor is the same and will be affected by other invocations.
    ///
    /// See [`File::try_clone()`] for more details.
    fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
        let mut reader = self.try_clone()?;
        reader.seek(SeekFrom::Start(offset_from_start))?;
        // Wrap the clone we just seeked (not a fresh, unseeked clone).
        Ok(BufReader::new(reader))
    }
}
```
This is essentially what parquet does at the moment. I like this interface as we don't have to keep doing the seek ourselves.
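For reference, a minimal standalone sketch of how a trait with this shape can be exercised against an in-memory source. The trait is reproduced from the diff above; the `Vec<u8>` impl is purely illustrative and not part of this PR:

```rust
use std::io::{Cursor, Read};

/// Trait shape from the PR, reproduced so this sketch compiles on its own.
pub trait ChunkReader {
    type T: Read;

    fn len(&self) -> u64;

    fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T>;

    fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result<Vec<u8>> {
        let mut bytes = vec![0; length as usize];
        self.get_read(offset_from_start)?
            .take(length)
            .read_exact(&mut bytes)?;
        Ok(bytes)
    }
}

/// Hypothetical in-memory impl, handy for unit tests: each `get_read`
/// returns an independent cursor, so callers never fight over shared state
/// the way `File::try_clone` readers do.
impl ChunkReader for Vec<u8> {
    type T = Cursor<Vec<u8>>;

    fn len(&self) -> u64 {
        self.as_slice().len() as u64
    }

    fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
        // Cloning is fine for a test sketch; a real impl would avoid the copy.
        let mut cursor = Cursor::new(self.clone());
        cursor.set_position(offset_from_start);
        Ok(cursor)
    }
}

fn main() {
    let data: Vec<u8> = (0u8..10).collect();
    // Read 4 bytes starting at offset 3 via the default `get_bytes`.
    let bytes = data.get_bytes(3, 4).unwrap();
    assert_eq!(bytes, vec![3, 4, 5, 6]);
    assert_eq!(ChunkReader::len(&data), 10);
}
```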
```rust
/// Indicates length of block and whether it's compressed or not.
#[derive(Debug, PartialEq, Eq)]
enum CompressionHeader {
    Original(u32),
    Compressed(u32),
}

/// ORC files are compressed in blocks, with a 3 byte header at the start
/// of these blocks indicating the length of the block and whether it's
/// compressed or not.
fn decode_header(bytes: [u8; 3]) -> CompressionHeader {
    let bytes = [bytes[0], bytes[1], bytes[2], 0];
    let length = u32::from_le_bytes(bytes);
    let is_original = length & 1 == 1;
    // Shift to clear the is_original bit
    let length = length >> 1;
    if is_original {
        CompressionHeader::Original(length)
    } else {
        CompressionHeader::Compressed(length)
    }
}
```
Just trying to make things more explicit, utilizing Rust's enums more instead of relying on booleans.
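A quick sanity check of the decoder, using the worked example from the ORC spec (a compressed chunk of 100,000 bytes has header bytes `[0x40, 0x0d, 0x03]`, since `100_000 << 1 == 0x30d40` in little-endian):

```rust
#[derive(Debug, PartialEq, Eq)]
enum CompressionHeader {
    Original(u32),
    Compressed(u32),
}

// Decoder as written in the diff above, reproduced so this runs standalone.
fn decode_header(bytes: [u8; 3]) -> CompressionHeader {
    let bytes = [bytes[0], bytes[1], bytes[2], 0];
    let length = u32::from_le_bytes(bytes);
    let is_original = length & 1 == 1;
    // Shift to clear the is_original bit
    let length = length >> 1;
    if is_original {
        CompressionHeader::Original(length)
    } else {
        CompressionHeader::Compressed(length)
    }
}

fn main() {
    // Compressed 100,000-byte chunk (example from the ORC spec).
    assert_eq!(
        decode_header([0x40, 0x0d, 0x03]),
        CompressionHeader::Compressed(100_000)
    );
    // Setting the low bit marks the block as original (uncompressed).
    assert_eq!(
        decode_header([0x41, 0x0d, 0x03]),
        CompressionHeader::Original(100_000)
    );
}
```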
```rust
//! Parse ORC file tail metadata structures from file.
//!
//! File tail structure:
//!
//! ------------------
//! |    Metadata    |
//! |                |
//! ------------------
//! |     Footer     |
//! |                |
//! ------------------
//! |  Postscript  |X|
//! ------------------
//!
//! Where X is the last byte in the file, indicating
//! the Postscript length in bytes.
//!
//! Footer and Metadata lengths are encoded in the Postscript.
//! The Postscript is never compressed; the Footer and Metadata
//! may be compressed depending on a Postscript config value.
//!
//! If they are compressed, then their lengths indicate their
//! compressed lengths.
```
Where possible, let's start adding docs.
I prefer adding docs about the spec here, since it removes the need for people to refer to the spec separately when trying to understand the code.
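To make the tail layout concrete, here is a small standalone sketch that locates the Postscript using the trailing length byte. This is not the PR's code; `postscript_bytes` is a hypothetical helper, written over any `Read + Seek` so it can be exercised with an in-memory cursor:

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

/// Hypothetical helper: return the raw Postscript bytes of an ORC file.
/// The Postscript's length is stored in the file's final byte, so it is
/// always under 256 bytes and one trailing byte suffices to find it.
/// (No bounds checking here; a real version must guard against tiny or
/// malformed files.)
fn postscript_bytes<R: Read + Seek>(r: &mut R) -> std::io::Result<Vec<u8>> {
    let file_len = r.seek(SeekFrom::End(0))?;

    // Read the final byte: the Postscript length.
    r.seek(SeekFrom::Start(file_len - 1))?;
    let mut len_byte = [0u8; 1];
    r.read_exact(&mut len_byte)?;
    let ps_len = len_byte[0] as u64;

    // The Postscript sits immediately before that final byte.
    r.seek(SeekFrom::Start(file_len - 1 - ps_len))?;
    let mut ps = vec![0u8; ps_len as usize];
    r.read_exact(&mut ps)?;
    Ok(ps)
}

fn main() {
    // Fake tail: arbitrary body bytes, then a 2-byte "Postscript" ("PS"),
    // then the length byte 2.
    let mut cursor = Cursor::new(vec![9u8, 9, 9, b'P', b'S', 2]);
    let ps = postscript_bytes(&mut cursor).unwrap();
    assert_eq!(ps, b"PS".to_vec());
}
```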
```rust
// Initial read of the file tail
// Use a default size for first read in hopes of capturing all sections with one read
// At worst need two reads to get all necessary bytes
let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
let mut tail_bytes = reader
    .get_bytes(file_len - assume_footer_len, assume_footer_len)
    .context(error::IoSnafu)?;
```

```rust
    $reader
        .seek(SeekFrom::Start(metadata_offset))$($_await)*
        .context(error::SeekSnafu)?;
    let mut metadata = vec![0; metadata_length];
    $reader.read_exact(&mut metadata)$($_await)*.context(error::IoSnafu)?;

    let metadata = deserialize_footer_metadata(&metadata, compression)?;

    let mut stripe_footers = Vec::with_capacity(footer.stripes.len());

    let mut scratch = Vec::<u8>::new();

    for stripe in &footer.stripes {
        let start = stripe.offset() + stripe.index_length() + stripe.data_length();
        let len = stripe.footer_length();
        $reader
            .seek(SeekFrom::Start(start))$($_await)*
            .context(error::SeekSnafu)?;

        scratch.clear();
        scratch.reserve(len as usize);
        $reader
            .take(len)
            .read_to_end(&mut scratch)$($_await)*
            .context(error::IoSnafu)?;
        stripe_footers.push(deserialize_stripe_footer(
            &scratch,
            compression,
        )?);
    }

    Ok(FileMetadata {
        postscript,
        footer,
        metadata,
        stripe_footers,
    })
}
```

```rust
// The final byte of the file contains the serialized length of the Postscript,
// which must be less than 256 bytes.
let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64;
tail_bytes.truncate(tail_bytes.len() - 1);

// TODO: slice here could panic if file too small
let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..])
    .context(error::DecodeProtoSnafu)?;
let compression =
    Compression::from_proto(postscript.compression(), postscript.compression_block_size);
tail_bytes.truncate(tail_bytes.len() - postscript_len as usize);

let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
    msg: "Footer length is empty",
})?;
let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
    msg: "Metadata length is empty",
})?;
```
Using the new ChunkReader trait here to do the reads instead
```rust
let mut tail_bytes = if footer_length + metadata_length > tail_bytes.len() as u64 {
    // Need second read
    // -1 is the postscript length byte
    let offset = file_len - 1 - postscript_len - footer_length - metadata_length;
    let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64;
    let mut prepend_bytes = reader
        .get_bytes(offset, bytes_to_read)
        .context(error::IoSnafu)?;
    prepend_bytes.extend(tail_bytes);
    prepend_bytes
} else {
    tail_bytes
};
```
The idea is that we do an initial read hoping to get all the bytes necessary to decode the postscript, footer and metadata.
At worst we need one more read, if the first read turns out to be insufficient (as informed by the decoded postscript).
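The offset arithmetic for that fallback read can be sketched as a standalone function (the name `second_read` and its parameters are hypothetical, for illustration only):

```rust
/// Given the file length, decoded postscript length, and footer/metadata
/// lengths, compute the (offset, length) of the second read that fetches
/// whatever the first fixed-size read missed. Returns None when the first
/// read already captured everything.
fn second_read(
    file_len: u64,
    postscript_len: u64,
    footer_len: u64,
    metadata_len: u64,
    already_read: u64, // bytes still buffered from the first read
) -> Option<(u64, u64)> {
    let needed = footer_len + metadata_len;
    if needed <= already_read {
        return None;
    }
    // -1 accounts for the trailing postscript-length byte.
    let offset = file_len - 1 - postscript_len - footer_len - metadata_len;
    Some((offset, needed - already_read))
}

fn main() {
    // 1000-byte file, 20-byte postscript, 300-byte footer, 100-byte metadata;
    // only 250 footer/metadata bytes survived the first read, so we fetch
    // the missing 150 bytes starting at offset 1000 - 1 - 20 - 300 - 100 = 579.
    assert_eq!(second_read(1000, 20, 300, 100, 250), Some((579, 150)));
    // If the first read held 600 bytes, no second read is needed.
    assert_eq!(second_read(1000, 20, 300, 100, 600), None);
}
```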
```rust
// clippy read_zero_byte_vec lint causing issues so init to non-zero length
let mut scratch = vec![0];
for stripe in &footer.stripes {
    let offset = stripe.offset() + stripe.index_length() + stripe.data_length();
    let len = stripe.footer_length();

    let mut read = reader.get_read(offset).context(error::IoSnafu)?;
    scratch.resize(len as usize, 0);
    read.read_exact(&mut scratch).context(error::IoSnafu)?;
    stripe_footers.push(deserialize_stripe_footer(&scratch, compression)?);
}
```
Might refactor this away later; we'll see (we might not want to decode all stripe footers here and now).
```rust
pub async fn read_metadata_async<R>(reader: &mut R) -> Result<FileMetadata>
where
    R: AsyncRead + AsyncSeek + Unpin + Send,
{
    impl_read_metadata!(reader.await)
    let file_len = {
```
I just copied what the macro did into here.
I'll focus on the sync interface first, then apply the same to async, to limit the scope of the work.
* Refactor synchronous parsing of file tail metadata
* Typo
* Typo
Initial attempt at refactoring how the file read is handled, as stated in #42 (comment).