diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 642337530843..321e96e0c142 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -27,6 +27,21 @@ use arrow_schema::*; use std::borrow::BorrowMut; use std::ops::{BitAnd, BitOr, BitXor}; +/// An accumulator for primitive numeric values. +trait NumericAccumulator: Copy + Default { + /// Accumulate a non-null value. + fn accumulate(&mut self, value: T); + /// Accumulate a nullable values. + /// If `valid` is false the `value` should not affect the accumulator state. + fn accumulate_nullable(&mut self, value: T, valid: bool); + /// Merge another accumulator into this accumulator + fn merge(&mut self, other: Self); + /// Return the aggregated value. + fn finish(&mut self) -> T; +} + +/// Helper for branchlessly selecting either `a` or `b` based on the boolean `m`. +/// After verifying the generated assembly this can be a simple `if`. #[inline(always)] fn select(m: bool, a: T, b: T) -> T { if m { @@ -36,13 +51,6 @@ fn select(m: bool, a: T, b: T) -> T { } } -trait NumericAccumulator: Copy + Default { - fn accumulate(&mut self, value: T); - fn accumulate_nullable(&mut self, value: T, valid: bool); - fn merge(&mut self, other: Self); - fn finish(&mut self) -> T; -} - #[derive(Clone, Copy)] struct SumAccumulator { sum: T, @@ -195,6 +203,7 @@ fn aggregate_nonnull_simple>(valu fn aggregate_nonnull_lanes, const LANES: usize>( values: &[T], ) -> T { + // aggregate into multiple independent accumulators allows the compiler to use vector registers let mut acc = [A::default(); LANES]; let mut chunks = values.chunks_exact(LANES); chunks.borrow_mut().for_each(|chunk| { @@ -221,7 +230,9 @@ fn aggregate_nullable_lanes, cons assert!(LANES > 0 && 64 % LANES == 0); assert_eq!(values.len(), validity.len()); + // aggregate into multiple independent accumulators allows the compiler to use vector registers let mut acc = [A::default(); LANES]; + // we process 64 bits of validity at a time let mut values_chunks = values.chunks_exact(64); let validity_chunks = validity.inner().bit_chunks(); let mut validity_chunks_iter = validity_chunks.iter(); @@ -229,6 +240,7 @@ fn aggregate_nullable_lanes, cons values_chunks.borrow_mut().for_each(|chunk| { // Safety: we asserted that values and validity have the same length and trust the iterator impl let mut validity = unsafe { validity_chunks_iter.next().unwrap_unchecked() }; + // chunk further based on the number of vector lanes chunk.chunks_exact(LANES).for_each(|chunk| { aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity); validity >>= LANES; @@ -258,8 +270,8 @@ fn aggregate_nullable_lanes, cons reduce_accumulators(acc).finish() } -// The preferred vector size in bytes for the target platform. -// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust. +/// The preferred vector size in bytes for the target platform. +/// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust. const PREFERRED_VECTOR_SIZE: usize = if cfg!(all(target_arch = "x86_64", target_feature = "avx512f")) { 64 @@ -269,9 +281,11 @@ const PREFERRED_VECTOR_SIZE: usize = 16 }; -// non-nullable aggregation requires fewer temporary registers so we can use more of them for accumulators +/// non-nullable aggregation requires fewer temporary registers so we can use more of them for accumulators const PREFERRED_VECTOR_SIZE_NON_NULL: usize = PREFERRED_VECTOR_SIZE * 2; +/// Generic aggregation for any primitive type. +/// Returns None if there are no non-null values in `array`. fn aggregate, A: NumericAccumulator>( array: &PrimitiveArray

, ) -> Option { @@ -281,15 +295,19 @@ fn aggregate, A: Numeric } let values = array.values().as_ref(); match array.nulls() { - Some(nulls) if null_count > 0 => match PREFERRED_VECTOR_SIZE / std::mem::size_of::() { - 64 => Some(aggregate_nullable_lanes::(values, nulls)), - 32 => Some(aggregate_nullable_lanes::(values, nulls)), - 16 => Some(aggregate_nullable_lanes::(values, nulls)), - 8 => Some(aggregate_nullable_lanes::(values, nulls)), - 4 => Some(aggregate_nullable_lanes::(values, nulls)), - 2 => Some(aggregate_nullable_lanes::(values, nulls)), - _ => Some(aggregate_nullable_lanes::(values, nulls)), - }, + Some(nulls) if null_count > 0 => { + // const generics depending on a generic type parameter are not supported + // so we have to match and call aggregate with the corresponding constant + match PREFERRED_VECTOR_SIZE / std::mem::size_of::() { + 64 => Some(aggregate_nullable_lanes::(values, nulls)), + 32 => Some(aggregate_nullable_lanes::(values, nulls)), + 16 => Some(aggregate_nullable_lanes::(values, nulls)), + 8 => Some(aggregate_nullable_lanes::(values, nulls)), + 4 => Some(aggregate_nullable_lanes::(values, nulls)), + 2 => Some(aggregate_nullable_lanes::(values, nulls)), + _ => Some(aggregate_nullable_lanes::(values, nulls)), + } + } _ => { let is_float = matches!( array.data_type(), @@ -303,7 +321,7 @@ fn aggregate, A: Numeric 8 => Some(aggregate_nonnull_lanes::(values)), 4 => Some(aggregate_nonnull_lanes::(values)), 2 => Some(aggregate_nonnull_lanes::(values)), - _ => Some(aggregate_nonnull_lanes::(values)), + _ => Some(aggregate_nonnull_simple::(values)), } } else { // for non-null integers its better to not chunk ourselves and instead diff --git a/arrow-array/src/arithmetic.rs b/arrow-array/src/arithmetic.rs index d6e2e04faa89..20e98aaa2579 100644 --- a/arrow-array/src/arithmetic.rs +++ b/arrow-array/src/arithmetic.rs @@ -55,9 +55,6 @@ pub trait ArrowNativeTypeOp: ArrowNativeType { /// which means that this value is a positive NaN. const MAX: Self; - /// The number of bytes occupied by this type - const BYTES: usize; - /// Checked addition operation fn add_checked(self, rhs: Self) -> Result; @@ -150,7 +147,6 @@ macro_rules! native_type_op { const ONE: Self = $one; const MIN: Self = $min; const MAX: Self = $max; - const BYTES: usize = std::mem::size_of::(); #[inline] fn add_checked(self, rhs: Self) -> Result { @@ -288,47 +284,6 @@ native_type_op!(u32); native_type_op!(u64); native_type_op!(i256, i256::ZERO, i256::ONE, i256::MIN, i256::MAX); -/* -trait ToTotalOrder { - type TotalOrder; - fn to_total_order(self) -> Self::TotalOrder; -} - -impl ToTotalOrder for f64 { - type TotalOrder = u64; - - #[inline] - fn to_total_order(self) -> Self::TotalOrder { - // reading via integer pointer instead of calling to_bits seems to avoid a move from xmm to gp reg - // let bits = unsafe { (self as *const f64 as *const u64).read() }; - let bits = self.to_bits(); - (bits ^ ((bits as i64 >> 63) as u64 >> 1)) ^ (1 << 63) - } -} - -impl ToTotalOrder for f32 { - type TotalOrder = u32; - - #[inline] - fn to_total_order(self) -> Self::TotalOrder { - // reading via integer pointer instead of calling to_bits seems to avoid a move from xmm to gp reg - // let bits = unsafe { (self as *const f32 as *const u32).read() }; - let bits = self.to_bits(); - (bits ^ ((bits as i32 >> 31) as u32 >> 1)) ^ (1 << 31) - } -} - -impl ToTotalOrder for f16 { - type TotalOrder = u16; - - #[inline] - fn to_total_order(self) -> Self::TotalOrder { - let bits = self.to_bits(); - (bits ^ ((bits as i16 >> 15) as u16 >> 1)) ^ (1 << 15) - } -} -*/ - macro_rules! native_type_float_op { ($t:tt, $zero:expr, $one:expr, $min:expr, $max:expr, $bits:ty) => { impl ArrowNativeTypeOp for $t { @@ -336,7 +291,6 @@ macro_rules! native_type_float_op { const ONE: Self = $one; const MIN: Self = $min; const MAX: Self = $max; - const BYTES: usize = std::mem::size_of::(); #[inline] fn add_checked(self, rhs: Self) -> Result {