diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs
index 20ff0711d735..0dabaa50f5f6 100644
--- a/arrow-arith/src/aggregate.rs
+++ b/arrow-arith/src/aggregate.rs
@@ -20,317 +20,39 @@ use arrow_array::cast::*;
 use arrow_array::iterator::ArrayIter;
 use arrow_array::*;
-use arrow_buffer::{ArrowNativeType, NullBuffer};
+use arrow_buffer::ArrowNativeType;
 use arrow_data::bit_iterator::try_for_each_valid_idx;
 use arrow_schema::ArrowError;
 use arrow_schema::*;
-use std::borrow::BorrowMut;
 use std::ops::{BitAnd, BitOr, BitXor};

-/// An accumulator for primitive numeric values.
-trait NumericAccumulator<T: ArrowNativeTypeOp>: Copy + Default {
-    /// Accumulate a non-null value.
-    fn accumulate(&mut self, value: T);
-    /// Accumulate a nullable values.
-    /// If `valid` is false the `value` should not affect the accumulator state.
-    fn accumulate_nullable(&mut self, value: T, valid: bool);
-    /// Merge another accumulator into this accumulator
-    fn merge(&mut self, other: Self);
-    /// Return the aggregated value.
-    fn finish(&mut self) -> T;
+/// Generic test for NaN, the optimizer should be able to remove this for integer types.
+#[inline]
+pub(crate) fn is_nan<T: ArrowNativeType + PartialOrd>(a: T) -> bool {
+    #[allow(clippy::eq_op)]
+    !(a == a)
 }

-/// Helper for branchlessly selecting either `a` or `b` based on the boolean `m`.
-/// After verifying the generated assembly this can be a simple `if`.
-#[inline(always)]
-fn select<T: Copy>(m: bool, a: T, b: T) -> T {
-    if m {
-        a
-    } else {
-        b
-    }
-}
-
-#[derive(Clone, Copy)]
-struct SumAccumulator<T: ArrowNativeTypeOp> {
-    sum: T,
-}
-
-impl<T: ArrowNativeTypeOp> Default for SumAccumulator<T> {
-    fn default() -> Self {
-        Self { sum: T::ZERO }
-    }
-}
-
-impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for SumAccumulator<T> {
-    fn accumulate(&mut self, value: T) {
-        self.sum = self.sum.add_wrapping(value);
-    }
-
-    fn accumulate_nullable(&mut self, value: T, valid: bool) {
-        let sum = self.sum;
-        self.sum = select(valid, sum.add_wrapping(value), sum)
-    }
-
-    fn merge(&mut self, other: Self) {
-        self.sum = self.sum.add_wrapping(other.sum);
-    }
-
-    fn finish(&mut self) -> T {
-        self.sum
-    }
-}
-
-#[derive(Clone, Copy)]
-struct MinAccumulator<T: ArrowNativeTypeOp> {
-    min: T,
-}
-
-impl<T: ArrowNativeTypeOp> Default for MinAccumulator<T> {
-    fn default() -> Self {
-        Self {
-            min: T::MAX_TOTAL_ORDER,
-        }
-    }
-}
-
-impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for MinAccumulator<T> {
-    fn accumulate(&mut self, value: T) {
-        let min = self.min;
-        self.min = select(value.is_lt(min), value, min);
-    }
-
-    fn accumulate_nullable(&mut self, value: T, valid: bool) {
-        let min = self.min;
-        let is_lt = valid & value.is_lt(min);
-        self.min = select(is_lt, value, min);
-    }
-
-    fn merge(&mut self, other: Self) {
-        self.accumulate(other.min)
-    }
-
-    fn finish(&mut self) -> T {
-        self.min
-    }
-}
-
-#[derive(Clone, Copy)]
-struct MaxAccumulator<T: ArrowNativeTypeOp> {
-    max: T,
-}
-
-impl<T: ArrowNativeTypeOp> Default for MaxAccumulator<T> {
-    fn default() -> Self {
-        Self {
-            max: T::MIN_TOTAL_ORDER,
-        }
-    }
-}
-
-impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for MaxAccumulator<T> {
-    fn accumulate(&mut self, value: T) {
-        let max = self.max;
-        self.max = select(value.is_gt(max), value, max);
-    }
-
-    fn accumulate_nullable(&mut self, value: T, valid: bool) {
-        let max = self.max;
-        let is_gt = value.is_gt(max) & valid;
-        self.max = select(is_gt, value, max);
-    }
-
-    fn merge(&mut self, other: Self) {
-        self.accumulate(other.max)
-    }
-
-    fn finish(&mut self) -> T {
-        self.max
-    }
-}
-
-fn reduce_accumulators<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
-    mut acc: [A; LANES],
-) -> A {
-    assert!(LANES > 0 && LANES.is_power_of_two());
-    let mut len = LANES;
-
-    // attempt at tree reduction, unfortunately llvm does not fully recognize this pattern,
-    // but the generated code is still a little faster than purely sequential reduction for floats.
-    while len >= 2 {
-        let mid = len / 2;
-        let (h, t) = acc[..len].split_at_mut(mid);
-
-        for i in 0..mid {
-            h[i].merge(t[i]);
-        }
-        len /= 2;
-    }
-    acc[0]
-}
-
-#[inline(always)]
-fn aggregate_nonnull_chunk<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
-    acc: &mut [A; LANES],
-    values: &[T; LANES],
-) {
-    for i in 0..LANES {
-        acc[i].accumulate(values[i]);
-    }
-}
-
-#[inline(always)]
-fn aggregate_nullable_chunk<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
-    acc: &mut [A; LANES],
-    values: &[T; LANES],
-    validity: u64,
-) {
-    let mut bit = 1;
-    for i in 0..LANES {
-        acc[i].accumulate_nullable(values[i], (validity & bit) != 0);
-        bit <<= 1;
-    }
-}
-
-fn aggregate_nonnull_simple<T: ArrowNativeTypeOp, A: NumericAccumulator<T>>(values: &[T]) -> T {
-    return values
-        .iter()
-        .copied()
-        .fold(A::default(), |mut a, b| {
-            a.accumulate(b);
-            a
-        })
-        .finish();
-}
-
-#[inline(never)]
-fn aggregate_nonnull_lanes<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
-    values: &[T],
-) -> T {
-    // aggregating into multiple independent accumulators allows the compiler to use vector registers
-    // with a single accumulator the compiler would not be allowed to reorder floating point addition
-    let mut acc = [A::default(); LANES];
-    let mut chunks = values.chunks_exact(LANES);
-    chunks.borrow_mut().for_each(|chunk| {
-        aggregate_nonnull_chunk(&mut acc, chunk[..LANES].try_into().unwrap());
-    });
-
-    let remainder = chunks.remainder();
-    for i in 0..remainder.len() {
-        acc[i].accumulate(remainder[i]);
-    }
-
-    reduce_accumulators(acc).finish()
-}
-
-#[inline(never)]
-fn aggregate_nullable_lanes<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
-    values: &[T],
-    validity: &NullBuffer,
-) -> T {
-    assert!(LANES > 0 && 64 % LANES == 0);
-    assert_eq!(values.len(), validity.len());
-
-    // aggregating into multiple independent accumulators allows the compiler to use vector registers
-    let mut acc = [A::default(); LANES];
-    // we process 64 bits of validity at a time
-    let mut values_chunks = values.chunks_exact(64);
-    let validity_chunks = validity.inner().bit_chunks();
-    let mut validity_chunks_iter = validity_chunks.iter();
-
-    values_chunks.borrow_mut().for_each(|chunk| {
-        // Safety: we asserted that values and validity have the same length and trust the iterator impl
-        let mut validity = unsafe { validity_chunks_iter.next().unwrap_unchecked() };
-        // chunk further based on the number of vector lanes
-        chunk.chunks_exact(LANES).for_each(|chunk| {
-            aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity);
-            validity >>= LANES;
-        });
-    });
-
-    let remainder = values_chunks.remainder();
-    if !remainder.is_empty() {
-        let mut validity = validity_chunks.remainder_bits();
-
-        let mut remainder_chunks = remainder.chunks_exact(LANES);
-        remainder_chunks.borrow_mut().for_each(|chunk| {
-            aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity);
-            validity >>= LANES;
-        });
-
-        let remainder = remainder_chunks.remainder();
-        if !remainder.is_empty() {
-            let mut bit = 1;
-            for i in 0..remainder.len() {
-                acc[i].accumulate_nullable(remainder[i], (validity & bit) != 0);
-                bit <<= 1;
-            }
-        }
-    }
-
-    reduce_accumulators(acc).finish()
+/// Returns the minimum value in the array, according to the natural order.
+/// For floating point arrays any NaN values are considered to be greater than any other non-null value
+#[cfg(not(feature = "simd"))]
+pub fn min<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: ArrowNativeType,
+{
+    min_max_helper::<T::Native, _, _>(array, |a, b| (is_nan(*a) & !is_nan(*b)) || a > b)
 }

-/// The preferred vector size in bytes for the target platform.
-/// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust.
-const PREFERRED_VECTOR_SIZE: usize =
-    if cfg!(all(target_arch = "x86_64", target_feature = "avx512f")) {
-        64
-    } else if cfg!(all(target_arch = "x86_64", target_feature = "avx")) {
-        32
-    } else {
-        16
-    };
-
-/// non-nullable aggregation requires fewer temporary registers so we can use more of them for accumulators
-const PREFERRED_VECTOR_SIZE_NON_NULL: usize = PREFERRED_VECTOR_SIZE * 2;
-
-/// Generic aggregation for any primitive type.
-/// Returns None if there are no non-null values in `array`.
-fn aggregate<T: ArrowNativeTypeOp, P: ArrowPrimitiveType<Native = T>, A: NumericAccumulator<T>>(
-    array: &PrimitiveArray<P>,
-) -> Option<T> {
-    let null_count = array.null_count();
-    if null_count == array.len() {
-        return None;
-    }
-    let values = array.values().as_ref();
-    match array.nulls() {
-        Some(nulls) if null_count > 0 => {
-            // const generics depending on a generic type parameter are not supported
-            // so we have to match and call aggregate with the corresponding constant
-            match PREFERRED_VECTOR_SIZE / std::mem::size_of::<T>() {
-                64 => Some(aggregate_nullable_lanes::<T, A, 64>(values, nulls)),
-                32 => Some(aggregate_nullable_lanes::<T, A, 32>(values, nulls)),
-                16 => Some(aggregate_nullable_lanes::<T, A, 16>(values, nulls)),
-                8 => Some(aggregate_nullable_lanes::<T, A, 8>(values, nulls)),
-                4 => Some(aggregate_nullable_lanes::<T, A, 4>(values, nulls)),
-                2 => Some(aggregate_nullable_lanes::<T, A, 2>(values, nulls)),
-                _ => Some(aggregate_nullable_lanes::<T, A, 1>(values, nulls)),
-            }
-        }
-        _ => {
-            let is_float = matches!(
-                array.data_type(),
-                DataType::Float16 | DataType::Float32 | DataType::Float64
-            );
-            if is_float {
-                match PREFERRED_VECTOR_SIZE_NON_NULL / std::mem::size_of::<T>() {
-                    64 => Some(aggregate_nonnull_lanes::<T, A, 64>(values)),
-                    32 => Some(aggregate_nonnull_lanes::<T, A, 32>(values)),
-                    16 => Some(aggregate_nonnull_lanes::<T, A, 16>(values)),
-                    8 => Some(aggregate_nonnull_lanes::<T, A, 8>(values)),
-                    4 => Some(aggregate_nonnull_lanes::<T, A, 4>(values)),
-                    2 => Some(aggregate_nonnull_lanes::<T, A, 2>(values)),
-                    _ => Some(aggregate_nonnull_simple::<T, A>(values)),
-                }
-            } else {
-                // for non-null integers its better to not chunk ourselves and instead
-                // let llvm fully handle loop unrolling and vectorization
-                Some(aggregate_nonnull_simple::<T, A>(values))
-            }
-        }
-    }
+/// Returns the maximum value in the array, according to the natural order.
+/// For floating point arrays any NaN values are considered to be greater than any other non-null value
+#[cfg(not(feature = "simd"))]
+pub fn max<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: ArrowNativeType,
+{
+    min_max_helper::<T::Native, _, _>(array, |a, b| (!is_nan(*a) & is_nan(*b)) || a < b)
 }

 /// Returns the minimum value in the boolean array.
@@ -508,7 +230,7 @@ where
     T: ArrowNumericType,
     T::Native: ArrowNativeType,
 {
-    min_max_array_helper::<T, A, _, _>(array, |a, b| a.is_gt(*b), min)
+    min_max_array_helper::<T, A, _, _>(array, |a, b| (is_nan(*a) & !is_nan(*b)) || a > b, min)
 }

 /// Returns the max of values in the array of `ArrowNumericType` type, or dictionary
@@ -516,9 +238,9 @@ where
 pub fn max_array<T, A: ArrayAccessor<Item = T::Native>>(array: A) -> Option<T::Native>
 where
     T: ArrowNumericType,
-    T::Native: ArrowNativeTypeOp,
+    T::Native: ArrowNativeType,
 {
-    min_max_array_helper::<T, A, _, _>(array, |a, b| a.is_lt(*b), max)
+    min_max_array_helper::<T, A, _, _>(array, |a, b| (!is_nan(*a) & is_nan(*b)) || a < b, max)
 }

 fn min_max_array_helper<T, A: ArrayAccessor<Item = T::Native>, F, M>(
@@ -537,6 +259,66 @@ where
     }
 }

+/// Returns the sum of values in the primitive array.
+///
+/// Returns `None` if the array is empty or only contains null values.
+///
+/// This doesn't detect overflow. Once overflowing, the result will wrap around.
+/// For an overflow-checking variant, use `sum_checked` instead.
+#[cfg(not(feature = "simd"))]
+pub fn sum<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let sum = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator.add_wrapping(*value)
+            });
+
+            Some(sum)
+        }
+        Some(nulls) => {
+            let mut sum = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            sum = sum.add_wrapping(*value);
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    sum = sum.add_wrapping(*value);
+                }
+            });
+
+            Some(sum)
+        }
+    }
+}
+
 macro_rules! bit_operation {
     ($NAME:ident, $OP:ident, $NATIVE:ident, $DEFAULT:expr, $DOC:expr) => {
         #[doc = $DOC]
@@ -694,35 +476,369 @@ where
     }
 }

+#[cfg(feature = "simd")]
+mod simd {
+    use super::is_nan;
+    use arrow_array::*;
+    use std::marker::PhantomData;
+
+    pub(super) trait SimdAggregate<T: ArrowNumericType> {
+        type ScalarAccumulator;
+        type SimdAccumulator;
+
+        /// Returns the accumulator for aggregating scalar values
+        fn init_accumulator_scalar() -> Self::ScalarAccumulator;
+
+        /// Returns the accumulator for aggregating simd chunks of values
+        fn init_accumulator_chunk() -> Self::SimdAccumulator;
+
+        /// Updates the accumulator with the values of one chunk
+        fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd);
+
+        /// Updates the accumulator with the values of one chunk according to the given vector mask
+        fn accumulate_chunk_nullable(
+            accumulator: &mut Self::SimdAccumulator,
+            chunk: T::Simd,
+            mask: T::SimdMask,
+        );
+
+        /// Updates the accumulator with one value
+        fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native);
+
+        /// Reduces the vector lanes of the simd accumulator and the scalar accumulator to a single value
+        fn reduce(
+            simd_accumulator: Self::SimdAccumulator,
+            scalar_accumulator: Self::ScalarAccumulator,
+        ) -> Option<T::Native>;
+    }
+
+    pub(super) struct SumAggregate<T: ArrowNumericType> {
+        phantom: PhantomData<T>,
+    }
+
+    impl<T: ArrowNumericType> SimdAggregate<T> for SumAggregate<T>
+    where
+        T::Native: ArrowNativeTypeOp,
+    {
+        type ScalarAccumulator = T::Native;
+        type SimdAccumulator = T::Simd;
+
+        fn init_accumulator_scalar() -> Self::ScalarAccumulator {
+            T::default_value()
+        }
+
+        fn init_accumulator_chunk() -> Self::SimdAccumulator {
+            T::init(Self::init_accumulator_scalar())
+        }
+
+        fn accumulate_chunk_non_null(accumulator: &mut T::Simd, chunk: T::Simd) {
+            *accumulator = *accumulator + chunk;
+        }
+
+        fn accumulate_chunk_nullable(
+            accumulator: &mut T::Simd,
+            chunk: T::Simd,
+            vecmask: T::SimdMask,
+        ) {
+            let zero = T::init(T::default_value());
+            let blended = T::mask_select(vecmask, chunk, zero);
+
+            *accumulator = *accumulator + blended;
+        }
+
+        fn accumulate_scalar(accumulator: &mut T::Native, value: T::Native) {
+            *accumulator = accumulator.add_wrapping(value)
+        }
+
+        fn reduce(
+            simd_accumulator: Self::SimdAccumulator,
+            scalar_accumulator: Self::ScalarAccumulator,
+        ) -> Option<T::Native> {
+            // we can't use T::lanes() as the slice len because it is not const,
+            // instead always reserve the maximum number of lanes
+            let mut tmp = [T::default_value(); 64];
+            let slice = &mut tmp[0..T::lanes()];
+            T::write(simd_accumulator, slice);
+
+            let mut reduced = Self::init_accumulator_scalar();
+            slice
+                .iter()
+                .for_each(|value| Self::accumulate_scalar(&mut reduced, *value));
+
+            Self::accumulate_scalar(&mut reduced, scalar_accumulator);
+
+            // result can not be None because we checked earlier for the null count
+            Some(reduced)
+        }
+    }
+
+    pub(super) struct MinAggregate<T: ArrowNumericType> {
+        phantom: PhantomData<T>,
+    }
+
+    impl<T: ArrowNumericType> SimdAggregate<T> for MinAggregate<T>
+    where
+        T::Native: PartialOrd,
+    {
+        type ScalarAccumulator = (T::Native, bool);
+        type SimdAccumulator = (T::Simd, T::SimdMask);
+
+        fn init_accumulator_scalar() -> Self::ScalarAccumulator {
+            (T::default_value(), false)
+        }
+
+        fn init_accumulator_chunk() -> Self::SimdAccumulator {
+            (T::init(T::default_value()), T::mask_init(false))
+        }
+
+        fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd) {
+            let acc_is_nan = !T::eq(accumulator.0, accumulator.0);
+            let is_lt = acc_is_nan | T::lt(chunk, accumulator.0);
+            let first_or_lt = !accumulator.1 | is_lt;
+
+            accumulator.0 = T::mask_select(first_or_lt, chunk, accumulator.0);
+            accumulator.1 = T::mask_init(true);
+        }
+
+        fn accumulate_chunk_nullable(
+            accumulator: &mut Self::SimdAccumulator,
+            chunk: T::Simd,
+            vecmask: T::SimdMask,
+        ) {
+            let acc_is_nan = !T::eq(accumulator.0, accumulator.0);
+            let is_lt = vecmask & (acc_is_nan | T::lt(chunk, accumulator.0));
+            let first_or_lt = !accumulator.1 | is_lt;
+
+            accumulator.0 = T::mask_select(first_or_lt, chunk, accumulator.0);
+            accumulator.1 |= vecmask;
+        }
+
+        fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native) {
+            if !accumulator.1 {
+                accumulator.0 = value;
+            } else {
+                let acc_is_nan = is_nan(accumulator.0);
+                if acc_is_nan || value < accumulator.0 {
+                    accumulator.0 = value
+                }
+            }
+            accumulator.1 = true
+        }
+
+        fn reduce(
+            simd_accumulator: Self::SimdAccumulator,
+            scalar_accumulator: Self::ScalarAccumulator,
+        ) -> Option<T::Native> {
+            // we can't use T::lanes() as the slice len because it is not const,
+            // instead always reserve the maximum number of lanes
+            let mut tmp = [T::default_value(); 64];
+            let slice = &mut tmp[0..T::lanes()];
+            T::write(simd_accumulator.0, slice);
+
+            let mut reduced = Self::init_accumulator_scalar();
+            slice
+                .iter()
+                .enumerate()
+                .filter(|(i, _value)| T::mask_get(&simd_accumulator.1, *i))
+                .for_each(|(_i, value)| Self::accumulate_scalar(&mut reduced, *value));
+
+            if scalar_accumulator.1 {
+                Self::accumulate_scalar(&mut reduced, scalar_accumulator.0);
+            }
+
+            if reduced.1 {
+                Some(reduced.0)
+            } else {
+                None
+            }
+        }
+    }
+
+    pub(super) struct MaxAggregate<T: ArrowNumericType> {
+        phantom: PhantomData<T>,
+    }
+
+    impl<T: ArrowNumericType> SimdAggregate<T> for MaxAggregate<T>
+    where
+        T::Native: PartialOrd,
+    {
+        type ScalarAccumulator = (T::Native, bool);
+        type SimdAccumulator = (T::Simd, T::SimdMask);
+
+        fn init_accumulator_scalar() -> Self::ScalarAccumulator {
+            (T::default_value(), false)
+        }
+
+        fn init_accumulator_chunk() -> Self::SimdAccumulator {
+            (T::init(T::default_value()), T::mask_init(false))
+        }
+
+        fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd) {
+            let chunk_is_nan = !T::eq(chunk, chunk);
+            let is_gt = chunk_is_nan | T::gt(chunk, accumulator.0);
+            let first_or_gt = !accumulator.1 | is_gt;
+
+            accumulator.0 = T::mask_select(first_or_gt, chunk, accumulator.0);
+            accumulator.1 = T::mask_init(true);
+        }
+
+        fn accumulate_chunk_nullable(
+            accumulator: &mut Self::SimdAccumulator,
+            chunk: T::Simd,
+            vecmask: T::SimdMask,
+        ) {
+            let chunk_is_nan = !T::eq(chunk, chunk);
+            let is_gt = vecmask & (chunk_is_nan | T::gt(chunk, accumulator.0));
+            let first_or_gt = !accumulator.1 | is_gt;
+
+            accumulator.0 = T::mask_select(first_or_gt, chunk, accumulator.0);
+            accumulator.1 |= vecmask;
+        }
+
+        fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native) {
+            if !accumulator.1 {
+                accumulator.0 = value;
+            } else {
+                let value_is_nan = is_nan(value);
+                if value_is_nan || value > accumulator.0 {
+                    accumulator.0 = value
+                }
+            }
+            accumulator.1 = true;
+        }
+
+        fn reduce(
+            simd_accumulator: Self::SimdAccumulator,
+            scalar_accumulator: Self::ScalarAccumulator,
+        ) -> Option<T::Native> {
+            // we can't use T::lanes() as the slice len because it is not const,
+            // instead always reserve the maximum number of lanes
+            let mut tmp = [T::default_value(); 64];
+            let slice = &mut tmp[0..T::lanes()];
+            T::write(simd_accumulator.0, slice);
+
+            let mut reduced = Self::init_accumulator_scalar();
+            slice
+                .iter()
+                .enumerate()
+                .filter(|(i, _value)| T::mask_get(&simd_accumulator.1, *i))
+                .for_each(|(_i, value)| Self::accumulate_scalar(&mut reduced, *value));
+
+            if scalar_accumulator.1 {
+                Self::accumulate_scalar(&mut reduced, scalar_accumulator.0);
+            }
+
+            if reduced.1 {
+                Some(reduced.0)
+            } else {
+                None
+            }
+        }
+    }
+
+    pub(super) fn simd_aggregation<T: ArrowNumericType, A: SimdAggregate<T>>(
+        array: &PrimitiveArray<T>,
+    ) -> Option<T::Native> {
+        let null_count = array.null_count();
+
+        if null_count == array.len() {
+            return None;
+        }
+
+        let data: &[T::Native] = array.values();
+
+        let mut chunk_acc = A::init_accumulator_chunk();
+        let mut rem_acc = A::init_accumulator_scalar();
+
+        match array.nulls() {
+            None => {
+                let data_chunks = data.chunks_exact(64);
+                let remainder = data_chunks.remainder();
+
+                data_chunks.for_each(|chunk| {
+                    chunk.chunks_exact(T::lanes()).for_each(|chunk| {
+                        let chunk = T::load(&chunk);
+                        A::accumulate_chunk_non_null(&mut chunk_acc, chunk);
+                    });
+                });
+
+                remainder.iter().for_each(|value| {
+                    A::accumulate_scalar(&mut rem_acc, *value);
+                });
+            }
+            Some(nulls) => {
+                // process data in chunks of 64 elements since we also get 64 bits of validity information at a time
+                let data_chunks = data.chunks_exact(64);
+                let remainder = data_chunks.remainder();
+
+                let bit_chunks = nulls.inner().bit_chunks();
+                let remainder_bits = bit_chunks.remainder_bits();
+
+                data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| {
+                    // split chunks further into slices corresponding to the vector length
+                    // the compiler is able to unroll this inner loop and remove bounds checks
+                    // since the outer chunk size (64) is always a multiple of the number of lanes
+                    chunk.chunks_exact(T::lanes()).for_each(|chunk| {
+                        let vecmask = T::mask_from_u64(mask);
+                        let chunk = T::load(&chunk);
+
+                        A::accumulate_chunk_nullable(&mut chunk_acc, chunk, vecmask);
+
+                        // skip the shift and avoid overflow for u8 type, which uses 64 lanes.
+                        mask >>= T::lanes() % 64;
+                    });
+                });
+
+                remainder.iter().enumerate().for_each(|(i, value)| {
+                    if remainder_bits & (1 << i) != 0 {
+                        A::accumulate_scalar(&mut rem_acc, *value)
+                    }
+                });
+            }
+        }
+
+        A::reduce(chunk_acc, rem_acc)
+    }
+}
+
 /// Returns the sum of values in the primitive array.
 ///
 /// Returns `None` if the array is empty or only contains null values.
 ///
 /// This doesn't detect overflow in release mode by default. Once overflowing, the result will
 /// wrap around. For an overflow-checking variant, use `sum_checked` instead.
+#[cfg(feature = "simd")]
 pub fn sum<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
 where
     T::Native: ArrowNativeTypeOp,
 {
-    aggregate::<T::Native, T, SumAccumulator<T::Native>>(array)
+    use simd::*;
+
+    simd::simd_aggregation::<T, SumAggregate<T>>(&array)
 }

+#[cfg(feature = "simd")]
 /// Returns the minimum value in the array, according to the natural order.
 /// For floating point arrays any NaN values are considered to be greater than any other non-null value
 pub fn min<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
 where
     T::Native: PartialOrd,
 {
-    aggregate::<T::Native, T, MinAccumulator<T::Native>>(array)
+    use simd::*;
+
+    simd::simd_aggregation::<T, MinAggregate<T>>(&array)
 }

+#[cfg(feature = "simd")]
 /// Returns the maximum value in the array, according to the natural order.
 /// For floating point arrays any NaN values are considered to be greater than any other non-null value
 pub fn max<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
 where
     T::Native: PartialOrd,
 {
-    aggregate::<T::Native, T, MaxAccumulator<T::Native>>(array)
+    use simd::*;
+
+    simd::simd_aggregation::<T, MaxAggregate<T>>(&array)
 }

 #[cfg(test)]
 mod tests {
@@ -756,41 +872,8 @@ mod tests {
         assert_eq!(None, sum(&a));
     }

-    #[test]
-    fn test_primitive_array_sum_large_float_64() {
-        let c = Float64Array::new((1..=100).map(|x| x as f64).collect(), None);
-        assert_eq!(Some((1..=100).sum::<i64>() as f64), sum(&c));
-
-        // create an array that actually has non-zero values at the invalid indices
-        let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
-        let c = Float64Array::new((1..=100).map(|x| x as f64).collect(), Some(validity));
-
-        assert_eq!(
-            Some((1..=100).filter(|i| i % 3 == 0).sum::<i64>() as f64),
-            sum(&c)
-        );
-    }
-
-    #[test]
-    fn test_primitive_array_sum_large_float_32() {
-        let c = Float32Array::new((1..=100).map(|x| x as f32).collect(), None);
-        assert_eq!(Some((1..=100).sum::<i64>() as f32), sum(&c));
-
-        // create an array that actually has non-zero values at the invalid indices
-        let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
-        let c = Float32Array::new((1..=100).map(|x| x as f32).collect(), Some(validity));
-
-        assert_eq!(
-            Some((1..=100).filter(|i| i % 3 == 0).sum::<i64>() as f32),
-            sum(&c)
-        );
-    }
-
     #[test]
     fn test_primitive_array_sum_large_64() {
-        let c = Int64Array::new((1..=100).collect(), None);
-        assert_eq!(Some((1..=100).sum()), sum(&c));
-
         // create an array that actually has non-zero values at the invalid indices
         let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
         let c = Int64Array::new((1..=100).collect(), Some(validity));
@@ -800,9 +883,6 @@

     #[test]
     fn test_primitive_array_sum_large_32() {
-        let c = Int32Array::new((1..=100).collect(), None);
-        assert_eq!(Some((1..=100).sum()), sum(&c));
-
         // create an array that actually has non-zero values at the invalid indices
         let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
         let c = Int32Array::new((1..=100).collect(), Some(validity));
@@ -811,9 +891,6 @@

     #[test]
     fn test_primitive_array_sum_large_16() {
-        let c = Int16Array::new((1..=100).collect(), None);
-        assert_eq!(Some((1..=100).sum()), sum(&c));
-
         // create an array that actually has non-zero values at the invalid indices
         let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
         let c = Int16Array::new((1..=100).collect(), Some(validity));
@@ -822,23 +899,11 @@

     #[test]
     fn test_primitive_array_sum_large_8() {
-        let c = UInt8Array::new((1..=100).collect(), None);
-        assert_eq!(
-            Some((1..=100).fold(0_u8, |a, x| a.wrapping_add(x))),
-            sum(&c)
-        );
-
+        // include fewer values than other large tests so the result does not overflow the u8
         // create an array that actually has non-zero values at the invalid indices
-        let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
+        let validity = NullBuffer::new((1..=100).map(|x| x % 33 == 0).collect());
         let c = UInt8Array::new((1..=100).collect(), Some(validity));
-        assert_eq!(
-            Some(
-                (1..=100)
-                    .filter(|i| i % 3 == 0)
-                    .fold(0_u8, |a, x| a.wrapping_add(x))
-            ),
-            sum(&c)
-        );
+        assert_eq!(Some((1..=100).filter(|i| i % 33 == 0).sum()), sum(&c));
     }

     #[test]
@@ -1038,19 +1103,6 @@
         assert!(min(&a).unwrap().is_nan());
     }

-    #[test]
-    fn test_primitive_min_max_float_negative_nan() {
-        let a: Float64Array =
-            Float64Array::from(vec![f64::NEG_INFINITY, f64::NAN, f64::INFINITY, -f64::NAN]);
-        let max = max(&a).unwrap();
-        let min = min(&a).unwrap();
-        assert!(max.is_nan());
-        assert!(max.is_sign_positive());
-
-        assert!(min.is_nan());
-        assert!(min.is_sign_negative());
-    }
-
     #[test]
     fn test_primitive_min_max_float_first_nan_nonnull() {
         let a: Float64Array = (0..100)
@@ -1403,6 +1455,7 @@
     }

     #[test]
+    #[cfg(not(feature = "simd"))]
     fn test_sum_overflow() {
         let a = Int32Array::from(vec![i32::MAX, 1]);

diff --git a/arrow-array/src/arithmetic.rs b/arrow-array/src/arithmetic.rs
index 590536190309..c9be39d44144 100644
--- a/arrow-array/src/arithmetic.rs
+++ b/arrow-array/src/arithmetic.rs
@@ -45,16 +45,6 @@ pub trait ArrowNativeTypeOp: ArrowNativeType {
     /// The multiplicative identity
     const ONE: Self;

-    /// The minimum value and identity for the `max` aggregation.
-    /// Note that the aggregation uses the total order predicate for floating point values,
-    /// which means that this value is a negative NaN.
-    const MIN_TOTAL_ORDER: Self;
-
-    /// The maximum value and identity for the `min` aggregation.
-    /// Note that the aggregation uses the total order predicate for floating point values,
-    /// which means that this value is a positive NaN.
-    const MAX_TOTAL_ORDER: Self;
-
     /// Checked addition operation
     fn add_checked(self, rhs: Self) -> Result<Self, ArrowError>;

@@ -139,14 +129,12 @@

 macro_rules! native_type_op {
     ($t:tt) => {
-        native_type_op!($t, 0, 1, $t::MIN, $t::MAX);
+        native_type_op!($t, 0, 1);
     };
-    ($t:tt, $zero:expr, $one: expr, $min: expr, $max: expr) => {
+    ($t:tt, $zero:expr, $one: expr) => {
         impl ArrowNativeTypeOp for $t {
             const ZERO: Self = $zero;
             const ONE: Self = $one;
-            const MIN_TOTAL_ORDER: Self = $min;
-            const MAX_TOTAL_ORDER: Self = $max;

             #[inline]
             fn add_checked(self, rhs: Self) -> Result<Self, ArrowError> {
@@ -282,15 +270,13 @@ native_type_op!(u8);
 native_type_op!(u16);
 native_type_op!(u32);
 native_type_op!(u64);
-native_type_op!(i256, i256::ZERO, i256::ONE, i256::MIN, i256::MAX);
+native_type_op!(i256, i256::ZERO, i256::ONE);

 macro_rules! native_type_float_op {
-    ($t:tt, $zero:expr, $one:expr, $min:expr, $max:expr) => {
+    ($t:tt, $zero:expr, $one:expr) => {
         impl ArrowNativeTypeOp for $t {
             const ZERO: Self = $zero;
             const ONE: Self = $one;
-            const MIN_TOTAL_ORDER: Self = $min;
-            const MAX_TOTAL_ORDER: Self = $max;

             #[inline]
             fn add_checked(self, rhs: Self) -> Result<Self, ArrowError> {
@@ -391,30 +377,9 @@
     };
 }

-// the smallest/largest bit patterns for floating point numbers are NaN, but differ from the canonical NAN constants.
-// See test_float_total_order_min_max for details.
-native_type_float_op!(
-    f16,
-    f16::ZERO,
-    f16::ONE,
-    f16::from_bits(-1 as _),
-    f16::from_bits(i16::MAX as _)
-);
-// from_bits is not yet stable as const fn, see https://github.com/rust-lang/rust/issues/72447
-native_type_float_op!(
-    f32,
-    0.,
-    1.,
-    unsafe { std::mem::transmute(-1_i32) },
-    unsafe { std::mem::transmute(i32::MAX) }
-);
-native_type_float_op!(
-    f64,
-    0.,
-    1.,
-    unsafe { std::mem::transmute(-1_i64) },
-    unsafe { std::mem::transmute(i64::MAX) }
-);
+native_type_float_op!(f16, f16::ZERO, f16::ONE);
+native_type_float_op!(f32, 0., 1.);
+native_type_float_op!(f64, 0., 1.);

 #[cfg(test)]
 mod tests {
@@ -815,40 +780,4 @@
         assert_eq!(8.0_f32.pow_checked(2_u32).unwrap(), 64_f32);
         assert_eq!(8.0_f64.pow_checked(2_u32).unwrap(), 64_f64);
     }
-
-    #[test]
-    fn test_float_total_order_min_max() {
-        assert!(<f64 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(f64::NEG_INFINITY));
-        assert!(<f64 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f64::INFINITY));
-
-        assert!(<f64 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_nan());
-        assert!(<f64 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_sign_negative());
-        assert!(<f64 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(-f64::NAN));
-
-        assert!(<f64 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_nan());
-        assert!(<f64 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_sign_positive());
-        assert!(<f64 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f64::NAN));
-
-        assert!(<f32 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(f32::NEG_INFINITY));
-        assert!(<f32 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f32::INFINITY));
-
-        assert!(<f32 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_nan());
-        assert!(<f32 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_sign_negative());
-        assert!(<f32 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(-f32::NAN));
-
-        assert!(<f32 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_nan());
-        assert!(<f32 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_sign_positive());
-        assert!(<f32 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f32::NAN));
-
-        assert!(<f16 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(f16::NEG_INFINITY));
-        assert!(<f16 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f16::INFINITY));
-
-        assert!(<f16 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_nan());
-        assert!(<f16 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_sign_negative());
-        assert!(<f16 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(-f16::NAN));
-
-        assert!(<f16 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_nan());
-        assert!(<f16 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_sign_positive());
-        assert!(<f16 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f16::NAN));
-    }
 }
diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs
index 1589cc5b102b..c651edcad92e 100644
--- a/arrow-buffer/src/buffer/boolean.rs
+++ b/arrow-buffer/src/buffer/boolean.rs
@@ -90,7 +90,6 @@ impl BooleanBuffer {

     /// Returns a `BitChunks` instance which can be used to iterate over
     /// this buffer's bits in `u64` chunks
-    #[inline]
     pub fn bit_chunks(&self) -> BitChunks {
         BitChunks::new(self.values(), self.offset, self.len)
     }
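
A minimal sketch (not part of the patch) of the semantics the restored kernels document: NaN compares greater than every other non-null value for `min`/`max`, null slots are skipped, and all-null or empty inputs yield `None`. The expected values follow the comparators `(is_nan(*a) & !is_nan(*b)) || a > b` and `(!is_nan(*a) & is_nan(*b)) || a < b` from the non-simd paths above; only public APIs already referenced in the diff are used.

```rust
use arrow_arith::aggregate::{max, min, sum};
use arrow_array::Float64Array;

fn main() {
    // NaN is treated as greater than any other non-null value,
    // so it wins `max` but never `min`.
    let a = Float64Array::from(vec![1.0, f64::NAN, 3.0]);
    assert_eq!(min(&a), Some(1.0));
    assert!(max(&a).unwrap().is_nan());

    // Null slots are skipped entirely; only valid values are summed.
    let b = Float64Array::from(vec![None, Some(2.0), None]);
    assert_eq!(sum(&b), Some(2.0));

    // An empty (or all-null) array aggregates to `None`.
    let empty = Float64Array::from(Vec::<f64>::new());
    assert_eq!(sum(&empty), None);
}
```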