[Parquet] Use u64 for SerializedPageReaderState.offset & remaining_bytes, instead of usize #7918


Merged
merged 4 commits on Jul 14, 2025
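This PR changes `SerializedPageReaderState::Values` to track `offset` and `remaining_bytes` as `u64` rather than `usize`. On 32-bit targets such as wasm32, `usize` is only 32 bits wide, so byte offsets into Parquet files larger than 4 GiB cannot be represented and the previous `usize::try_from(...)` conversions could fail. Keeping these fields as `u64` also matches the `u64` start position accepted by `ChunkReader::get_read`, which removes the `as u64` casts at the call sites in the diff below. A minimal, self-contained sketch (not code from this PR) illustrating the failure mode:

```rust
fn main() {
    // A byte offset past the 4 GiB mark, as can occur in a large Parquet file.
    let offset: u64 = 5_000_000_000;

    // On a 64-bit host this conversion succeeds, but on a 32-bit target
    // (e.g. wasm32), usize is only 32 bits wide and the conversion fails.
    match usize::try_from(offset) {
        Ok(v) => println!("offset fits in usize: {v}"),
        Err(_) => println!("offset does not fit in usize on this target"),
    }

    // Keeping the offset as u64 avoids the lossy conversion entirely and
    // matches the u64 start position taken by ChunkReader::get_read.
    let _start: u64 = offset;
}
```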
62 changes: 32 additions & 30 deletions parquet/src/file/serialized_reader.rs
@@ -469,10 +469,12 @@ pub(crate) fn decode_page(
enum SerializedPageReaderState {
Values {
/// The current byte offset in the reader
offset: usize,
/// Note that offset is u64 (i.e., not usize) to support 32-bit architectures such as WASM
offset: u64,

/// The length of the chunk in bytes
remaining_bytes: usize,
/// Note that remaining_bytes is u64 (i.e., not usize) to support 32-bit architectures such as WASM
remaining_bytes: u64,

// If the next page header has already been "peeked", we will cache it and its length here
next_page_header: Option<Box<PageHeader>>,
@@ -601,8 +603,8 @@ impl<R: ChunkReader> SerializedPageReader<R> {
}
}
None => SerializedPageReaderState::Values {
offset: usize::try_from(start)?,
remaining_bytes: usize::try_from(len)?,
offset: start,
remaining_bytes: len,
next_page_header: None,
page_index: 0,
require_dictionary: meta.dictionary_page_offset().is_some(),
@@ -623,7 +625,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
/// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice.
/// This function allows us to check if the next page is being cached or read previously.
#[cfg(test)]
fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
@@ -645,15 +647,15 @@ impl<R: ChunkReader> SerializedPageReader<R> {
continue;
}
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let mut read = self.reader.get_read(*offset)?;
let (header_len, header) = Self::read_page_header_len(
&self.context,
&mut read,
*page_index,
*require_dictionary,
)?;
*offset += header_len;
*remaining_bytes -= header_len;
*offset += header_len as u64;
*remaining_bytes -= header_len as u64;
let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
Ok(Some(*offset))
} else {
@@ -671,9 +673,9 @@ impl<R: ChunkReader> SerializedPageReader<R> {
..
} => {
if let Some(page) = dictionary_page {
Ok(Some(usize::try_from(page.offset)?))
Ok(Some(page.offset as u64))
} else if let Some(page) = page_locations.front() {
Ok(Some(usize::try_from(page.offset)?))
Ok(Some(page.offset as u64))
} else {
Ok(None)
}
@@ -813,8 +815,8 @@ impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
}
}

fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
if header_len > remaining_bytes {
fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
if header_len as u64 > remaining_bytes {
return Err(eof_err!("Invalid page header"));
}
Ok(())
@@ -823,12 +825,12 @@ fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<(
fn verify_page_size(
compressed_size: i32,
uncompressed_size: i32,
remaining_bytes: usize,
remaining_bytes: u64,
) -> Result<()> {
// The page's compressed size should not exceed the remaining bytes that are
// available to read. The page's uncompressed size is the expected size
// after decompression, which can never be negative.
if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
return Err(eof_err!("Invalid page header"));
}
Ok(())
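Because `remaining_bytes` is now `u64`, the checks above widen the `usize` header length and the non-negative `i32` page sizes to `u64` before comparing, which is lossless on every supported target. A simplified, standalone sketch of the same guards (assuming a plain `String` error instead of the crate's `Result` type), for illustration only:

```rust
// Hypothetical, simplified versions of the guards shown in the diff above.
fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<(), String> {
    // Widening usize -> u64 never truncates, so the comparison is exact.
    if header_len as u64 > remaining_bytes {
        return Err("Invalid page header".to_string());
    }
    Ok(())
}

fn verify_page_size(
    compressed_size: i32,
    uncompressed_size: i32,
    remaining_bytes: u64,
) -> Result<(), String> {
    // Negative sizes are invalid; the cast to u64 only runs once
    // compressed_size is known to be non-negative.
    if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
        return Err("Invalid page header".to_string());
    }
    Ok(())
}

fn main() {
    // A 1 KiB page header within 4 KiB of remaining data is fine.
    assert!(verify_page_header_len(1024, 4096).is_ok());
    // A compressed page larger than what is left in the chunk is rejected.
    assert!(verify_page_size(8192, 16384, 4096).is_err());
}
```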
@@ -849,7 +851,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
return Ok(None);
}

let mut read = self.reader.get_read(*offset as u64)?;
let mut read = self.reader.get_read(*offset)?;
let header = if let Some(header) = next_page_header.take() {
*header
} else {
@@ -860,8 +862,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
*require_dictionary,
)?;
verify_page_header_len(header_len, *remaining)?;
*offset += header_len;
*remaining -= header_len;
*offset += header_len as u64;
*remaining -= header_len as u64;
header
};
verify_page_size(
Expand All @@ -870,8 +872,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
*remaining,
)?;
let data_len = header.compressed_page_size as usize;
*offset += data_len;
*remaining -= data_len;
*offset += data_len as u64;
*remaining -= data_len as u64;

if header.type_ == PageType::INDEX_PAGE {
continue;
@@ -971,16 +973,16 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
continue;
}
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let mut read = self.reader.get_read(*offset)?;
let (header_len, header) = Self::read_page_header_len(
&self.context,
&mut read,
*page_index,
*require_dictionary,
)?;
verify_page_header_len(header_len, *remaining_bytes)?;
*offset += header_len;
*remaining_bytes -= header_len;
*offset += header_len as u64;
*remaining_bytes -= header_len as u64;
let page_meta = if let Ok(page_meta) = (&header).try_into() {
Ok(Some(page_meta))
} else {
Expand Down Expand Up @@ -1038,10 +1040,10 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
*remaining_bytes,
)?;
// The next page header has already been peeked, so just advance the offset
*offset += buffered_header.compressed_page_size as usize;
*remaining_bytes -= buffered_header.compressed_page_size as usize;
*offset += buffered_header.compressed_page_size as u64;
*remaining_bytes -= buffered_header.compressed_page_size as u64;
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let mut read = self.reader.get_read(*offset)?;
let (header_len, header) = Self::read_page_header_len(
&self.context,
&mut read,
@@ -1054,9 +1056,9 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
header.uncompressed_page_size,
*remaining_bytes,
)?;
let data_page_size = header.compressed_page_size as usize;
*offset += header_len + data_page_size;
*remaining_bytes -= header_len + data_page_size;
let data_page_size = header.compressed_page_size as u64;
*offset += header_len as u64 + data_page_size;
*remaining_bytes -= header_len as u64 + data_page_size;
}
if *require_dictionary {
*require_dictionary = false;
@@ -1652,9 +1654,9 @@ mod tests {
..
} => {
if let Some(page) = dictionary_page {
assert_eq!(page.offset as usize, page_offset);
assert_eq!(page.offset as u64, page_offset);
} else if let Some(page) = page_locations.front() {
assert_eq!(page.offset as usize, page_offset);
assert_eq!(page.offset as u64, page_offset);
} else {
unreachable!()
}