diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index a558f893c43e..204ca1f37db2 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -64,25 +64,30 @@ impl RowSelector { /// use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; /// /// let selectors = vec![ -/// RowSelector { row_count: 5, skip: true }, -/// RowSelector { row_count: 5, skip: false }, -/// RowSelector { row_count: 5, skip: false }, -/// RowSelector { row_count: 5, skip: true }, +/// RowSelector::skip(5), +/// RowSelector::select(5), +/// RowSelector::select(5), +/// RowSelector::skip(5), /// ]; /// /// // Creating a selection will combine adjacent selectors /// let selection: RowSelection = selectors.into(); /// /// let expected = vec![ -/// RowSelector { row_count: 5, skip: true }, -/// RowSelector { row_count: 10, skip: false }, -/// RowSelector { row_count: 5, skip: true }, +/// RowSelector::skip(5), +/// RowSelector::select(10), +/// RowSelector::skip(5), /// ]; /// /// let actual: Vec = selection.into(); /// assert_eq!(actual, expected); /// ``` /// +/// A [`RowSelection`] maintains the following invariants: +/// +/// * It contains no [`RowSelector`] of 0 rows +/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows +/// /// [`PageIndex`]: crate::file::page_index::index::PageIndex #[derive(Debug, Clone, Default, Eq, PartialEq)] pub struct RowSelection { @@ -119,10 +124,15 @@ impl RowSelection { let mut last_end = 0; for range in ranges { let len = range.end - range.start; + if len == 0 { + continue; + } match range.start.cmp(&last_end) { Ordering::Equal => match selectors.last_mut() { - Some(last) => last.row_count += len, + Some(last) => { + last.row_count = last.row_count.checked_add(len).unwrap() + } None => selectors.push(RowSelector::select(len)), }, Ordering::Greater => { @@ -141,38 +151,6 @@ impl RowSelection { Self { selectors } } - /// Creates a [`RowSelection`] from a slice of uncombined `RowSelector`: - /// Like [skip(5),skip(5),read(10)]. - /// After combine will return [skip(10),read(10)] - /// # Note - /// [`RowSelection`] must be combined prior to use within offset_index or else the code will panic. - fn from_selectors_and_combine(selectors: &[RowSelector]) -> Self { - if selectors.len() < 2 { - return Self { - selectors: Vec::from(selectors), - }; - } - let first = selectors.first().unwrap(); - let mut sum_rows = first.row_count; - let mut skip = first.skip; - let mut combined_result = vec![]; - - for s in selectors.iter().skip(1) { - if s.skip == skip { - sum_rows += s.row_count - } else { - add_selector(skip, sum_rows, &mut combined_result); - sum_rows = s.row_count; - skip = s.skip; - } - } - add_selector(skip, sum_rows, &mut combined_result); - - Self { - selectors: combined_result, - } - } - /// Given an offset index, return the byte ranges for all data pages selected by `self` /// /// This is useful for determining what byte ranges to fetch from underlying storage @@ -358,9 +336,7 @@ impl RowSelection { /// /// returned: NNNNNNNNYYNYN pub fn intersection(&self, other: &Self) -> Self { - Self { - selectors: intersect_row_selections(&self.selectors, &other.selectors), - } + intersect_row_selections(&self.selectors, &other.selectors) } /// Returns `true` if this [`RowSelection`] selects any rows @@ -450,7 +426,37 @@ impl RowSelection { impl From> for RowSelection { fn from(selectors: Vec) -> Self { - Self::from_selectors_and_combine(selectors.as_slice()) + selectors.into_iter().collect() + } +} + +impl FromIterator for RowSelection { + fn from_iter>(iter: T) -> Self { + let iter = iter.into_iter(); + + // Capacity before filter + let mut selectors = Vec::with_capacity(iter.size_hint().0); + + let mut filtered = iter.filter(|x| x.row_count != 0); + if let Some(x) = filtered.next() { + selectors.push(x); + } + + for s in filtered { + if s.row_count == 0 { + continue; + } + + // Combine consecutive selectors + let last = selectors.last_mut().unwrap(); + if last.skip == s.skip { + last.row_count = last.row_count.checked_add(s.row_count).unwrap(); + } else { + selectors.push(s) + } + } + + Self { selectors } } } @@ -472,67 +478,58 @@ impl From for VecDeque { /// other: NYNNNNNNY /// /// returned: NNNNNNNNYYNYN -fn intersect_row_selections( - left: &[RowSelector], - right: &[RowSelector], -) -> Vec { - let mut res = Vec::with_capacity(left.len()); +fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection { let mut l_iter = left.iter().copied().peekable(); let mut r_iter = right.iter().copied().peekable(); - while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) { - if a.row_count == 0 { - l_iter.next().unwrap(); - continue; - } - if b.row_count == 0 { - r_iter.next().unwrap(); - continue; - } - match (a.skip, b.skip) { - // Keep both ranges - (false, false) => { - if a.row_count < b.row_count { - res.push(RowSelector::select(a.row_count)); - b.row_count -= a.row_count; + let iter = std::iter::from_fn(move || { + loop { + let l = l_iter.peek_mut(); + let r = r_iter.peek_mut(); + + match (l, r) { + (Some(a), _) if a.row_count == 0 => { l_iter.next().unwrap(); - } else { - res.push(RowSelector::select(b.row_count)); - a.row_count -= b.row_count; - r_iter.next().unwrap(); } - } - // skip at least one - _ => { - if a.row_count < b.row_count { - res.push(RowSelector::skip(a.row_count)); - b.row_count -= a.row_count; - l_iter.next().unwrap(); - } else { - res.push(RowSelector::skip(b.row_count)); - a.row_count -= b.row_count; + (_, Some(b)) if b.row_count == 0 => { r_iter.next().unwrap(); } + (Some(l), Some(r)) => { + return match (l.skip, r.skip) { + // Keep both ranges + (false, false) => { + if l.row_count < r.row_count { + r.row_count -= l.row_count; + l_iter.next() + } else { + l.row_count -= r.row_count; + r_iter.next() + } + } + // skip at least one + _ => { + if l.row_count < r.row_count { + let skip = l.row_count; + r.row_count -= l.row_count; + l_iter.next(); + Some(RowSelector::skip(skip)) + } else { + let skip = r.row_count; + l.row_count -= skip; + r_iter.next(); + Some(RowSelector::skip(skip)) + } + } + }; + } + (Some(_), None) => return l_iter.next(), + (None, Some(_)) => return r_iter.next(), + (None, None) => return None, } } - } - - if l_iter.peek().is_some() { - res.extend(l_iter); - } - if r_iter.peek().is_some() { - res.extend(r_iter); - } - res -} + }); -fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec) { - let selector = if skip { - RowSelector::skip(sum_row) - } else { - RowSelector::select(sum_row) - }; - combined_result.push(selector); + iter.collect() } #[cfg(test)] @@ -780,40 +777,28 @@ mod tests { RowSelector::skip(4), ]); - assert_eq!(RowSelection::from_selectors_and_combine(&a), expected); - assert_eq!(RowSelection::from_selectors_and_combine(&b), expected); - assert_eq!(RowSelection::from_selectors_and_combine(&c), expected); + assert_eq!(RowSelection::from_iter(a), expected); + assert_eq!(RowSelection::from_iter(b), expected); + assert_eq!(RowSelection::from_iter(c), expected); } #[test] fn test_combine_2elements() { let a = vec![RowSelector::select(10), RowSelector::select(5)]; let a_expect = vec![RowSelector::select(15)]; - assert_eq!( - RowSelection::from_selectors_and_combine(&a).selectors, - a_expect - ); + assert_eq!(RowSelection::from_iter(a).selectors, a_expect); let b = vec![RowSelector::select(10), RowSelector::skip(5)]; let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)]; - assert_eq!( - RowSelection::from_selectors_and_combine(&b).selectors, - b_expect - ); + assert_eq!(RowSelection::from_iter(b).selectors, b_expect); let c = vec![RowSelector::skip(10), RowSelector::select(5)]; let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)]; - assert_eq!( - RowSelection::from_selectors_and_combine(&c).selectors, - c_expect - ); + assert_eq!(RowSelection::from_iter(c).selectors, c_expect); let d = vec![RowSelector::skip(10), RowSelector::skip(5)]; let d_expect = vec![RowSelector::skip(15)]; - assert_eq!( - RowSelection::from_selectors_and_combine(&d).selectors, - d_expect - ); + assert_eq!(RowSelection::from_iter(d).selectors, d_expect); } #[test] @@ -869,7 +854,7 @@ mod tests { let res = intersect_row_selections(&a, &b); assert_eq!( - RowSelection::from_selectors_and_combine(&res).selectors, + res.selectors, vec![ RowSelector::select(5), RowSelector::skip(4), @@ -887,7 +872,7 @@ mod tests { let b = vec![RowSelector::select(36), RowSelector::skip(36)]; let res = intersect_row_selections(&a, &b); assert_eq!( - RowSelection::from_selectors_and_combine(&res).selectors, + res.selectors, vec![RowSelector::select(3), RowSelector::skip(69)] ); @@ -902,7 +887,7 @@ mod tests { ]; let res = intersect_row_selections(&a, &b); assert_eq!( - RowSelection::from_selectors_and_combine(&res).selectors, + res.selectors, vec![RowSelector::select(2), RowSelector::skip(8)] ); @@ -916,7 +901,7 @@ mod tests { ]; let res = intersect_row_selections(&a, &b); assert_eq!( - RowSelection::from_selectors_and_combine(&res).selectors, + res.selectors, vec![RowSelector::select(2), RowSelector::skip(8)] ); } @@ -1154,4 +1139,70 @@ mod tests { // assert_eq!(mask, vec![false, true, true, false, true, true, true]); assert_eq!(ranges, vec![10..20, 20..30, 30..40]); } + + #[test] + fn test_empty_ranges() { + let ranges = [1..3, 4..6, 6..6, 8..8, 9..10]; + let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10); + assert_eq!( + selection.selectors, + vec![ + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(3), + RowSelector::select(1) + ] + ) + } + + #[test] + fn test_empty_selector() { + let selection = RowSelection::from(vec![ + RowSelector::skip(0), + RowSelector::select(2), + RowSelector::skip(0), + RowSelector::select(2), + ]); + assert_eq!(selection.selectors, vec![RowSelector::select(4)]); + + let selection = RowSelection::from(vec![ + RowSelector::select(0), + RowSelector::skip(2), + RowSelector::select(0), + RowSelector::skip(2), + ]); + assert_eq!(selection.selectors, vec![RowSelector::skip(4)]); + } + + #[test] + fn test_intersection() { + let selection = RowSelection::from(vec![RowSelector::select(1048576)]); + let result = selection.intersection(&selection); + assert_eq!(result, selection); + + let a = RowSelection::from(vec![ + RowSelector::skip(10), + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(20), + ]); + + let b = RowSelection::from(vec![ + RowSelector::skip(20), + RowSelector::select(20), + RowSelector::skip(10), + ]); + + let result = a.intersection(&b); + assert_eq!( + result.selectors, + vec![ + RowSelector::skip(30), + RowSelector::select(10), + RowSelector::skip(10) + ] + ); + } }