From 0265c5028a2ea83856cec23a48cf5dc8560d9f3a Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 20 Dec 2022 15:03:37 +0800 Subject: [PATCH] perf(expr): further optimize performance (#744) * optimize to string array Signed-off-by: Runji Wang * optimize bitvec Signed-off-by: Runji Wang * avoid zip_eq for performance Signed-off-by: Runji Wang * array: add `is_null` and `get_raw` Signed-off-by: Runji Wang * add bench for array filter Signed-off-by: Runji Wang * optimize filter -30% Signed-off-by: Runji Wang * optimize filter from bool array Signed-off-by: Runji Wang * clear null data Signed-off-by: Runji Wang * fix cardinality error Signed-off-by: Runji Wang * remove array iterator Signed-off-by: Runji Wang * introduce non-null iterator Signed-off-by: Runji Wang * optimize bitmap && Signed-off-by: Runji Wang * optimize BitVec operations Signed-off-by: Runji Wang * fix clippy and test Signed-off-by: Runji Wang Signed-off-by: Runji Wang --- benches/array.rs | 42 ++++++-- src/array/data_chunk.rs | 13 ++- src/array/data_chunk_builder.rs | 6 +- src/array/iterator.rs | 93 ------------------ src/array/mod.rs | 61 ++++++------ src/array/ops.rs | 142 ++++++++++++++++++++-------- src/array/primitive_array.rs | 59 ++++++++---- src/array/utf8_array.rs | 64 ++++++++++--- src/executor_v2/evaluator.rs | 5 +- src/executor_v2/filter.rs | 4 +- src/executor_v2/nested_loop_join.rs | 4 +- src/executor_v2/order.rs | 2 +- src/executor_v2/top_n.rs | 2 +- src/planner/rules/type_.rs | 3 +- src/storage/chunk.rs | 2 +- src/storage/memory/iterator.rs | 4 +- src/v1/executor/filter.rs | 4 +- src/v1/executor/nested_loop_join.rs | 4 +- src/v1/function/binary.rs | 18 +--- src/v1/function/unary.rs | 6 +- 20 files changed, 287 insertions(+), 251 deletions(-) delete mode 100644 src/array/iterator.rs diff --git a/benches/array.rs b/benches/array.rs index f74efc0da..6388db131 100644 --- a/benches/array.rs +++ b/benches/array.rs @@ -39,12 +39,17 @@ fn ops(c: &mut Criterion) { } for_all_size(c, "and(bool,bool)", |b, &size| { - let a1: ArrayImpl = (0..size).map(|i| i % 2 == 0).collect::().into(); - let a2: ArrayImpl = a1.clone(); + let a1: ArrayImpl = make_bool_array(size); + let a2: ArrayImpl = make_bool_array(size); b.iter(|| a1.and(&a2)); }); + for_all_size(c, "or(bool,bool)", |b, &size| { + let a1: ArrayImpl = make_bool_array(size); + let a2: ArrayImpl = make_bool_array(size); + b.iter(|| a1.or(&a2)); + }); for_all_size(c, "not(bool)", |b, &size| { - let a1: ArrayImpl = (0..size).map(|i| i % 2 == 0).collect::().into(); + let a1: ArrayImpl = make_bool_array(size); b.iter(|| a1.not()); }); } @@ -85,9 +90,24 @@ fn cast(c: &mut Criterion) { let a1 = make_f64_array(size); b.iter(|| a1.cast(&DataTypeKind::Decimal(None, None))) }); - for_all_size(c, "cast(i32->string)", |b, &size| { + for ty in ["i32", "f64", "decimal"] { + for_all_size(c, format!("cast({ty}->string)"), |b, &size| { + let a1 = match ty { + "i32" => make_i32_array(size), + "f64" => make_f64_array(size), + "decimal" => make_decimal_array(size), + _ => unreachable!(), + }; + b.iter(|| a1.cast(&DataTypeKind::String)) + }); + } +} + +fn filter(c: &mut Criterion) { + for_all_size(c, "filter(i32)", |b, &size| { let a1 = make_i32_array(size); - b.iter(|| a1.cast(&DataTypeKind::String)) + let ArrayImpl::Bool(a2) = make_bool_array(size) else { unreachable!() }; + b.iter(|| a1.filter(a2.true_array())) }); } @@ -155,6 +175,14 @@ fn function(c: &mut Criterion) { } } +fn make_bool_array(size: usize) -> ArrayImpl { + let mask = make_valid_bitmap(size); + let iter = (0..size as i32) + .zip(mask.clone()) + .map(|(i, v)| if v { i % 2 == 0 } else { false }); + BoolArray::from_data(iter, mask).into() +} + fn make_i32_array(size: usize) -> ArrayImpl { let mask = make_valid_bitmap(size); let iter = (0..size as i32) @@ -199,11 +227,11 @@ fn for_all_size( ) { let mut group = c.benchmark_group(name); group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); - for size in [1, 16, 256, 4096, 65536] { + for size in [1, 16, 256, 4096] { group.bench_with_input(BenchmarkId::from_parameter(size), &size, &mut f); } group.finish(); } -criterion_group!(benches, function, ops, agg, cast); +criterion_group!(benches, function, ops, agg, cast, filter); criterion_main!(benches); diff --git a/src/array/data_chunk.rs b/src/array/data_chunk.rs index 24167121b..ea69c3d7f 100644 --- a/src/array/data_chunk.rs +++ b/src/array/data_chunk.rs @@ -88,15 +88,14 @@ impl DataChunk { } /// Filter elements and create a new chunk. - pub fn filter(&self, visibility: impl Iterator + Clone) -> Self { - let arrays = self - .arrays - .iter() - .map(|a| a.filter(visibility.clone())) - .collect(); + pub fn filter(&self, visibility: &[bool]) -> Self { + let arrays: Arc<[ArrayImpl]> = self.arrays.iter().map(|a| a.filter(visibility)).collect(); DataChunk { + cardinality: match arrays.first() { + Some(a) => a.len(), + None => visibility.iter().filter(|b| **b).count(), + }, arrays, - cardinality: visibility.filter(|b| *b).count(), } } diff --git a/src/array/data_chunk_builder.rs b/src/array/data_chunk_builder.rs index 1c2bef8de..dc3dfc028 100644 --- a/src/array/data_chunk_builder.rs +++ b/src/array/data_chunk_builder.rs @@ -1,7 +1,5 @@ use std::iter::IntoIterator; -use itertools::Itertools; - use super::{ArrayBuilderImpl, DataChunk}; use crate::types::{ConvertError, DataType, DataValue}; @@ -37,7 +35,7 @@ impl DataChunkBuilder { pub fn push_row(&mut self, row: impl IntoIterator) -> Option { self.array_builders .iter_mut() - .zip_eq(row) + .zip(row) .for_each(|(builder, v)| builder.push(&v)); self.size += 1; if self.size == self.capacity { @@ -60,7 +58,7 @@ impl DataChunkBuilder { &mut self, row: impl IntoIterator, ) -> Result, ConvertError> { - for (builder, r) in self.array_builders.iter_mut().zip_eq(row) { + for (builder, r) in self.array_builders.iter_mut().zip(row) { builder.push_str(r)? } diff --git a/src/array/iterator.rs b/src/array/iterator.rs deleted file mode 100644 index 505f5ec24..000000000 --- a/src/array/iterator.rs +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2022 RisingLight Project Authors. Licensed under Apache-2.0. - -use std::iter::{Iterator, TrustedLen}; -use std::marker::PhantomData; - -use super::Array; - -/// An iterator over the elements of an [`Array`]. -#[derive(Clone)] -pub struct ArrayIter<'a, A: Array> { - data: &'a A, - begin: usize, - end: usize, -} - -impl<'a, A: Array> ArrayIter<'a, A> { - pub fn new(data: &'a A) -> Self { - Self { - data, - begin: 0, - end: data.len(), - } - } -} - -impl<'a, A: Array> Iterator for ArrayIter<'a, A> { - type Item = Option<&'a A::Item>; - - fn next(&mut self) -> Option { - if self.begin >= self.end { - None - } else { - let item = self.data.get(self.begin); - self.begin += 1; - Some(item) - } - } - - fn size_hint(&self) -> (usize, Option) { - let exact = self.end - self.begin; - (exact, Some(exact)) - } -} - -impl<'a, A: Array> DoubleEndedIterator for ArrayIter<'a, A> { - fn next_back(&mut self) -> Option { - if self.begin >= self.end { - None - } else { - self.end -= 1; - Some(self.data.get(self.end)) - } - } -} - -pub struct NonNullArrayIter<'a, A: Array> { - data: &'a A, - pos: usize, - _phantom: PhantomData<&'a usize>, -} - -impl<'a, A: Array> NonNullArrayIter<'a, A> { - pub fn new(data: &'a A) -> Self { - Self { - data, - pos: 0, - _phantom: PhantomData, - } - } -} - -impl<'a, A: Array> Iterator for NonNullArrayIter<'a, A> { - type Item = &'a A::Item; - - fn next(&mut self) -> Option { - if self.pos >= self.data.len() { - None - } else { - let value = self.data.get_unchecked(self.pos); - self.pos += 1; - Some(value) - } - } - - fn size_hint(&self) -> (usize, Option) { - let exact = self.data.len() - self.pos; - (exact, Some(exact)) - } -} - -unsafe impl TrustedLen for ArrayIter<'_, A> {} - -unsafe impl TrustedLen for NonNullArrayIter<'_, A> {} diff --git a/src/array/mod.rs b/src/array/mod.rs index 60f47a68b..ed6cc29f3 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -2,7 +2,6 @@ use std::convert::TryFrom; use std::fmt::Debug; -use std::iter::TrustedLen; use std::ops::{Bound, RangeBounds}; use std::sync::Arc; @@ -17,14 +16,12 @@ use crate::types::{ mod data_chunk; mod data_chunk_builder; -mod iterator; pub mod ops; mod primitive_array; mod utf8_array; pub use self::data_chunk::*; pub use self::data_chunk_builder::*; -pub use self::iterator::ArrayIter; pub use self::primitive_array::*; pub use self::utf8_array::*; @@ -101,50 +98,56 @@ pub trait Array: Sized + Send + Sync + 'static { /// Type of element in the array. type Item: ToOwned + ?Sized; - type RawIter<'a>: Iterator + TrustedLen; + /// Returns true if the value at `idx` is null. + fn is_null(&self, idx: usize) -> bool; - /// Retrieve a reference to value. - fn get(&self, idx: usize) -> Option<&Self::Item>; - - fn get_unchecked(&self, idx: usize) -> &Self::Item; + /// Returns the raw value at `idx` regardless of null. + fn get_raw(&self, idx: usize) -> &Self::Item; /// Number of items of array. fn len(&self) -> usize; + /// Retrieve a reference to value. + fn get(&self, idx: usize) -> Option<&Self::Item> { + if self.is_null(idx) { + None + } else { + Some(self.get_raw(idx)) + } + } + + fn filter(&self, p: &[bool]) -> Self; + /// Get iterator of current array. - fn iter(&self) -> ArrayIter<'_, Self> { - ArrayIter::new(self) + fn iter(&self) -> impl DoubleEndedIterator> { + (0..self.len()).map(|i| self.get(i)) + } + + /// Get iterator over the raw values. + fn raw_iter(&self) -> impl DoubleEndedIterator { + (0..self.len()).map(|i| self.get_raw(i)) + } + + /// Get iterator over the non-null values. + fn nonnull_iter(&self) -> impl DoubleEndedIterator { + (0..self.len()) + .filter(|i| !self.is_null(*i)) + .map(|i| self.get_raw(i)) } /// Check if `Array` is empty. fn is_empty(&self) -> bool { self.len() == 0 } - - fn raw_iter(&self) -> Self::RawIter<'_>; } /// An extension trait for [`Array`]. pub trait ArrayExt: Array { - /// Filter the elements and return a new array. - fn filter(&self, visibility: impl Iterator) -> Self; - /// Return a slice of self for the provided range. fn slice(&self, range: impl RangeBounds) -> Self; } impl ArrayExt for A { - /// Filter the elements and return a new array. - fn filter(&self, visibility: impl Iterator) -> Self { - let mut builder = Self::Builder::with_capacity(self.len()); - for (a, visible) in self.iter().zip(visibility) { - if visible { - builder.push(a); - } - } - builder.finish() - } - /// Return a slice of self for the provided range. fn slice(&self, range: impl RangeBounds) -> Self { let len = self.len(); @@ -547,11 +550,11 @@ macro_rules! impl_array { } /// Filter the elements and return a new array. - pub fn filter(&self, visibility: impl Iterator) -> Self { + pub fn filter(&self, visibility: &[bool]) -> Self { match self { - Self::Null(a) => Self::Null(a.filter(visibility).into()), + Self::Null(a) => Self::Null(a.filter(&visibility).into()), $( - Self::$Abc(a) => Self::$Abc(a.filter(visibility).into()), + Self::$Abc(a) => Self::$Abc(a.filter(&visibility).into()), )* } } diff --git a/src/array/ops.rs b/src/array/ops.rs index c0b4fbcf9..6f48b5f6f 100644 --- a/src/array/ops.rs +++ b/src/array/ops.rs @@ -83,32 +83,32 @@ macro_rules! cmp { &self, other: &Self, ) -> Result { - Ok(match (self, other) { - (A::Bool(a), A::Bool(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b)), - (A::Int32(a), A::Int32(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b)), - (A::Int64(a), A::Int64(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b)), - (A::Float64(a), A::Float64(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op *b)), - (A::Utf8(a), A::Utf8(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b)), - (A::Date(a), A::Date(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b)), - (A::Decimal(a), A::Decimal(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b)), - - (A::Int32(a), A::Int64(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| (*a as i64) $op *b)), - (A::Int64(a), A::Int32(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op (*b as i64))), - - (A::Int32(a), A::Float64(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| F64::from(*a as f64) $op *b)), - (A::Int64(a), A::Float64(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| F64::from(*a as f64) $op *b)), - (A::Float64(a), A::Int32(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op F64::from(*b as f64))), - (A::Float64(a), A::Int64(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op F64::from(*b as f64))), - - (A::Int32(a), A::Decimal(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| Decimal::from(*a) $op *b)), - (A::Int64(a), A::Decimal(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| Decimal::from(*a) $op *b)), - (A::Float64(a), A::Decimal(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| Decimal::from_f64_retain(a.0).unwrap() $op *b)), - (A::Decimal(a), A::Int32(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op Decimal::from(*b))), - (A::Decimal(a), A::Int64(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op Decimal::from(*b))), - (A::Decimal(a), A::Float64(b)) => A::new_bool(binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op Decimal::from_f64_retain(b.0).unwrap())), + Ok(A::new_bool(clear_null(match (self, other) { + (A::Bool(a), A::Bool(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b), + (A::Int32(a), A::Int32(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b), + (A::Int64(a), A::Int64(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b), + (A::Float64(a), A::Float64(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op *b), + (A::Utf8(a), A::Utf8(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b), + (A::Date(a), A::Date(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b), + (A::Decimal(a), A::Decimal(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| a $op b), + + (A::Int32(a), A::Int64(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| (*a as i64) $op *b), + (A::Int64(a), A::Int32(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op (*b as i64)), + + (A::Int32(a), A::Float64(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| F64::from(*a as f64) $op *b), + (A::Int64(a), A::Float64(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| F64::from(*a as f64) $op *b), + (A::Float64(a), A::Int32(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op F64::from(*b as f64)), + (A::Float64(a), A::Int64(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op F64::from(*b as f64)), + + (A::Int32(a), A::Decimal(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| Decimal::from(*a) $op *b), + (A::Int64(a), A::Decimal(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| Decimal::from(*a) $op *b), + (A::Float64(a), A::Decimal(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| Decimal::from_f64_retain(a.0).unwrap() $op *b), + (A::Decimal(a), A::Int32(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op Decimal::from(*b)), + (A::Decimal(a), A::Int64(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op Decimal::from(*b)), + (A::Decimal(a), A::Float64(b)) => binary_op(a.as_ref(), b.as_ref(), |a, b| *a $op Decimal::from_f64_retain(b.0).unwrap()), _ => return Err(ConvertError::NoBinaryOp(stringify!($name).into(), self.type_string(), other.type_string())), - }) + }))) } } } @@ -131,9 +131,10 @@ impl ArrayImpl { return Err(ConvertError::NoBinaryOp("and".into(), self.type_string(), other.type_string())); }; let mut c: BoolArray = binary_op(a.as_ref(), b.as_ref(), |a, b| *a && *b); - let a_false = !a.to_raw_bitvec() & a.get_valid_bitmap(); - let b_false = !b.to_raw_bitvec() & b.get_valid_bitmap(); - *c.get_valid_bitmap_mut() |= a_false | b_false; + let a_false = a.to_raw_bitvec().not_then_and(a.get_valid_bitmap()); + let b_false = b.to_raw_bitvec().not_then_and(b.get_valid_bitmap()); + c.get_valid_bitmap_mut().or(&a_false); + c.get_valid_bitmap_mut().or(&b_false); Ok(A::new_bool(c)) } @@ -142,9 +143,8 @@ impl ArrayImpl { return Err(ConvertError::NoBinaryOp("or".into(), self.type_string(), other.type_string())); }; let mut c: BoolArray = binary_op(a.as_ref(), b.as_ref(), |a, b| *a || *b); - let a_true = a.to_raw_bitvec() & a.get_valid_bitmap(); - let b_true = b.to_raw_bitvec() & b.get_valid_bitmap(); - *c.get_valid_bitmap_mut() |= a_true | b_true; + let bitmap = c.to_raw_bitvec(); + c.get_valid_bitmap_mut().or(&bitmap); Ok(A::new_bool(c)) } @@ -152,8 +152,7 @@ impl ArrayImpl { let A::Bool(a) = self else { return Err(ConvertError::NoUnaryOp("not".into(), self.type_string())); }; - // should we zero the null values? - Ok(A::new_bool(unary_op(a.as_ref(), |b| !b))) + Ok(A::new_bool(clear_null(unary_op(a.as_ref(), |b| !b)))) } /// Perform binary operation. @@ -219,7 +218,7 @@ impl ArrayImpl { Type::Int32 => Self::Int32(a.clone()), Type::Int64 => Self::new_int64(unary_op(a.as_ref(), |&b| b as i64)), Type::Float64 => Self::new_float64(unary_op(a.as_ref(), |&i| F64::from(i as f64))), - Type::String => Self::new_utf8(unary_op(a.as_ref(), |&i| i.to_string())), + Type::String => Self::new_utf8(Utf8Array::from_iter_display(a.iter())), Type::Decimal(_, _) => { Self::new_decimal(unary_op(a.as_ref(), |&i| Decimal::from(i))) } @@ -235,7 +234,7 @@ impl ArrayImpl { })?), Type::Int64 => Self::Int64(a.clone()), Type::Float64 => Self::new_float64(unary_op(a.as_ref(), |&i| F64::from(i as f64))), - Type::String => Self::new_utf8(unary_op(a.as_ref(), |&i| i.to_string())), + Type::String => Self::new_utf8(Utf8Array::from_iter_display(a.iter())), Type::Decimal(_, _) => { Self::new_decimal(unary_op(a.as_ref(), |&i| Decimal::from(i))) } @@ -254,7 +253,7 @@ impl ArrayImpl { None => Err(ConvertError::Overflow(DataValue::Float64(b), Type::Int64)), })?), Type::Float64 => Self::Float64(a.clone()), - Type::String => Self::new_utf8(unary_op(a.as_ref(), |&f| f.to_string())), + Type::String => Self::new_utf8(Utf8Array::from_iter_display(a.iter())), Type::Decimal(_, scale) => { Self::new_decimal(try_unary_op( a.as_ref(), @@ -330,7 +329,7 @@ impl ArrayImpl { DataValue::Decimal(d), )) })?), - Type::String => Self::new_utf8(unary_op(a.as_ref(), |d| d.to_string())), + Type::String => Self::new_utf8(Utf8Array::from_iter_display(a.iter())), Type::Decimal(_, _) => self.clone(), Type::Null | Type::Blob | Type::Date | Type::Interval | Type::Struct(_) => { return Err(ConvertError::NoCast("DOUBLE", data_type.clone())); @@ -338,12 +337,12 @@ impl ArrayImpl { }, Self::Date(a) => match data_type { Type::Date => self.clone(), - Type::String => Self::new_utf8(unary_op(a.as_ref(), |&d| d.to_string())), + Type::String => Self::new_utf8(Utf8Array::from_iter_display(a.iter())), _ => return Err(ConvertError::NoCast("DATE", data_type.clone())), }, Self::Interval(a) => match data_type { Type::Interval => self.clone(), - Type::String => Self::new_utf8(unary_op(a.as_ref(), |&d| d.to_string())), + Type::String => Self::new_utf8(Utf8Array::from_iter_display(a.iter())), _ => return Err(ConvertError::NoCast("INTERVAL", data_type.clone())), }, }) @@ -374,14 +373,14 @@ macro_rules! impl_agg { /// Returns the minimum of values. pub fn min_(&self) -> DataValue { match self { - $(Self::$Abc(a) => a.iter().flatten().min().into(),)* + $(Self::$Abc(a) => a.nonnull_iter().min().into(),)* } } /// Returns the maximum of values. pub fn max_(&self) -> DataValue { match self { - $(Self::$Abc(a) => a.iter().flatten().max().into(),)* + $(Self::$Abc(a) => a.nonnull_iter().max().into(),)* } } @@ -414,7 +413,7 @@ where { assert_eq!(a.len(), b.len()); let it = a.raw_iter().zip(b.raw_iter()).map(|(a, b)| f(a, b)); - let valid = a.get_valid_bitmap().clone() & b.get_valid_bitmap(); + let valid = a.get_valid_bitmap().and(b.get_valid_bitmap()); O::from_data(it, valid) } @@ -445,3 +444,64 @@ where } Ok(builder.finish()) } + +/// Optimized operations. +/// +/// Assume both bitvecs have the same length. +pub trait BitVecExt { + /// self & other + fn and(&self, other: &Self) -> Self; + /// self |= other + fn or(&mut self, other: &Self); + /// !self & other + fn not_then_and(&self, other: &Self) -> Self; + /// Creates a [`BitVec`] from `&[bool]`. + fn from_bool_slice(bools: &[bool]) -> Self; +} + +impl BitVecExt for BitVec { + fn and(&self, other: &Self) -> Self { + let mut res: BitVec = (self.as_raw_slice().iter()) + .zip(other.as_raw_slice()) + .map(|(a, b)| a & b) + .collect(); + unsafe { res.set_len(self.len()) }; + res + } + + fn or(&mut self, other: &Self) { + for (a, b) in self.as_raw_mut_slice().iter_mut().zip(other.as_raw_slice()) { + *a |= b; + } + } + + fn not_then_and(&self, other: &Self) -> Self { + let mut res: BitVec = (self.as_raw_slice().iter()) + .zip(other.as_raw_slice()) + .map(|(a, b)| !a & b) + .collect(); + unsafe { res.set_len(self.len()) }; + res + } + + fn from_bool_slice(bools: &[bool]) -> Self { + // use SIMD to speed up + use std::simd::ToBitMask; + let mut iter = bools.array_chunks::<64>(); + let mut bitvec = Vec::with_capacity((bools.len() + 63) / 64); + for chunk in iter.by_ref() { + let bitmask = std::simd::Mask::::from_array(*chunk).to_bitmask() as usize; + bitvec.push(bitmask); + } + if !iter.remainder().is_empty() { + let mut bitmask = 0; + for (i, b) in iter.remainder().iter().enumerate() { + bitmask |= (*b as usize) << i; + } + bitvec.push(bitmask); + } + let mut bitvec = BitVec::from_vec(bitvec); + bitvec.truncate(bools.len()); + bitvec + } +} diff --git a/src/array/primitive_array.rs b/src/array/primitive_array.rs index 63a8f6a52..20898e49d 100644 --- a/src/array/primitive_array.rs +++ b/src/array/primitive_array.rs @@ -7,7 +7,8 @@ use std::mem; use bitvec::vec::BitVec; use serde::{Deserialize, Serialize}; -use super::{Array, ArrayBuilder, ArrayEstimateExt, ArrayFromDataExt, ArrayValidExt}; +use super::ops::BitVecExt; +use super::{Array, ArrayBuilder, ArrayEstimateExt, ArrayFromDataExt, ArrayValidExt, BoolArray}; use crate::types::{NativeType, F32, F64}; /// A collection of primitive types, such as `i32`, `F32`. @@ -66,13 +67,12 @@ impl FromIterator for PrimitiveArray { impl Array for PrimitiveArray { type Item = T; type Builder = PrimitiveArrayBuilder; - type RawIter<'a> = std::slice::Iter<'a, T>; - fn get(&self, idx: usize) -> Option<&T> { - self.valid[idx].then(|| &self.data[idx]) + fn is_null(&self, idx: usize) -> bool { + !self.valid[idx] } - fn get_unchecked(&self, idx: usize) -> &T { + fn get_raw(&self, idx: usize) -> &T { &self.data[idx] } @@ -80,9 +80,21 @@ impl Array for PrimitiveArray { self.valid.len() } - fn raw_iter(&self) -> Self::RawIter<'_> { + fn raw_iter(&self) -> impl DoubleEndedIterator { self.data.iter() } + + fn filter(&self, p: &[bool]) -> Self { + assert_eq!(p.len(), self.len()); + let mut builder = Self::Builder::with_capacity(self.len()); + for (i, &v) in p.iter().enumerate() { + if v { + builder.valid.push(unsafe { *self.valid.get_unchecked(i) }); + builder.data.push(self.data[i]); + } + } + builder.finish() + } } impl ArrayValidExt for PrimitiveArray { @@ -146,8 +158,7 @@ impl ArrayBuilder for PrimitiveArrayBuilder { } fn push_n(&mut self, n: usize, value: Option<&T>) { - self.valid - .extend(std::iter::repeat(value.is_some()).take(n)); + self.valid.resize(self.valid.len() + n, value.is_some()); self.data .extend(std::iter::repeat(value.cloned().unwrap_or_default()).take(n)); } @@ -168,20 +179,26 @@ impl ArrayBuilder for PrimitiveArrayBuilder { impl PrimitiveArray { /// Converts the raw bool array into a [`BitVec`]. pub fn to_raw_bitvec(&self) -> BitVec { - if self.len() <= 1024 { - return self.data.iter().collect(); - } - // use SIMD to speed up - use std::simd::ToBitMask; - let mut iter = self.data.array_chunks::<64>(); - let mut bitvec = BitVec::with_capacity(self.len()); - for chunk in iter.by_ref() { - let bitmask = std::simd::Mask::::from_array(*chunk).to_bitmask() as usize; - bitvec.extend_from_raw_slice(std::slice::from_ref(&bitmask)); - } - bitvec.extend(iter.remainder()); - bitvec + BitVec::from_bool_slice(&self.data) + } + + /// Returns a bool array of `true` values. + pub fn true_array(&self) -> &[bool] { + &self.data + } +} + +pub fn clear_null(mut array: BoolArray) -> BoolArray { + use std::simd::ToBitMask; + let mut valid = Vec::with_capacity(array.valid.as_raw_slice().len() * 64); + for &bitmask in array.valid.as_raw_slice() { + let chunk = std::simd::Mask::::from_bitmask(bitmask as u64).to_array(); + valid.extend_from_slice(&chunk); + } + for (d, v) in array.data.iter_mut().zip(valid) { + *d &= v; } + array } #[cfg(test)] diff --git a/src/array/utf8_array.rs b/src/array/utf8_array.rs index bf47e31b1..f2b88ba3b 100644 --- a/src/array/utf8_array.rs +++ b/src/array/utf8_array.rs @@ -1,13 +1,13 @@ // Copyright 2022 RisingLight Project Authors. Licensed under Apache-2.0. use std::borrow::Borrow; +use std::fmt::{Display, Write}; use std::marker::PhantomData; use std::mem; use bitvec::vec::BitVec; use serde::{Deserialize, Serialize}; -use super::iterator::NonNullArrayIter; use super::{Array, ArrayBuilder, ArrayEstimateExt, ArrayFromDataExt, ArrayValidExt}; use crate::types::BlobRef; @@ -56,18 +56,12 @@ impl Clone for BytesArray { impl Array for BytesArray { type Item = T; type Builder = BytesArrayBuilder; - type RawIter<'a> = NonNullArrayIter<'a, Self>; - fn get(&self, idx: usize) -> Option<&T> { - if self.valid[idx] { - let data_slice = &self.data[self.offset[idx]..self.offset[idx + 1]]; - Some(T::from_bytes(data_slice)) - } else { - None - } + fn is_null(&self, idx: usize) -> bool { + !self.valid[idx] } - fn get_unchecked(&self, idx: usize) -> &T { + fn get_raw(&self, idx: usize) -> &T { let data_slice = &self.data[self.offset[idx]..self.offset[idx + 1]]; T::from_bytes(data_slice) } @@ -76,8 +70,15 @@ impl Array for BytesArray { self.valid.len() } - fn raw_iter(&self) -> Self::RawIter<'_> { - NonNullArrayIter::new(self) + fn filter(&self, p: &[bool]) -> Self { + assert_eq!(p.len(), self.len()); + let mut builder = Self::Builder::with_capacity(self.len()); + for (i, &v) in p.iter().enumerate() { + if v { + builder.push(self.get(i)); + } + } + builder.finish() } } @@ -168,8 +169,7 @@ impl ArrayBuilder for BytesArrayBuilder { } fn push_n(&mut self, n: usize, value: Option<&T>) { - self.valid - .extend(std::iter::repeat(value.is_some()).take(n)); + self.valid.resize(self.valid.len() + n, value.is_some()); if let Some(value) = value { self.data.reserve(value.as_ref().len() * n); self.offset.reserve(n); @@ -203,6 +203,42 @@ impl ArrayBuilder for BytesArrayBuilder { } } +struct BytesArrayWriter<'a, T: ValueRef + ?Sized> { + builder: &'a mut BytesArrayBuilder, +} + +impl Write for BytesArrayWriter<'_, T> { + fn write_str(&mut self, s: &str) -> std::fmt::Result { + self.builder.data.extend_from_slice(s.as_bytes()); + Ok(()) + } +} + +impl Drop for BytesArrayWriter<'_, T> { + fn drop(&mut self) { + self.builder.offset.push(self.builder.data.len()); + self.builder.valid.push(true); + } +} + +impl Utf8Array { + pub fn from_iter_display(iter: impl IntoIterator>) -> Self { + let iter = iter.into_iter(); + let mut builder = ::Builder::with_capacity(iter.size_hint().0); + for e in iter { + if let Some(s) = e { + let mut writer = BytesArrayWriter { + builder: &mut builder, + }; + write!(writer, "{}", s).unwrap(); + } else { + builder.push(None); + } + } + builder.finish() + } +} + // Enable `collect()` an array from iterator of `Option<&T>` or `Option`. impl, T: ValueRef + ?Sized> FromIterator> for BytesArray { fn from_iter>>(iter: I) -> Self { diff --git a/src/executor_v2/evaluator.rs b/src/executor_v2/evaluator.rs index 6bbfe7091..1684d3224 100644 --- a/src/executor_v2/evaluator.rs +++ b/src/executor_v2/evaluator.rs @@ -5,7 +5,6 @@ use std::fmt; use egg::{Id, Language}; -use itertools::Itertools; use crate::array::*; use crate::planner::{Expr, RecExpr}; @@ -119,7 +118,7 @@ impl<'a> Evaluator<'a> { chunk: &DataChunk, ) -> Result<(), ConvertError> { let list = self.node().as_list(); - for (state, id) in states.iter_mut().zip_eq(list) { + for (state, id) in states.iter_mut().zip(list) { *state = self.next(*id).eval_agg(state.clone(), chunk)?; } Ok(()) @@ -132,7 +131,7 @@ impl<'a> Evaluator<'a> { values: impl Iterator, ) { let list = self.node().as_list(); - for ((state, id), value) in states.iter_mut().zip_eq(list).zip_eq(values) { + for ((state, id), value) in states.iter_mut().zip(list).zip(values) { *state = self.next(*id).agg_append(state.clone(), value); } } diff --git a/src/executor_v2/filter.rs b/src/executor_v2/filter.rs index 0800afc04..77aa04f88 100644 --- a/src/executor_v2/filter.rs +++ b/src/executor_v2/filter.rs @@ -1,7 +1,7 @@ // Copyright 2022 RisingLight Project Authors. Licensed under Apache-2.0. use super::*; -use crate::array::{Array, ArrayImpl, DataChunk}; +use crate::array::{ArrayImpl, DataChunk}; /// The executor of a filter operation. pub struct FilterExecutor { @@ -18,7 +18,7 @@ impl FilterExecutor { ArrayImpl::Bool(a) => a, _ => panic!("filters can only accept bool array"), }; - yield batch.filter(vis.iter().map(|b| matches!(b, Some(true)))); + yield batch.filter(vis.true_array()); } } } diff --git a/src/executor_v2/nested_loop_join.rs b/src/executor_v2/nested_loop_join.rs index a7d49ae1d..427d4b4c5 100644 --- a/src/executor_v2/nested_loop_join.rs +++ b/src/executor_v2/nested_loop_join.rs @@ -43,7 +43,7 @@ impl NestedLoopJoinExecutor { let ArrayImpl::Bool(a) = Evaluator::new(&self.condition).eval(&chunk)? else { panic!("join condition should return bool"); }; - yield chunk.filter(a.iter().map(|b| matches!(b, Some(true)))); + yield chunk.filter(a.true_array()); filter_builder.append(&a); } tokio::task::consume_budget().await; @@ -58,7 +58,7 @@ impl NestedLoopJoinExecutor { let ArrayImpl::Bool(a) = Evaluator::new(&self.condition).eval(&chunk)? else { panic!("join condition should return bool"); }; - yield chunk.filter(a.iter().map(|b| matches!(b, Some(true)))); + yield chunk.filter(a.true_array()); filter_builder.append(&a); } let filter = filter_builder.take(); diff --git a/src/executor_v2/order.rs b/src/executor_v2/order.rs index beb417cd5..6cc4153e3 100644 --- a/src/executor_v2/order.rs +++ b/src/executor_v2/order.rs @@ -51,7 +51,7 @@ impl OrderExecutor { /// /// The order is `false` for ascending and `true` for descending. fn cmp(row1: &RowRef, row2: &RowRef, orders: &[bool]) -> Ordering { - for ((v1, v2), desc) in row1.values().zip_eq(row2.values()).zip(orders) { + for ((v1, v2), desc) in row1.values().zip(row2.values()).zip(orders) { match v1.cmp(&v2) { Ordering::Equal => continue, o if *desc => return o.reverse(), diff --git a/src/executor_v2/top_n.rs b/src/executor_v2/top_n.rs index 4accd50aa..02f2412a6 100644 --- a/src/executor_v2/top_n.rs +++ b/src/executor_v2/top_n.rs @@ -65,7 +65,7 @@ impl TopNExecutor { /// /// The order is `false` for ascending and `true` for descending. fn cmp(row1: &Row, row2: &Row, orders: &[bool]) -> Ordering { - for ((v1, v2), desc) in row1.iter().zip_eq(row2.iter()).zip(orders) { + for ((v1, v2), desc) in row1.iter().zip(row2.iter()).zip(orders) { match v1.cmp(v2) { Ordering::Equal => continue, o if *desc => return o.reverse(), diff --git a/src/planner/rules/type_.rs b/src/planner/rules/type_.rs index 183879ae0..bae9619c9 100644 --- a/src/planner/rules/type_.rs +++ b/src/planner/rules/type_.rs @@ -68,7 +68,8 @@ pub fn analyze_type(enode: &Expr, x: impl Fn(&Id) -> Type, catalog: &RootCatalog }) } And([a, b]) | Or([a, b]) | Xor([a, b]) => merge(enode, [x(a)?, x(b)?], |[a, b]| { - (a == Kind::Bool && b == Kind::Bool).then_some(Kind::Bool) + (matches!(a, Kind::Bool | Kind::Null) && matches!(b, Kind::Bool | Kind::Null)) + .then_some(Kind::Bool) }), If([cond, then, else_]) => merge( enode, diff --git a/src/storage/chunk.rs b/src/storage/chunk.rs index 8d32d8f5e..16866c447 100644 --- a/src/storage/chunk.rs +++ b/src/storage/chunk.rs @@ -86,7 +86,7 @@ impl StorageChunk { Some(visibility) => DataChunk::from_iter( self.arrays .iter() - .map(|a| a.filter(visibility.iter().map(|x| *x))), + .map(|a| a.filter(&visibility.iter().map(|x| *x).collect::>())), ), None => DataChunk::from_iter(self.arrays), } diff --git a/src/storage/memory/iterator.rs b/src/storage/memory/iterator.rs index 06efa4fb4..112bf4a0d 100644 --- a/src/storage/memory/iterator.rs +++ b/src/storage/memory/iterator.rs @@ -59,11 +59,11 @@ impl InMemoryTxnIterator { .map(|idx| match idx { StorageColumnRef::Idx(idx) => selected_chunk .array_at(*idx as usize) - .filter(visibility.iter().map(|x| *x)), + .filter(&visibility.iter().map(|x| *x).collect::>()), StorageColumnRef::RowHandler => ArrayImpl::new_int64(I64Array::from_iter( batch_range.clone().map(|x| x as i64), )) - .filter(visibility.iter().map(|x| *x)), + .filter(&visibility.iter().map(|x| *x).collect::>()), }) .collect::() }; diff --git a/src/v1/executor/filter.rs b/src/v1/executor/filter.rs index b83bcfbc5..d0b9e39be 100644 --- a/src/v1/executor/filter.rs +++ b/src/v1/executor/filter.rs @@ -1,7 +1,7 @@ // Copyright 2022 RisingLight Project Authors. Licensed under Apache-2.0. use super::*; -use crate::array::{Array, ArrayImpl, DataChunk}; +use crate::array::{ArrayImpl, DataChunk}; use crate::v1::binder::BoundExpr; /// The executor of a filter operation. @@ -20,7 +20,7 @@ impl FilterExecutor { ArrayImpl::Bool(a) => a, _ => panic!("filters can only accept bool array"), }; - yield batch.filter(vis.iter().map(|b| matches!(b, Some(true)))); + yield batch.filter(vis.true_array()); } } } diff --git a/src/v1/executor/nested_loop_join.rs b/src/v1/executor/nested_loop_join.rs index 233478a11..14a41f939 100644 --- a/src/v1/executor/nested_loop_join.rs +++ b/src/v1/executor/nested_loop_join.rs @@ -51,7 +51,7 @@ impl NestedLoopJoinExecutor { // evaluate filter bitmap match self.condition.eval(&chunk)? { ArrayImpl::Bool(a) => { - yield chunk.filter(a.iter().map(|b| matches!(b, Some(true)))); + yield chunk.filter(a.true_array()); filter_builder.append(&ArrayImpl::Bool(a)) } _ => panic!("unsupported value from join condition"), @@ -67,7 +67,7 @@ impl NestedLoopJoinExecutor { if let Some(chunk) = builder.take() { match self.condition.eval(&chunk)? { ArrayImpl::Bool(a) => { - yield chunk.filter(a.iter().map(|b| matches!(b, Some(true)))); + yield chunk.filter(a.true_array()); filter_builder.append(&ArrayImpl::Bool(a)) } _ => panic!("unsupported value from join condition"), diff --git a/src/v1/function/binary.rs b/src/v1/function/binary.rs index 164269de8..de359c778 100644 --- a/src/v1/function/binary.rs +++ b/src/v1/function/binary.rs @@ -70,11 +70,7 @@ impl BinaryExecutor { let base = cnt; // auto vectoried buffer.iter_mut().enumerate().for_each(|(i, value)| { - *value = f( - i1a.get_unchecked(base + i), - i2a.get_unchecked(base + i), - &mut ctx, - ); + *value = f(i1a.get_raw(base + i), i2a.get_raw(base + i), &mut ctx); }); builder.extend_from_raw_data(&buffer); @@ -86,11 +82,7 @@ impl BinaryExecutor { let mut res_count = 0; buffer.iter_mut().enumerate().for_each(|(i, value)| unsafe { if *masks.get_unchecked(base + i) { - *value = f( - i1a.get_unchecked(base + i), - i2a.get_unchecked(base + i), - &mut ctx, - ); + *value = f(i1a.get_raw(base + i), i2a.get_raw(base + i), &mut ctx); res_count += 1; } }); @@ -107,11 +99,7 @@ impl BinaryExecutor { .enumerate() .for_each(|(i, (value, mask))| { if *mask { - *value = f( - i1a.get_unchecked(cnt + i), - i2a.get_unchecked(cnt + i), - &mut ctx, - ); + *value = f(i1a.get_raw(cnt + i), i2a.get_raw(cnt + i), &mut ctx); res_count += 1; } }); diff --git a/src/v1/function/unary.rs b/src/v1/function/unary.rs index c39a495bd..c4d66601c 100644 --- a/src/v1/function/unary.rs +++ b/src/v1/function/unary.rs @@ -60,7 +60,7 @@ impl UnaryExecutor { buffer .iter_mut() .enumerate() - .for_each(|(i, value)| *value = f(i1a.get_unchecked(base + i), &mut ctx)); + .for_each(|(i, value)| *value = f(i1a.get_raw(base + i), &mut ctx)); builder.extend_from_raw_data(&buffer); } else if zeros == y.len() { // all invalid @@ -70,7 +70,7 @@ impl UnaryExecutor { let mut res_count = 0; buffer.iter_mut().enumerate().for_each(|(i, value)| unsafe { if *masks.get_unchecked(base + i) { - *value = f(i1a.get_unchecked(base + i), &mut ctx); + *value = f(i1a.get_raw(base + i), &mut ctx); res_count += 1; } }); @@ -87,7 +87,7 @@ impl UnaryExecutor { .enumerate() .for_each(|(i, (value, mask))| { if *mask { - *value = f(i1a.get_unchecked(cnt + i), &mut ctx); + *value = f(i1a.get_raw(cnt + i), &mut ctx); res_count += 1; } });