Skip to content

Commit

Permalink
Comments and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jhorstmann committed Nov 20, 2023
1 parent 9ef88c7 commit d04849a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 66 deletions.
58 changes: 38 additions & 20 deletions arrow-arith/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ use arrow_schema::*;
use std::borrow::BorrowMut;
use std::ops::{BitAnd, BitOr, BitXor};

/// An accumulator for primitive numeric values.
///
/// Implementors must be `Copy` and `Default`: the aggregation helpers in this
/// file build arrays of default-initialized accumulators
/// (`[A::default(); LANES]`), one per lane.
trait NumericAccumulator<T: ArrowNativeTypeOp>: Copy + Default {
/// Accumulate a non-null value.
fn accumulate(&mut self, value: T);
/// Accumulate a nullable value.
/// If `valid` is false the `value` should not affect the accumulator state.
fn accumulate_nullable(&mut self, value: T, valid: bool);
/// Merge another accumulator into this accumulator.
fn merge(&mut self, other: Self);
/// Return the aggregated value.
fn finish(&mut self) -> T;
}

/// Helper for branchlessly selecting either `a` or `b` based on the boolean `m`.
/// After verifying the generated assembly this can be a simple `if`.
#[inline(always)]
fn select<T: Copy>(m: bool, a: T, b: T) -> T {
if m {
Expand All @@ -36,13 +51,6 @@ fn select<T: Copy>(m: bool, a: T, b: T) -> T {
}
}

trait NumericAccumulator<T: ArrowNativeTypeOp>: Copy + Default {
fn accumulate(&mut self, value: T);
fn accumulate_nullable(&mut self, value: T, valid: bool);
fn merge(&mut self, other: Self);
fn finish(&mut self) -> T;
}

#[derive(Clone, Copy)]
struct SumAccumulator<T: ArrowNativeTypeOp> {
sum: T,
Expand Down Expand Up @@ -195,6 +203,7 @@ fn aggregate_nonnull_simple<T: ArrowNativeTypeOp, A: NumericAccumulator<T>>(valu
fn aggregate_nonnull_lanes<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
values: &[T],
) -> T {
// aggregating into multiple independent accumulators allows the compiler to use vector registers
let mut acc = [A::default(); LANES];
let mut chunks = values.chunks_exact(LANES);
chunks.borrow_mut().for_each(|chunk| {
Expand All @@ -221,14 +230,17 @@ fn aggregate_nullable_lanes<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, cons
assert!(LANES > 0 && 64 % LANES == 0);
assert_eq!(values.len(), validity.len());

// aggregating into multiple independent accumulators allows the compiler to use vector registers
let mut acc = [A::default(); LANES];
// we process 64 bits of validity at a time
let mut values_chunks = values.chunks_exact(64);
let validity_chunks = validity.inner().bit_chunks();
let mut validity_chunks_iter = validity_chunks.iter();

values_chunks.borrow_mut().for_each(|chunk| {
// Safety: we asserted that values and validity have the same length and trust the iterator impl
let mut validity = unsafe { validity_chunks_iter.next().unwrap_unchecked() };
// chunk further based on the number of vector lanes
chunk.chunks_exact(LANES).for_each(|chunk| {
aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity);
validity >>= LANES;
Expand Down Expand Up @@ -258,8 +270,8 @@ fn aggregate_nullable_lanes<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, cons
reduce_accumulators(acc).finish()
}

// The preferred vector size in bytes for the target platform.
// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust.
/// The preferred vector size in bytes for the target platform.
/// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust.
const PREFERRED_VECTOR_SIZE: usize =
if cfg!(all(target_arch = "x86_64", target_feature = "avx512f")) {
64
Expand All @@ -269,9 +281,11 @@ const PREFERRED_VECTOR_SIZE: usize =
16
};

// non-nullable aggregation requires fewer temporary registers so we can use more of them for accumulators
/// Non-nullable aggregation requires fewer temporary registers, so we can use more of them for accumulators.
const PREFERRED_VECTOR_SIZE_NON_NULL: usize = PREFERRED_VECTOR_SIZE * 2;

/// Generic aggregation for any primitive type.
/// Returns None if there are no non-null values in `array`.
fn aggregate<T: ArrowNativeTypeOp, P: ArrowPrimitiveType<Native = T>, A: NumericAccumulator<T>>(
array: &PrimitiveArray<P>,
) -> Option<T> {
Expand All @@ -281,15 +295,19 @@ fn aggregate<T: ArrowNativeTypeOp, P: ArrowPrimitiveType<Native = T>, A: Numeric
}
let values = array.values().as_ref();
match array.nulls() {
Some(nulls) if null_count > 0 => match PREFERRED_VECTOR_SIZE / std::mem::size_of::<T>() {
64 => Some(aggregate_nullable_lanes::<T, A, 64>(values, nulls)),
32 => Some(aggregate_nullable_lanes::<T, A, 32>(values, nulls)),
16 => Some(aggregate_nullable_lanes::<T, A, 16>(values, nulls)),
8 => Some(aggregate_nullable_lanes::<T, A, 8>(values, nulls)),
4 => Some(aggregate_nullable_lanes::<T, A, 4>(values, nulls)),
2 => Some(aggregate_nullable_lanes::<T, A, 2>(values, nulls)),
_ => Some(aggregate_nullable_lanes::<T, A, 1>(values, nulls)),
},
Some(nulls) if null_count > 0 => {
// const generics depending on a generic type parameter are not supported
// so we have to match and call aggregate with the corresponding constant
match PREFERRED_VECTOR_SIZE / std::mem::size_of::<T>() {
64 => Some(aggregate_nullable_lanes::<T, A, 64>(values, nulls)),
32 => Some(aggregate_nullable_lanes::<T, A, 32>(values, nulls)),
16 => Some(aggregate_nullable_lanes::<T, A, 16>(values, nulls)),
8 => Some(aggregate_nullable_lanes::<T, A, 8>(values, nulls)),
4 => Some(aggregate_nullable_lanes::<T, A, 4>(values, nulls)),
2 => Some(aggregate_nullable_lanes::<T, A, 2>(values, nulls)),
_ => Some(aggregate_nullable_lanes::<T, A, 1>(values, nulls)),
}
}
_ => {
let is_float = matches!(
array.data_type(),
Expand All @@ -303,7 +321,7 @@ fn aggregate<T: ArrowNativeTypeOp, P: ArrowPrimitiveType<Native = T>, A: Numeric
8 => Some(aggregate_nonnull_lanes::<T, A, 8>(values)),
4 => Some(aggregate_nonnull_lanes::<T, A, 4>(values)),
2 => Some(aggregate_nonnull_lanes::<T, A, 2>(values)),
_ => Some(aggregate_nonnull_lanes::<T, A, 1>(values)),
_ => Some(aggregate_nonnull_simple::<T, A>(values)),
}
} else {
// for non-null integers it's better to not chunk ourselves and instead
Expand Down
46 changes: 0 additions & 46 deletions arrow-array/src/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ pub trait ArrowNativeTypeOp: ArrowNativeType {
/// which means that this value is a positive NaN.
const MAX: Self;

/// The number of bytes occupied by this type
const BYTES: usize;

/// Checked addition operation
fn add_checked(self, rhs: Self) -> Result<Self, ArrowError>;

Expand Down Expand Up @@ -150,7 +147,6 @@ macro_rules! native_type_op {
const ONE: Self = $one;
const MIN: Self = $min;
const MAX: Self = $max;
const BYTES: usize = std::mem::size_of::<Self>();

#[inline]
fn add_checked(self, rhs: Self) -> Result<Self, ArrowError> {
Expand Down Expand Up @@ -288,55 +284,13 @@ native_type_op!(u32);
native_type_op!(u64);
native_type_op!(i256, i256::ZERO, i256::ONE, i256::MIN, i256::MAX);

/*
trait ToTotalOrder {
type TotalOrder;
fn to_total_order(self) -> Self::TotalOrder;
}
impl ToTotalOrder for f64 {
type TotalOrder = u64;
#[inline]
fn to_total_order(self) -> Self::TotalOrder {
// reading via integer pointer instead of calling to_bits seems to avoid a move from xmm to gp reg
// let bits = unsafe { (self as *const f64 as *const u64).read() };
let bits = self.to_bits();
(bits ^ ((bits as i64 >> 63) as u64 >> 1)) ^ (1 << 63)
}
}
impl ToTotalOrder for f32 {
type TotalOrder = u32;
#[inline]
fn to_total_order(self) -> Self::TotalOrder {
// reading via integer pointer instead of calling to_bits seems to avoid a move from xmm to gp reg
// let bits = unsafe { (self as *const f32 as *const u32).read() };
let bits = self.to_bits();
(bits ^ ((bits as i32 >> 31) as u32 >> 1)) ^ (1 << 31)
}
}
impl ToTotalOrder for f16 {
type TotalOrder = u16;
#[inline]
fn to_total_order(self) -> Self::TotalOrder {
let bits = self.to_bits();
(bits ^ ((bits as i16 >> 15) as u16 >> 1)) ^ (1 << 15)
}
}
*/

macro_rules! native_type_float_op {
($t:tt, $zero:expr, $one:expr, $min:expr, $max:expr, $bits:ty) => {
impl ArrowNativeTypeOp for $t {
const ZERO: Self = $zero;
const ONE: Self = $one;
const MIN: Self = $min;
const MAX: Self = $max;
const BYTES: usize = std::mem::size_of::<Self>();

#[inline]
fn add_checked(self, rhs: Self) -> Result<Self, ArrowError> {
Expand Down

0 comments on commit d04849a

Please sign in to comment.