diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index 49c69c87e302..948c46c1d980 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -128,8 +128,21 @@ where let mut records_read = 0; loop { - let records_to_read = num_records - records_read; - records_read += self.read_one_batch(records_to_read)?; + // Try to find some records from buffers that have been read into memory + // but not yet counted as seen records. + + // Check to see if the column is exhausted. Only peek at the next page, since + // we may be reading up to a page boundary and not actually need to read + // the next page. + let end_of_column = !self.column_reader.as_mut().unwrap().peek_next()?; + + let (record_count, value_count) = + self.count_records(num_records - records_read, end_of_column); + + self.num_records += record_count; + self.num_values += value_count; + records_read += record_count; + if records_read == num_records || !self.column_reader.as_mut().unwrap().has_next()? 
{ break; } @@ -143,9 +156,29 @@ where /// /// Number of records skipped pub fn skip_records(&mut self, num_records: usize) -> Result<usize> { - match self.column_reader.as_mut() { - Some(reader) => reader.skip_records(num_records), - None => Ok(0), + // First account for records that are already buffered in memory + let end_of_column = match self.column_reader.as_mut() { + Some(reader) => !reader.peek_next()?, + None => return Ok(0), + }; + + let (buffered_records, buffered_values) = self.count_records(num_records, end_of_column); + + self.num_records += buffered_records; + self.num_values += buffered_values; + + let remaining = num_records - buffered_records; + + if remaining == 0 { + return Ok(buffered_records); } + + // The buffered records did not cover the request: skip the rest in the column reader + let skipped = match self.column_reader.as_mut() { + Some(reader) => reader.skip_records(remaining)?, + None => 0, + }; + + Ok(buffered_records + skipped) } diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 6c712ead625c..971589e5f2dc 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -571,6 +571,23 @@ where } } + /// Check whether there is more data to read from this column. + /// If the current page is fully decoded, this only peeks at the next page + /// (any rows there mean more data) and does NOT load it into the buffer. + #[inline] + pub(crate) fn peek_next(&mut self) -> Result<bool> { + if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values { + // TODO: should we return false if read_new_page() = true and + // num_buffered_values = 0? + match self.page_reader.peek_next_page()? { + Some(next_page) => Ok(next_page.num_rows != 0), + None => Ok(false), + } + } else { + Ok(true) + } + } + /// Check whether there is more data to read from this column, /// If the current page is fully decoded, this will load the next page /// (if it exists) into the buffer