diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 0385f9ea2063..9b198bb8cb08 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -395,6 +395,7 @@ pub struct RepetitionLevelDecoderImpl { buffer_len: usize, buffer_offset: usize, has_partial: bool, + page_start: bool, } impl RepetitionLevelDecoderImpl { @@ -407,6 +408,7 @@ impl RepetitionLevelDecoderImpl { buffer_offset: 0, buffer_len: 0, has_partial: false, + page_start: false, } } @@ -414,6 +416,15 @@ impl RepetitionLevelDecoderImpl { let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?; self.buffer_offset = 0; self.buffer_len = read; + if self.buffer_len != 0 && self.page_start { + if self.buffer[0] != 0 { + return Err(general_err!( + "Record must not be split across page boundary (#4943)" + )); + } + self.page_start = false; + } + Ok(()) } @@ -452,6 +463,7 @@ impl ColumnLevelDecoder for RepetitionLevelDecoderImpl { self.buffer_len = 0; self.buffer_offset = 0; self.has_partial = false; + self.page_start = true; } } @@ -548,6 +560,31 @@ mod tests { assert_eq!(levels, 6); } + #[test] + fn test_record_boundary() { + let mut encoder = RleEncoder::new(1, 1024); + encoder.put(1); + let data = ByteBufferPtr::new(encoder.consume()); + let mut decoder = RepetitionLevelDecoderImpl::new(1); + decoder.set_data(Encoding::RLE, data.clone()); + let err = decoder.skip_rep_levels(1, 4).unwrap_err().to_string(); + assert_eq!( + err, + "Parquet error: Record must not be split across page boundary (#4943)" + ); + + decoder.set_data(Encoding::RLE, data); + let mut out = [0; 8]; + let err = decoder + .read_rep_levels(&mut out, 0..1, 4) + .unwrap_err() + .to_string(); + assert_eq!( + err, + "Parquet error: Record must not be split across page boundary (#4943)" + ); + } + #[test] fn test_skip_rep_levels() { for _ in 0..10 {