diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml new file mode 100644 index 000000000000..2c1dcdfd2100 --- /dev/null +++ b/.github/workflows/audit.yml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: audit + +concurrency: + group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} + cancel-in-progress: true + +# trigger for all PRs that touch certain files and changes to master +on: + push: + branches: + - master + pull_request: + paths: + - '**/Cargo.toml' + - '**/Cargo.lock' + +jobs: + cargo-audit: + name: Audit + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install cargo-audit + run: cargo install cargo-audit + - name: Run audit check + run: cargo audit diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml index 1447d72a53b1..2026e257ab29 100644 --- a/.github/workflows/dev.yml +++ b/.github/workflows/dev.yml @@ -40,7 +40,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Setup Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: 3.8 - name: Audit licenses diff --git a/.github/workflows/dev_pr.yml b/.github/workflows/dev_pr.yml index 5f3d9e54c8db..0d60ae006796 100644 --- a/.github/workflows/dev_pr.yml +++ b/.github/workflows/dev_pr.yml @@ -44,7 +44,7 @@ jobs: github.event_name == 'pull_request_target' && (github.event.action == 'opened' || github.event.action == 'synchronize') - uses: actions/labeler@v4.3.0 + uses: actions/labeler@v5.0.0 with: repo-token: ${{ secrets.GITHUB_TOKEN }} configuration-path: .github/workflows/dev_pr/labeler.yml diff --git a/.github/workflows/dev_pr/labeler.yml b/.github/workflows/dev_pr/labeler.yml index ea5873081f18..cae015018eac 100644 --- a/.github/workflows/dev_pr/labeler.yml +++ b/.github/workflows/dev_pr/labeler.yml @@ -16,33 +16,40 @@ # under the License. arrow: - - arrow-arith/**/* - - arrow-array/**/* - - arrow-buffer/**/* - - arrow-cast/**/* - - arrow-csv/**/* - - arrow-data/**/* - - arrow-flight/**/* - - arrow-integration-test/**/* - - arrow-integration-testing/**/* - - arrow-ipc/**/* - - arrow-json/**/* - - arrow-avro/**/* - - arrow-ord/**/* - - arrow-row/**/* - - arrow-schema/**/* - - arrow-select/**/* - - arrow-string/**/* - - arrow/**/* + - changed-files: + - any-glob-to-any-file: + - 'arrow-arith/**/*' + - 'arrow-array/**/*' + - 'arrow-buffer/**/*' + - 'arrow-cast/**/*' + - 'arrow-csv/**/*' + - 'arrow-data/**/*' + - 'arrow-flight/**/*' + - 'arrow-integration-test/**/*' + - 'arrow-integration-testing/**/*' + - 'arrow-ipc/**/*' + - 'arrow-json/**/*' + - 'arrow-avro/**/*' + - 'arrow-ord/**/*' + - 'arrow-row/**/*' + - 'arrow-schema/**/*' + - 'arrow-select/**/*' + - 'arrow-string/**/*' + - 'arrow/**/*' arrow-flight: - - arrow-flight/**/* + - changed-files: + - any-glob-to-any-file: + - 'arrow-flight/**/*' parquet: - - parquet/**/* + - changed-files: + - any-glob-to-any-file: [ 'parquet/**/*' ] parquet-derive: - - parquet_derive/**/* + - changed-files: + - any-glob-to-any-file: [ 'parquet_derive/**/*' ] object-store: - - object_store/**/* + - changed-files: + - any-glob-to-any-file: [ 'object_store/**/*' ] diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index c9cb4e31ced9..1604a7be4372 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -129,7 +129,7 @@ jobs: path: /home/runner/target # this key is not equal because maturin uses different compilation flags. key: ${{ runner.os }}-${{ matrix.arch }}-target-maturin-cache-${{ matrix.rust }}- - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: '3.8' - name: Upgrade pip and setuptools diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml index d664a0dc0730..a4e654892662 100644 --- a/.github/workflows/parquet.yml +++ b/.github/workflows/parquet.yml @@ -140,7 +140,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Setup Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: "3.10" cache: "pip" diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 0dabaa50f5f6..20ff0711d735 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -20,39 +20,317 @@ use arrow_array::cast::*; use arrow_array::iterator::ArrayIter; use arrow_array::*; -use arrow_buffer::ArrowNativeType; +use arrow_buffer::{ArrowNativeType, NullBuffer}; use arrow_data::bit_iterator::try_for_each_valid_idx; use arrow_schema::ArrowError; use arrow_schema::*; +use std::borrow::BorrowMut; use std::ops::{BitAnd, BitOr, BitXor}; -/// Generic test for NaN, the optimizer should be able to remove this for integer types. -#[inline] -pub(crate) fn is_nan(a: T) -> bool { - #[allow(clippy::eq_op)] - !(a == a) +/// An accumulator for primitive numeric values. +trait NumericAccumulator: Copy + Default { + /// Accumulate a non-null value. + fn accumulate(&mut self, value: T); + /// Accumulate a nullable values. + /// If `valid` is false the `value` should not affect the accumulator state. + fn accumulate_nullable(&mut self, value: T, valid: bool); + /// Merge another accumulator into this accumulator + fn merge(&mut self, other: Self); + /// Return the aggregated value. + fn finish(&mut self) -> T; } -/// Returns the minimum value in the array, according to the natural order. -/// For floating point arrays any NaN values are considered to be greater than any other non-null value -#[cfg(not(feature = "simd"))] -pub fn min(array: &PrimitiveArray) -> Option -where - T: ArrowNumericType, - T::Native: ArrowNativeType, -{ - min_max_helper::(array, |a, b| (is_nan(*a) & !is_nan(*b)) || a > b) +/// Helper for branchlessly selecting either `a` or `b` based on the boolean `m`. +/// After verifying the generated assembly this can be a simple `if`. +#[inline(always)] +fn select(m: bool, a: T, b: T) -> T { + if m { + a + } else { + b + } } -/// Returns the maximum value in the array, according to the natural order. -/// For floating point arrays any NaN values are considered to be greater than any other non-null value -#[cfg(not(feature = "simd"))] -pub fn max(array: &PrimitiveArray) -> Option -where - T: ArrowNumericType, - T::Native: ArrowNativeType, -{ - min_max_helper::(array, |a, b| (!is_nan(*a) & is_nan(*b)) || a < b) +#[derive(Clone, Copy)] +struct SumAccumulator { + sum: T, +} + +impl Default for SumAccumulator { + fn default() -> Self { + Self { sum: T::ZERO } + } +} + +impl NumericAccumulator for SumAccumulator { + fn accumulate(&mut self, value: T) { + self.sum = self.sum.add_wrapping(value); + } + + fn accumulate_nullable(&mut self, value: T, valid: bool) { + let sum = self.sum; + self.sum = select(valid, sum.add_wrapping(value), sum) + } + + fn merge(&mut self, other: Self) { + self.sum = self.sum.add_wrapping(other.sum); + } + + fn finish(&mut self) -> T { + self.sum + } +} + +#[derive(Clone, Copy)] +struct MinAccumulator { + min: T, +} + +impl Default for MinAccumulator { + fn default() -> Self { + Self { + min: T::MAX_TOTAL_ORDER, + } + } +} + +impl NumericAccumulator for MinAccumulator { + fn accumulate(&mut self, value: T) { + let min = self.min; + self.min = select(value.is_lt(min), value, min); + } + + fn accumulate_nullable(&mut self, value: T, valid: bool) { + let min = self.min; + let is_lt = valid & value.is_lt(min); + self.min = select(is_lt, value, min); + } + + fn merge(&mut self, other: Self) { + self.accumulate(other.min) + } + + fn finish(&mut self) -> T { + self.min + } +} + +#[derive(Clone, Copy)] +struct MaxAccumulator { + max: T, +} + +impl Default for MaxAccumulator { + fn default() -> Self { + Self { + max: T::MIN_TOTAL_ORDER, + } + } +} + +impl NumericAccumulator for MaxAccumulator { + fn accumulate(&mut self, value: T) { + let max = self.max; + self.max = select(value.is_gt(max), value, max); + } + + fn accumulate_nullable(&mut self, value: T, valid: bool) { + let max = self.max; + let is_gt = value.is_gt(max) & valid; + self.max = select(is_gt, value, max); + } + + fn merge(&mut self, other: Self) { + self.accumulate(other.max) + } + + fn finish(&mut self) -> T { + self.max + } +} + +fn reduce_accumulators, const LANES: usize>( + mut acc: [A; LANES], +) -> A { + assert!(LANES > 0 && LANES.is_power_of_two()); + let mut len = LANES; + + // attempt at tree reduction, unfortunately llvm does not fully recognize this pattern, + // but the generated code is still a little faster than purely sequential reduction for floats. + while len >= 2 { + let mid = len / 2; + let (h, t) = acc[..len].split_at_mut(mid); + + for i in 0..mid { + h[i].merge(t[i]); + } + len /= 2; + } + acc[0] +} + +#[inline(always)] +fn aggregate_nonnull_chunk, const LANES: usize>( + acc: &mut [A; LANES], + values: &[T; LANES], +) { + for i in 0..LANES { + acc[i].accumulate(values[i]); + } +} + +#[inline(always)] +fn aggregate_nullable_chunk, const LANES: usize>( + acc: &mut [A; LANES], + values: &[T; LANES], + validity: u64, +) { + let mut bit = 1; + for i in 0..LANES { + acc[i].accumulate_nullable(values[i], (validity & bit) != 0); + bit <<= 1; + } +} + +fn aggregate_nonnull_simple>(values: &[T]) -> T { + return values + .iter() + .copied() + .fold(A::default(), |mut a, b| { + a.accumulate(b); + a + }) + .finish(); +} + +#[inline(never)] +fn aggregate_nonnull_lanes, const LANES: usize>( + values: &[T], +) -> T { + // aggregating into multiple independent accumulators allows the compiler to use vector registers + // with a single accumulator the compiler would not be allowed to reorder floating point addition + let mut acc = [A::default(); LANES]; + let mut chunks = values.chunks_exact(LANES); + chunks.borrow_mut().for_each(|chunk| { + aggregate_nonnull_chunk(&mut acc, chunk[..LANES].try_into().unwrap()); + }); + + let remainder = chunks.remainder(); + for i in 0..remainder.len() { + acc[i].accumulate(remainder[i]); + } + + reduce_accumulators(acc).finish() +} + +#[inline(never)] +fn aggregate_nullable_lanes, const LANES: usize>( + values: &[T], + validity: &NullBuffer, +) -> T { + assert!(LANES > 0 && 64 % LANES == 0); + assert_eq!(values.len(), validity.len()); + + // aggregating into multiple independent accumulators allows the compiler to use vector registers + let mut acc = [A::default(); LANES]; + // we process 64 bits of validity at a time + let mut values_chunks = values.chunks_exact(64); + let validity_chunks = validity.inner().bit_chunks(); + let mut validity_chunks_iter = validity_chunks.iter(); + + values_chunks.borrow_mut().for_each(|chunk| { + // Safety: we asserted that values and validity have the same length and trust the iterator impl + let mut validity = unsafe { validity_chunks_iter.next().unwrap_unchecked() }; + // chunk further based on the number of vector lanes + chunk.chunks_exact(LANES).for_each(|chunk| { + aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity); + validity >>= LANES; + }); + }); + + let remainder = values_chunks.remainder(); + if !remainder.is_empty() { + let mut validity = validity_chunks.remainder_bits(); + + let mut remainder_chunks = remainder.chunks_exact(LANES); + remainder_chunks.borrow_mut().for_each(|chunk| { + aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity); + validity >>= LANES; + }); + + let remainder = remainder_chunks.remainder(); + if !remainder.is_empty() { + let mut bit = 1; + for i in 0..remainder.len() { + acc[i].accumulate_nullable(remainder[i], (validity & bit) != 0); + bit <<= 1; + } + } + } + + reduce_accumulators(acc).finish() +} + +/// The preferred vector size in bytes for the target platform. +/// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust. +const PREFERRED_VECTOR_SIZE: usize = + if cfg!(all(target_arch = "x86_64", target_feature = "avx512f")) { + 64 + } else if cfg!(all(target_arch = "x86_64", target_feature = "avx")) { + 32 + } else { + 16 + }; + +/// non-nullable aggregation requires fewer temporary registers so we can use more of them for accumulators +const PREFERRED_VECTOR_SIZE_NON_NULL: usize = PREFERRED_VECTOR_SIZE * 2; + +/// Generic aggregation for any primitive type. +/// Returns None if there are no non-null values in `array`. +fn aggregate, A: NumericAccumulator>( + array: &PrimitiveArray

, +) -> Option { + let null_count = array.null_count(); + if null_count == array.len() { + return None; + } + let values = array.values().as_ref(); + match array.nulls() { + Some(nulls) if null_count > 0 => { + // const generics depending on a generic type parameter are not supported + // so we have to match and call aggregate with the corresponding constant + match PREFERRED_VECTOR_SIZE / std::mem::size_of::() { + 64 => Some(aggregate_nullable_lanes::(values, nulls)), + 32 => Some(aggregate_nullable_lanes::(values, nulls)), + 16 => Some(aggregate_nullable_lanes::(values, nulls)), + 8 => Some(aggregate_nullable_lanes::(values, nulls)), + 4 => Some(aggregate_nullable_lanes::(values, nulls)), + 2 => Some(aggregate_nullable_lanes::(values, nulls)), + _ => Some(aggregate_nullable_lanes::(values, nulls)), + } + } + _ => { + let is_float = matches!( + array.data_type(), + DataType::Float16 | DataType::Float32 | DataType::Float64 + ); + if is_float { + match PREFERRED_VECTOR_SIZE_NON_NULL / std::mem::size_of::() { + 64 => Some(aggregate_nonnull_lanes::(values)), + 32 => Some(aggregate_nonnull_lanes::(values)), + 16 => Some(aggregate_nonnull_lanes::(values)), + 8 => Some(aggregate_nonnull_lanes::(values)), + 4 => Some(aggregate_nonnull_lanes::(values)), + 2 => Some(aggregate_nonnull_lanes::(values)), + _ => Some(aggregate_nonnull_simple::(values)), + } + } else { + // for non-null integers its better to not chunk ourselves and instead + // let llvm fully handle loop unrolling and vectorization + Some(aggregate_nonnull_simple::(values)) + } + } + } } /// Returns the minimum value in the boolean array. @@ -230,7 +508,7 @@ where T: ArrowNumericType, T::Native: ArrowNativeType, { - min_max_array_helper::(array, |a, b| (is_nan(*a) & !is_nan(*b)) || a > b, min) + min_max_array_helper::(array, |a, b| a.is_gt(*b), min) } /// Returns the max of values in the array of `ArrowNumericType` type, or dictionary @@ -238,9 +516,9 @@ where pub fn max_array>(array: A) -> Option where T: ArrowNumericType, - T::Native: ArrowNativeType, + T::Native: ArrowNativeTypeOp, { - min_max_array_helper::(array, |a, b| (!is_nan(*a) & is_nan(*b)) || a < b, max) + min_max_array_helper::(array, |a, b| a.is_lt(*b), max) } fn min_max_array_helper, F, M>( @@ -259,66 +537,6 @@ where } } -/// Returns the sum of values in the primitive array. -/// -/// Returns `None` if the array is empty or only contains null values. -/// -/// This doesn't detect overflow. Once overflowing, the result will wrap around. -/// For an overflow-checking variant, use `sum_checked` instead. -#[cfg(not(feature = "simd"))] -pub fn sum(array: &PrimitiveArray) -> Option -where - T: ArrowNumericType, - T::Native: ArrowNativeTypeOp, -{ - let null_count = array.null_count(); - - if null_count == array.len() { - return None; - } - - let data: &[T::Native] = array.values(); - - match array.nulls() { - None => { - let sum = data.iter().fold(T::default_value(), |accumulator, value| { - accumulator.add_wrapping(*value) - }); - - Some(sum) - } - Some(nulls) => { - let mut sum = T::default_value(); - let data_chunks = data.chunks_exact(64); - let remainder = data_chunks.remainder(); - - let bit_chunks = nulls.inner().bit_chunks(); - data_chunks - .zip(bit_chunks.iter()) - .for_each(|(chunk, mask)| { - // index_mask has value 1 << i in the loop - let mut index_mask = 1; - chunk.iter().for_each(|value| { - if (mask & index_mask) != 0 { - sum = sum.add_wrapping(*value); - } - index_mask <<= 1; - }); - }); - - let remainder_bits = bit_chunks.remainder_bits(); - - remainder.iter().enumerate().for_each(|(i, value)| { - if remainder_bits & (1 << i) != 0 { - sum = sum.add_wrapping(*value); - } - }); - - Some(sum) - } - } -} - macro_rules! bit_operation { ($NAME:ident, $OP:ident, $NATIVE:ident, $DEFAULT:expr, $DOC:expr) => { #[doc = $DOC] @@ -476,369 +694,35 @@ where } } -#[cfg(feature = "simd")] -mod simd { - use super::is_nan; - use arrow_array::*; - use std::marker::PhantomData; - - pub(super) trait SimdAggregate { - type ScalarAccumulator; - type SimdAccumulator; - - /// Returns the accumulator for aggregating scalar values - fn init_accumulator_scalar() -> Self::ScalarAccumulator; - - /// Returns the accumulator for aggregating simd chunks of values - fn init_accumulator_chunk() -> Self::SimdAccumulator; - - /// Updates the accumulator with the values of one chunk - fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd); - - /// Updates the accumulator with the values of one chunk according to the given vector mask - fn accumulate_chunk_nullable( - accumulator: &mut Self::SimdAccumulator, - chunk: T::Simd, - mask: T::SimdMask, - ); - - /// Updates the accumulator with one value - fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native); - - /// Reduces the vector lanes of the simd accumulator and the scalar accumulator to a single value - fn reduce( - simd_accumulator: Self::SimdAccumulator, - scalar_accumulator: Self::ScalarAccumulator, - ) -> Option; - } - - pub(super) struct SumAggregate { - phantom: PhantomData, - } - - impl SimdAggregate for SumAggregate - where - T::Native: ArrowNativeTypeOp, - { - type ScalarAccumulator = T::Native; - type SimdAccumulator = T::Simd; - - fn init_accumulator_scalar() -> Self::ScalarAccumulator { - T::default_value() - } - - fn init_accumulator_chunk() -> Self::SimdAccumulator { - T::init(Self::init_accumulator_scalar()) - } - - fn accumulate_chunk_non_null(accumulator: &mut T::Simd, chunk: T::Simd) { - *accumulator = *accumulator + chunk; - } - - fn accumulate_chunk_nullable( - accumulator: &mut T::Simd, - chunk: T::Simd, - vecmask: T::SimdMask, - ) { - let zero = T::init(T::default_value()); - let blended = T::mask_select(vecmask, chunk, zero); - - *accumulator = *accumulator + blended; - } - - fn accumulate_scalar(accumulator: &mut T::Native, value: T::Native) { - *accumulator = accumulator.add_wrapping(value) - } - - fn reduce( - simd_accumulator: Self::SimdAccumulator, - scalar_accumulator: Self::ScalarAccumulator, - ) -> Option { - // we can't use T::lanes() as the slice len because it is not const, - // instead always reserve the maximum number of lanes - let mut tmp = [T::default_value(); 64]; - let slice = &mut tmp[0..T::lanes()]; - T::write(simd_accumulator, slice); - - let mut reduced = Self::init_accumulator_scalar(); - slice - .iter() - .for_each(|value| Self::accumulate_scalar(&mut reduced, *value)); - - Self::accumulate_scalar(&mut reduced, scalar_accumulator); - - // result can not be None because we checked earlier for the null count - Some(reduced) - } - } - - pub(super) struct MinAggregate { - phantom: PhantomData, - } - - impl SimdAggregate for MinAggregate - where - T::Native: PartialOrd, - { - type ScalarAccumulator = (T::Native, bool); - type SimdAccumulator = (T::Simd, T::SimdMask); - - fn init_accumulator_scalar() -> Self::ScalarAccumulator { - (T::default_value(), false) - } - - fn init_accumulator_chunk() -> Self::SimdAccumulator { - (T::init(T::default_value()), T::mask_init(false)) - } - - fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd) { - let acc_is_nan = !T::eq(accumulator.0, accumulator.0); - let is_lt = acc_is_nan | T::lt(chunk, accumulator.0); - let first_or_lt = !accumulator.1 | is_lt; - - accumulator.0 = T::mask_select(first_or_lt, chunk, accumulator.0); - accumulator.1 = T::mask_init(true); - } - - fn accumulate_chunk_nullable( - accumulator: &mut Self::SimdAccumulator, - chunk: T::Simd, - vecmask: T::SimdMask, - ) { - let acc_is_nan = !T::eq(accumulator.0, accumulator.0); - let is_lt = vecmask & (acc_is_nan | T::lt(chunk, accumulator.0)); - let first_or_lt = !accumulator.1 | is_lt; - - accumulator.0 = T::mask_select(first_or_lt, chunk, accumulator.0); - accumulator.1 |= vecmask; - } - - fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native) { - if !accumulator.1 { - accumulator.0 = value; - } else { - let acc_is_nan = is_nan(accumulator.0); - if acc_is_nan || value < accumulator.0 { - accumulator.0 = value - } - } - accumulator.1 = true - } - - fn reduce( - simd_accumulator: Self::SimdAccumulator, - scalar_accumulator: Self::ScalarAccumulator, - ) -> Option { - // we can't use T::lanes() as the slice len because it is not const, - // instead always reserve the maximum number of lanes - let mut tmp = [T::default_value(); 64]; - let slice = &mut tmp[0..T::lanes()]; - T::write(simd_accumulator.0, slice); - - let mut reduced = Self::init_accumulator_scalar(); - slice - .iter() - .enumerate() - .filter(|(i, _value)| T::mask_get(&simd_accumulator.1, *i)) - .for_each(|(_i, value)| Self::accumulate_scalar(&mut reduced, *value)); - - if scalar_accumulator.1 { - Self::accumulate_scalar(&mut reduced, scalar_accumulator.0); - } - - if reduced.1 { - Some(reduced.0) - } else { - None - } - } - } - - pub(super) struct MaxAggregate { - phantom: PhantomData, - } - - impl SimdAggregate for MaxAggregate - where - T::Native: PartialOrd, - { - type ScalarAccumulator = (T::Native, bool); - type SimdAccumulator = (T::Simd, T::SimdMask); - - fn init_accumulator_scalar() -> Self::ScalarAccumulator { - (T::default_value(), false) - } - - fn init_accumulator_chunk() -> Self::SimdAccumulator { - (T::init(T::default_value()), T::mask_init(false)) - } - - fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd) { - let chunk_is_nan = !T::eq(chunk, chunk); - let is_gt = chunk_is_nan | T::gt(chunk, accumulator.0); - let first_or_gt = !accumulator.1 | is_gt; - - accumulator.0 = T::mask_select(first_or_gt, chunk, accumulator.0); - accumulator.1 = T::mask_init(true); - } - - fn accumulate_chunk_nullable( - accumulator: &mut Self::SimdAccumulator, - chunk: T::Simd, - vecmask: T::SimdMask, - ) { - let chunk_is_nan = !T::eq(chunk, chunk); - let is_gt = vecmask & (chunk_is_nan | T::gt(chunk, accumulator.0)); - let first_or_gt = !accumulator.1 | is_gt; - - accumulator.0 = T::mask_select(first_or_gt, chunk, accumulator.0); - accumulator.1 |= vecmask; - } - - fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native) { - if !accumulator.1 { - accumulator.0 = value; - } else { - let value_is_nan = is_nan(value); - if value_is_nan || value > accumulator.0 { - accumulator.0 = value - } - } - accumulator.1 = true; - } - - fn reduce( - simd_accumulator: Self::SimdAccumulator, - scalar_accumulator: Self::ScalarAccumulator, - ) -> Option { - // we can't use T::lanes() as the slice len because it is not const, - // instead always reserve the maximum number of lanes - let mut tmp = [T::default_value(); 64]; - let slice = &mut tmp[0..T::lanes()]; - T::write(simd_accumulator.0, slice); - - let mut reduced = Self::init_accumulator_scalar(); - slice - .iter() - .enumerate() - .filter(|(i, _value)| T::mask_get(&simd_accumulator.1, *i)) - .for_each(|(_i, value)| Self::accumulate_scalar(&mut reduced, *value)); - - if scalar_accumulator.1 { - Self::accumulate_scalar(&mut reduced, scalar_accumulator.0); - } - - if reduced.1 { - Some(reduced.0) - } else { - None - } - } - } - - pub(super) fn simd_aggregation>( - array: &PrimitiveArray, - ) -> Option { - let null_count = array.null_count(); - - if null_count == array.len() { - return None; - } - - let data: &[T::Native] = array.values(); - - let mut chunk_acc = A::init_accumulator_chunk(); - let mut rem_acc = A::init_accumulator_scalar(); - - match array.nulls() { - None => { - let data_chunks = data.chunks_exact(64); - let remainder = data_chunks.remainder(); - - data_chunks.for_each(|chunk| { - chunk.chunks_exact(T::lanes()).for_each(|chunk| { - let chunk = T::load(&chunk); - A::accumulate_chunk_non_null(&mut chunk_acc, chunk); - }); - }); - - remainder.iter().for_each(|value| { - A::accumulate_scalar(&mut rem_acc, *value); - }); - } - Some(nulls) => { - // process data in chunks of 64 elements since we also get 64 bits of validity information at a time - let data_chunks = data.chunks_exact(64); - let remainder = data_chunks.remainder(); - - let bit_chunks = nulls.inner().bit_chunks(); - let remainder_bits = bit_chunks.remainder_bits(); - - data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| { - // split chunks further into slices corresponding to the vector length - // the compiler is able to unroll this inner loop and remove bounds checks - // since the outer chunk size (64) is always a multiple of the number of lanes - chunk.chunks_exact(T::lanes()).for_each(|chunk| { - let vecmask = T::mask_from_u64(mask); - let chunk = T::load(&chunk); - - A::accumulate_chunk_nullable(&mut chunk_acc, chunk, vecmask); - - // skip the shift and avoid overflow for u8 type, which uses 64 lanes. - mask >>= T::lanes() % 64; - }); - }); - - remainder.iter().enumerate().for_each(|(i, value)| { - if remainder_bits & (1 << i) != 0 { - A::accumulate_scalar(&mut rem_acc, *value) - } - }); - } - } - - A::reduce(chunk_acc, rem_acc) - } -} - /// Returns the sum of values in the primitive array. /// /// Returns `None` if the array is empty or only contains null values. /// /// This doesn't detect overflow in release mode by default. Once overflowing, the result will /// wrap around. For an overflow-checking variant, use `sum_checked` instead. -#[cfg(feature = "simd")] pub fn sum(array: &PrimitiveArray) -> Option where T::Native: ArrowNativeTypeOp, { - use simd::*; - - simd::simd_aggregation::>(&array) + aggregate::>(array) } -#[cfg(feature = "simd")] /// Returns the minimum value in the array, according to the natural order. /// For floating point arrays any NaN values are considered to be greater than any other non-null value pub fn min(array: &PrimitiveArray) -> Option where T::Native: PartialOrd, { - use simd::*; - - simd::simd_aggregation::>(&array) + aggregate::>(array) } -#[cfg(feature = "simd")] /// Returns the maximum value in the array, according to the natural order. /// For floating point arrays any NaN values are considered to be greater than any other non-null value pub fn max(array: &PrimitiveArray) -> Option where T::Native: PartialOrd, { - use simd::*; - - simd::simd_aggregation::>(&array) + aggregate::>(array) } #[cfg(test)] @@ -872,8 +756,41 @@ mod tests { assert_eq!(None, sum(&a)); } + #[test] + fn test_primitive_array_sum_large_float_64() { + let c = Float64Array::new((1..=100).map(|x| x as f64).collect(), None); + assert_eq!(Some((1..=100).sum::() as f64), sum(&c)); + + // create an array that actually has non-zero values at the invalid indices + let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); + let c = Float64Array::new((1..=100).map(|x| x as f64).collect(), Some(validity)); + + assert_eq!( + Some((1..=100).filter(|i| i % 3 == 0).sum::() as f64), + sum(&c) + ); + } + + #[test] + fn test_primitive_array_sum_large_float_32() { + let c = Float32Array::new((1..=100).map(|x| x as f32).collect(), None); + assert_eq!(Some((1..=100).sum::() as f32), sum(&c)); + + // create an array that actually has non-zero values at the invalid indices + let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); + let c = Float32Array::new((1..=100).map(|x| x as f32).collect(), Some(validity)); + + assert_eq!( + Some((1..=100).filter(|i| i % 3 == 0).sum::() as f32), + sum(&c) + ); + } + #[test] fn test_primitive_array_sum_large_64() { + let c = Int64Array::new((1..=100).collect(), None); + assert_eq!(Some((1..=100).sum()), sum(&c)); + // create an array that actually has non-zero values at the invalid indices let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); let c = Int64Array::new((1..=100).collect(), Some(validity)); @@ -883,6 +800,9 @@ mod tests { #[test] fn test_primitive_array_sum_large_32() { + let c = Int32Array::new((1..=100).collect(), None); + assert_eq!(Some((1..=100).sum()), sum(&c)); + // create an array that actually has non-zero values at the invalid indices let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); let c = Int32Array::new((1..=100).collect(), Some(validity)); @@ -891,6 +811,9 @@ mod tests { #[test] fn test_primitive_array_sum_large_16() { + let c = Int16Array::new((1..=100).collect(), None); + assert_eq!(Some((1..=100).sum()), sum(&c)); + // create an array that actually has non-zero values at the invalid indices let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); let c = Int16Array::new((1..=100).collect(), Some(validity)); @@ -899,11 +822,23 @@ mod tests { #[test] fn test_primitive_array_sum_large_8() { - // include fewer values than other large tests so the result does not overflow the u8 + let c = UInt8Array::new((1..=100).collect(), None); + assert_eq!( + Some((1..=100).fold(0_u8, |a, x| a.wrapping_add(x))), + sum(&c) + ); + // create an array that actually has non-zero values at the invalid indices - let validity = NullBuffer::new((1..=100).map(|x| x % 33 == 0).collect()); + let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); let c = UInt8Array::new((1..=100).collect(), Some(validity)); - assert_eq!(Some((1..=100).filter(|i| i % 33 == 0).sum()), sum(&c)); + assert_eq!( + Some( + (1..=100) + .filter(|i| i % 3 == 0) + .fold(0_u8, |a, x| a.wrapping_add(x)) + ), + sum(&c) + ); } #[test] @@ -1103,6 +1038,19 @@ mod tests { assert!(min(&a).unwrap().is_nan()); } + #[test] + fn test_primitive_min_max_float_negative_nan() { + let a: Float64Array = + Float64Array::from(vec![f64::NEG_INFINITY, f64::NAN, f64::INFINITY, -f64::NAN]); + let max = max(&a).unwrap(); + let min = min(&a).unwrap(); + assert!(max.is_nan()); + assert!(max.is_sign_positive()); + + assert!(min.is_nan()); + assert!(min.is_sign_negative()); + } + #[test] fn test_primitive_min_max_float_first_nan_nonnull() { let a: Float64Array = (0..100) @@ -1455,7 +1403,6 @@ mod tests { } #[test] - #[cfg(not(feature = "simd"))] fn test_sum_overflow() { let a = Int32Array::from(vec![i32::MAX, 1]); diff --git a/arrow-array/src/arithmetic.rs b/arrow-array/src/arithmetic.rs index c9be39d44144..590536190309 100644 --- a/arrow-array/src/arithmetic.rs +++ b/arrow-array/src/arithmetic.rs @@ -45,6 +45,16 @@ pub trait ArrowNativeTypeOp: ArrowNativeType { /// The multiplicative identity const ONE: Self; + /// The minimum value and identity for the `max` aggregation. + /// Note that the aggregation uses the total order predicate for floating point values, + /// which means that this value is a negative NaN. + const MIN_TOTAL_ORDER: Self; + + /// The maximum value and identity for the `min` aggregation. + /// Note that the aggregation uses the total order predicate for floating point values, + /// which means that this value is a positive NaN. + const MAX_TOTAL_ORDER: Self; + /// Checked addition operation fn add_checked(self, rhs: Self) -> Result; @@ -129,12 +139,14 @@ pub trait ArrowNativeTypeOp: ArrowNativeType { macro_rules! native_type_op { ($t:tt) => { - native_type_op!($t, 0, 1); + native_type_op!($t, 0, 1, $t::MIN, $t::MAX); }; - ($t:tt, $zero:expr, $one: expr) => { + ($t:tt, $zero:expr, $one: expr, $min: expr, $max: expr) => { impl ArrowNativeTypeOp for $t { const ZERO: Self = $zero; const ONE: Self = $one; + const MIN_TOTAL_ORDER: Self = $min; + const MAX_TOTAL_ORDER: Self = $max; #[inline] fn add_checked(self, rhs: Self) -> Result { @@ -270,13 +282,15 @@ native_type_op!(u8); native_type_op!(u16); native_type_op!(u32); native_type_op!(u64); -native_type_op!(i256, i256::ZERO, i256::ONE); +native_type_op!(i256, i256::ZERO, i256::ONE, i256::MIN, i256::MAX); macro_rules! native_type_float_op { - ($t:tt, $zero:expr, $one:expr) => { + ($t:tt, $zero:expr, $one:expr, $min:expr, $max:expr) => { impl ArrowNativeTypeOp for $t { const ZERO: Self = $zero; const ONE: Self = $one; + const MIN_TOTAL_ORDER: Self = $min; + const MAX_TOTAL_ORDER: Self = $max; #[inline] fn add_checked(self, rhs: Self) -> Result { @@ -377,9 +391,30 @@ macro_rules! native_type_float_op { }; } -native_type_float_op!(f16, f16::ZERO, f16::ONE); -native_type_float_op!(f32, 0., 1.); -native_type_float_op!(f64, 0., 1.); +// the smallest/largest bit patterns for floating point numbers are NaN, but differ from the canonical NAN constants. +// See test_float_total_order_min_max for details. +native_type_float_op!( + f16, + f16::ZERO, + f16::ONE, + f16::from_bits(-1 as _), + f16::from_bits(i16::MAX as _) +); +// from_bits is not yet stable as const fn, see https://github.com/rust-lang/rust/issues/72447 +native_type_float_op!( + f32, + 0., + 1., + unsafe { std::mem::transmute(-1_i32) }, + unsafe { std::mem::transmute(i32::MAX) } +); +native_type_float_op!( + f64, + 0., + 1., + unsafe { std::mem::transmute(-1_i64) }, + unsafe { std::mem::transmute(i64::MAX) } +); #[cfg(test)] mod tests { @@ -780,4 +815,40 @@ mod tests { assert_eq!(8.0_f32.pow_checked(2_u32).unwrap(), 64_f32); assert_eq!(8.0_f64.pow_checked(2_u32).unwrap(), 64_f64); } + + #[test] + fn test_float_total_order_min_max() { + assert!(::MIN_TOTAL_ORDER.is_lt(f64::NEG_INFINITY)); + assert!(::MAX_TOTAL_ORDER.is_gt(f64::INFINITY)); + + assert!(::MIN_TOTAL_ORDER.is_nan()); + assert!(::MIN_TOTAL_ORDER.is_sign_negative()); + assert!(::MIN_TOTAL_ORDER.is_lt(-f64::NAN)); + + assert!(::MAX_TOTAL_ORDER.is_nan()); + assert!(::MAX_TOTAL_ORDER.is_sign_positive()); + assert!(::MAX_TOTAL_ORDER.is_gt(f64::NAN)); + + assert!(::MIN_TOTAL_ORDER.is_lt(f32::NEG_INFINITY)); + assert!(::MAX_TOTAL_ORDER.is_gt(f32::INFINITY)); + + assert!(::MIN_TOTAL_ORDER.is_nan()); + assert!(::MIN_TOTAL_ORDER.is_sign_negative()); + assert!(::MIN_TOTAL_ORDER.is_lt(-f32::NAN)); + + assert!(::MAX_TOTAL_ORDER.is_nan()); + assert!(::MAX_TOTAL_ORDER.is_sign_positive()); + assert!(::MAX_TOTAL_ORDER.is_gt(f32::NAN)); + + assert!(::MIN_TOTAL_ORDER.is_lt(f16::NEG_INFINITY)); + assert!(::MAX_TOTAL_ORDER.is_gt(f16::INFINITY)); + + assert!(::MIN_TOTAL_ORDER.is_nan()); + assert!(::MIN_TOTAL_ORDER.is_sign_negative()); + assert!(::MIN_TOTAL_ORDER.is_lt(-f16::NAN)); + + assert!(::MAX_TOTAL_ORDER.is_nan()); + assert!(::MAX_TOTAL_ORDER.is_sign_positive()); + assert!(::MAX_TOTAL_ORDER.is_gt(f16::NAN)); + } } diff --git a/arrow-array/src/builder/generic_list_builder.rs b/arrow-array/src/builder/generic_list_builder.rs index 21eaadd5208a..116e2553cfb7 100644 --- a/arrow-array/src/builder/generic_list_builder.rs +++ b/arrow-array/src/builder/generic_list_builder.rs @@ -584,14 +584,31 @@ mod tests { &DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), 10, ); - let mut builder = ListBuilder::new(values_builder); + test_boxed_generic_list_generic_list_array_builder::(values_builder); + } + + #[test] + fn test_boxed_large_list_large_list_array_builder() { + // This test is same as `test_list_list_array_builder` but uses boxed builders. + let values_builder = make_builder( + &DataType::LargeList(Arc::new(Field::new("item", DataType::Int32, true))), + 10, + ); + test_boxed_generic_list_generic_list_array_builder::(values_builder); + } + + fn test_boxed_generic_list_generic_list_array_builder( + values_builder: Box, + ) { + let mut builder: GenericListBuilder> = + GenericListBuilder::>::new(values_builder); // [[[1, 2], [3, 4]], [[5, 6, 7], null, [8]], null, [[9, 10]]] builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() @@ -600,8 +617,8 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() @@ -610,14 +627,14 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .append(true); builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() @@ -626,8 +643,8 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() @@ -636,16 +653,16 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .append(true); builder.append(true); builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() @@ -654,8 +671,8 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() @@ -664,30 +681,30 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() - .expect("should be an Int32Builder") + .expect("should be an (Large)ListBuilder") .append_value(7); builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .append(true); builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .append(false); builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() @@ -696,8 +713,8 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .append(true); builder.append(true); @@ -706,8 +723,8 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() @@ -716,8 +733,8 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .values() .as_any_mut() .downcast_mut::() @@ -726,8 +743,8 @@ mod tests { builder .values() .as_any_mut() - .downcast_mut::>>() - .expect("should be an ListBuilder") + .downcast_mut::>>() + .expect("should be an (Large)ListBuilder") .append(true); builder.append(true); @@ -736,12 +753,12 @@ mod tests { assert_eq!(4, l1.len()); assert_eq!(1, l1.null_count()); - assert_eq!(l1.value_offsets(), &[0, 2, 5, 5, 6]); - let l2 = l1.values().as_list::(); + assert_eq!(l1.value_offsets(), &[0, 2, 5, 5, 6].map(O::usize_as)); + let l2 = l1.values().as_list::(); assert_eq!(6, l2.len()); assert_eq!(1, l2.null_count()); - assert_eq!(l2.value_offsets(), &[0, 2, 4, 7, 7, 8, 10]); + assert_eq!(l2.value_offsets(), &[0, 2, 4, 7, 7, 8, 10].map(O::usize_as)); let i1 = l2.values().as_primitive::(); assert_eq!(10, i1.len()); diff --git a/arrow-array/src/builder/struct_builder.rs b/arrow-array/src/builder/struct_builder.rs index 06b8385b3164..960949a2f09f 100644 --- a/arrow-array/src/builder/struct_builder.rs +++ b/arrow-array/src/builder/struct_builder.rs @@ -173,6 +173,10 @@ pub fn make_builder(datatype: &DataType, capacity: usize) -> Box { + let builder = make_builder(field.data_type(), capacity); + Box::new(LargeListBuilder::with_capacity(builder, capacity)) + } DataType::Struct(fields) => Box::new(StructBuilder::from_fields(fields.clone(), capacity)), t => panic!("Data type {t:?} is not currently supported"), } diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs index c651edcad92e..1589cc5b102b 100644 --- a/arrow-buffer/src/buffer/boolean.rs +++ b/arrow-buffer/src/buffer/boolean.rs @@ -90,6 +90,7 @@ impl BooleanBuffer { /// Returns a `BitChunks` instance which can be used to iterate over /// this buffer's bits in `u64` chunks + #[inline] pub fn bit_chunks(&self) -> BitChunks { BitChunks::new(self.values(), self.offset, self.len) } diff --git a/arrow-cast/src/display.rs b/arrow-cast/src/display.rs index 28c29c94bbdb..edf7c9394c88 100644 --- a/arrow-cast/src/display.rs +++ b/arrow-cast/src/display.rs @@ -301,6 +301,10 @@ fn make_formatter<'a>( DataType::Struct(_) => array_format(as_struct_array(array), options), DataType::Map(_, _) => array_format(as_map_array(array), options), DataType::Union(_, _) => array_format(as_union_array(array), options), + DataType::RunEndEncoded(_, _) => downcast_run_array! { + array => array_format(array, options), + _ => unreachable!() + }, d => Err(ArrowError::NotYetImplemented(format!("formatting {d} is not yet supported"))), } } @@ -748,6 +752,19 @@ impl<'a, K: ArrowDictionaryKeyType> DisplayIndexState<'a> for &'a DictionaryArra } } +impl<'a, K: RunEndIndexType> DisplayIndexState<'a> for &'a RunArray { + type State = Box; + + fn prepare(&self, options: &FormatOptions<'a>) -> Result { + make_formatter(self.values().as_ref(), options) + } + + fn write(&self, s: &Self::State, idx: usize, f: &mut dyn Write) -> FormatResult { + let value_idx = self.get_physical_index(idx); + s.as_ref().write(value_idx, f) + } +} + fn write_list( f: &mut dyn Write, mut range: Range, @@ -935,6 +952,8 @@ pub fn lexical_to_string(n: N) -> String { #[cfg(test)] mod tests { + use arrow_array::builder::StringRunBuilder; + use super::*; /// Test to verify options can be constant. See #4580 @@ -1079,4 +1098,21 @@ mod tests { let formatted = format_array(&array, &options); assert_eq!(formatted, &["NULL".to_string(), "NULL".to_string()]) } + + #[test] + fn test_string_run_arry_to_string() { + let mut builder = StringRunBuilder::::new(); + + builder.append_value("input_value"); + builder.append_value("input_value"); + builder.append_value("input_value"); + builder.append_value("input_value1"); + + let map_array = builder.finish(); + assert_eq!("input_value", array_value_to_string(&map_array, 1).unwrap()); + assert_eq!( + "input_value1", + array_value_to_string(&map_array, 3).unwrap() + ); + } } diff --git a/arrow-csv/Cargo.toml b/arrow-csv/Cargo.toml index 66a6d7dbcaa5..d29c85c56cfd 100644 --- a/arrow-csv/Cargo.toml +++ b/arrow-csv/Cargo.toml @@ -18,7 +18,7 @@ [package] name = "arrow-csv" version = { workspace = true } -description = "Support for parsing CSV format into the Arrow format" +description = "Support for parsing CSV format to and from the Arrow format" homepage = { workspace = true } repository = { workspace = true } authors = { workspace = true } diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index 7e49a57fbd6c..dd232f197ead 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -18,7 +18,7 @@ [package] name = "arrow-json" version = { workspace = true } -description = "Support for parsing JSON format into the Arrow format" +description = "Support for parsing JSON format to and from the Arrow format" homepage = { workspace = true } repository = { workspace = true } authors = { workspace = true } diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs index d47b884ae38d..44269e38758e 100644 --- a/arrow-select/src/take.rs +++ b/arrow-select/src/take.rs @@ -19,7 +19,7 @@ use std::sync::Arc; -use arrow_array::builder::BufferBuilder; +use arrow_array::builder::{BufferBuilder, UInt32Builder}; use arrow_array::cast::AsArray; use arrow_array::types::*; use arrow_array::*; @@ -689,7 +689,7 @@ fn take_value_indices_from_fixed_size_list( where IndexType: ArrowPrimitiveType, { - let mut values = vec![]; + let mut values = UInt32Builder::with_capacity(length as usize * indices.len()); for i in 0..indices.len() { if indices.is_valid(i) { @@ -699,11 +699,16 @@ where .ok_or_else(|| ArrowError::ComputeError("Cast to usize failed".to_string()))?; let start = list.value_offset(index) as ::Native; - values.extend(start..start + length); + // Safety: Range always has known length. + unsafe { + values.append_trusted_len_iter(start..start + length); + } + } else { + values.append_nulls(length as usize); } } - Ok(PrimitiveArray::::from(values)) + Ok(values.finish()) } /// To avoid generating take implementations for every index type, instead we @@ -1985,6 +1990,23 @@ mod tests { assert_eq!(&values, &[Some(23), Some(4), None, None]) } + #[test] + fn test_take_fixed_size_list_null_indices() { + let indices = Int32Array::from_iter([Some(0), None]); + let values = Arc::new(Int32Array::from(vec![0, 1, 2, 3])); + let arr_field = Arc::new(Field::new("item", values.data_type().clone(), true)); + let values = FixedSizeListArray::try_new(arr_field, 2, values, None).unwrap(); + + let r = take(&values, &indices, None).unwrap(); + let values = r + .as_fixed_size_list() + .values() + .as_primitive::() + .into_iter() + .collect::>(); + assert_eq!(values, &[Some(0), Some(1), None, None]) + } + #[test] fn test_take_bytes_null_indices() { let indices = Int32Array::new( diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 37f03a05b3fa..6ca218f5f658 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -38,12 +38,6 @@ name = "arrow" path = "src/lib.rs" bench = false -[target.'cfg(target_arch = "wasm32")'.dependencies] -ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] } - -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] -ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } - [dependencies] arrow-arith = { workspace = true } arrow-array = { workspace = true } diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 2f5157c40e67..e7f99e529e07 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -52,7 +52,7 @@ serde_json = { version = "1.0", default-features = false, optional = true } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true } reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-native-roots"], optional = true } ring = { version = "0.17", default-features = false, features = ["std"], optional = true } -rustls-pemfile = { version = "1.0", default-features = false, optional = true } +rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true } tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] } [target.'cfg(target_family="unix")'.dev-dependencies] @@ -67,10 +67,10 @@ http = ["cloud"] tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"] [dev-dependencies] # In alphabetical order -tempfile = "3.1.0" futures-test = "0.3" -rand = "0.8" hyper = { version = "0.14.24", features = ["server"] } +rand = "0.8" +tempfile = "3.1.0" [[test]] name = "get_range_file" diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs index 29c7b4563ad5..dc504da05723 100644 --- a/object_store/src/gcp/credential.rs +++ b/object_store/src/gcp/credential.rs @@ -304,8 +304,8 @@ fn decode_first_rsa_key(private_key_pem: String) -> Result { // Reading from string is infallible match rustls_pemfile::read_one(&mut reader).unwrap() { - Some(Item::PKCS8Key(key)) => Ok(RsaKeyPair::from_pkcs8(&key)?), - Some(Item::RSAKey(key)) => Ok(RsaKeyPair::from_der(&key)?), + Some(Item::Pkcs8Key(key)) => Ok(RsaKeyPair::from_pkcs8(key.secret_pkcs8_der())?), + Some(Item::Pkcs1Key(key)) => Ok(RsaKeyPair::from_der(key.secret_pkcs1_der())?), _ => Err(Error::MissingKey), } } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 5c5c70de3a2b..3a841667ff97 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -119,6 +119,7 @@ //! application complexity. //! //! ```no_run +//! # #[cfg(feature = "aws")] { //! # use url::Url; //! # use object_store::{parse_url, parse_url_opts}; //! # use object_store::aws::{AmazonS3, AmazonS3Builder}; @@ -140,6 +141,7 @@ //! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap(); //! let (store, path) = parse_url(&url).unwrap(); //! assert_eq!(path.as_ref(), "path"); +//! # } //! ``` //! //! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index b9e9d2898459..77de83994078 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -575,6 +575,10 @@ impl Iterator for ParquetRecordBatchReader { } impl RecordBatchReader for ParquetRecordBatchReader { + /// Returns the projected [`SchemaRef`] for reading the parquet file. + /// + /// Note that the schema metadata will be stripped here. See + /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired. fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index ea7b1eee99b8..e6e95d50996a 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -916,8 +916,9 @@ mod tests { use crate::basic::Encoding; use crate::data_type::AsBytes; use crate::file::metadata::ParquetMetaData; + use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::read_pages_locations; - use crate::file::properties::{ReaderProperties, WriterVersion}; + use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion}; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::{ reader::{FileReader, SerializedFileReader}, @@ -2738,4 +2739,144 @@ mod tests { assert_eq!(index[0][0].len(), 1); // 1 page assert_eq!(index[0][1].len(), 1); // 1 page } + + #[test] + fn test_disabled_statistics_with_page() { + let file_schema = Schema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Utf8, true), + ]); + let file_schema = Arc::new(file_schema); + + let batch = RecordBatch::try_new( + file_schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _, + Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _, + ], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::None) + .set_column_statistics_enabled("a".into(), EnabledStatistics::Page) + .build(); + + let mut buf = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + + let metadata = writer.close().unwrap(); + assert_eq!(metadata.row_groups.len(), 1); + let row_group = &metadata.row_groups[0]; + assert_eq!(row_group.columns.len(), 2); + // Column "a" has both offset and column index, as requested + assert!(row_group.columns[0].offset_index_offset.is_some()); + assert!(row_group.columns[0].column_index_offset.is_some()); + // Column "b" should only have offset index + assert!(row_group.columns[1].offset_index_offset.is_some()); + assert!(row_group.columns[1].column_index_offset.is_none()); + + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap(); + + let row_group = reader.get_row_group(0).unwrap(); + let a_col = row_group.metadata().column(0); + let b_col = row_group.metadata().column(1); + + // Column chunk of column "a" should have chunk level statistics + if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() { + let min = byte_array_stats.min(); + let max = byte_array_stats.max(); + + assert_eq!(min.as_bytes(), &[b'a']); + assert_eq!(max.as_bytes(), &[b'd']); + } else { + panic!("expecting Statistics::ByteArray"); + } + + // The column chunk for column "b" shouldn't have statistics + assert!(b_col.statistics().is_none()); + + let offset_index = reader.metadata().offset_index().unwrap(); + assert_eq!(offset_index.len(), 1); // 1 row group + assert_eq!(offset_index[0].len(), 2); // 2 columns + + let column_index = reader.metadata().column_index().unwrap(); + assert_eq!(column_index.len(), 1); // 1 row group + assert_eq!(column_index[0].len(), 2); // 2 columns + + let a_idx = &column_index[0][0]; + assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}"); + let b_idx = &column_index[0][1]; + assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); + } + + #[test] + fn test_disabled_statistics_with_chunk() { + let file_schema = Schema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Utf8, true), + ]); + let file_schema = Arc::new(file_schema); + + let batch = RecordBatch::try_new( + file_schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _, + Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _, + ], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::None) + .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk) + .build(); + + let mut buf = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + + let metadata = writer.close().unwrap(); + assert_eq!(metadata.row_groups.len(), 1); + let row_group = &metadata.row_groups[0]; + assert_eq!(row_group.columns.len(), 2); + // Column "a" should only have offset index + assert!(row_group.columns[0].offset_index_offset.is_some()); + assert!(row_group.columns[0].column_index_offset.is_none()); + // Column "b" should only have offset index + assert!(row_group.columns[1].offset_index_offset.is_some()); + assert!(row_group.columns[1].column_index_offset.is_none()); + + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap(); + + let row_group = reader.get_row_group(0).unwrap(); + let a_col = row_group.metadata().column(0); + let b_col = row_group.metadata().column(1); + + // Column chunk of column "a" should have chunk level statistics + if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() { + let min = byte_array_stats.min(); + let max = byte_array_stats.max(); + + assert_eq!(min.as_bytes(), &[b'a']); + assert_eq!(max.as_bytes(), &[b'd']); + } else { + panic!("expecting Statistics::ByteArray"); + } + + // The column chunk for column "b" shouldn't have statistics + assert!(b_col.statistics().is_none()); + + let column_index = reader.metadata().column_index().unwrap(); + assert_eq!(column_index.len(), 1); // 1 row group + assert_eq!(column_index[0].len(), 2); // 2 columns + + let a_idx = &column_index[0][0]; + assert!(matches!(a_idx, Index::NONE), "{a_idx:?}"); + let b_idx = &column_index[0][1]; + assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); + } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 04383bb51bda..80a554026d9a 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -90,7 +90,7 @@ use futures::stream::Stream; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use arrow_array::RecordBatch; -use arrow_schema::SchemaRef; +use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; use crate::arrow::arrow_reader::{ @@ -385,13 +385,24 @@ impl ParquetRecordBatchStreamBuilder { offset: self.offset, }; + // Ensure schema of ParquetRecordBatchStream respects projection, and does + // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches) + let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) { + Some(DataType::Struct(fields)) => { + fields.filter_leaves(|idx, _| self.projection.leaf_included(idx)) + } + None => Fields::empty(), + _ => unreachable!("Must be Struct for root type"), + }; + let schema = Arc::new(Schema::new(projected_fields)); + Ok(ParquetRecordBatchStream { metadata: self.metadata, batch_size, row_groups, projection: self.projection, selection: self.selection, - schema: self.schema, + schema, reader: Some(reader), state: StreamState::Init, }) @@ -572,7 +583,10 @@ impl std::fmt::Debug for ParquetRecordBatchStream { } impl ParquetRecordBatchStream { - /// Returns the [`SchemaRef`] for this parquet file + /// Returns the projected [`SchemaRef`] for reading the parquet file. + /// + /// Note that the schema metadata will be stripped here. See + /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired. pub fn schema(&self) -> &SchemaRef { &self.schema } @@ -855,11 +869,15 @@ mod tests { use arrow_array::builder::{ListBuilder, StringBuilder}; use arrow_array::cast::AsArray; use arrow_array::types::Int32Type; - use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array}; + use arrow_array::{ + Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray, + StructArray, UInt64Array, + }; use arrow_schema::{DataType, Field, Schema}; use futures::{StreamExt, TryStreamExt}; use rand::{thread_rng, Rng}; - use std::sync::Mutex; + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; use tempfile::tempfile; #[derive(Clone)] @@ -1584,6 +1602,114 @@ mod tests { test_get_row_group_column_bloom_filter(data, false).await; } + #[tokio::test] + async fn test_parquet_record_batch_stream_schema() { + fn get_all_field_names(schema: &Schema) -> Vec<&String> { + schema.all_fields().iter().map(|f| f.name()).collect() + } + + // ParquetRecordBatchReaderBuilder::schema differs from + // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned + // schema contents (in terms of custom metadata attached to schema, and fields + // returned). Test to ensure this remains consistent behaviour. + // + // Ensure same for asynchronous versions of the above. + + // Prep data, for a schema with nested fields, with custom metadata + let mut metadata = HashMap::with_capacity(1); + metadata.insert("key".to_string(), "value".to_string()); + + let nested_struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("d", DataType::Utf8, true)), + Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef, + ), + ( + Arc::new(Field::new("e", DataType::Utf8, true)), + Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef, + ), + ]); + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("a", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef, + ), + ( + Arc::new(Field::new("b", DataType::UInt64, true)), + Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef, + ), + ( + Arc::new(Field::new( + "c", + nested_struct_array.data_type().clone(), + true, + )), + Arc::new(nested_struct_array) as ArrayRef, + ), + ]); + + let schema = + Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone())); + let record_batch = RecordBatch::from(struct_array) + .with_schema(schema.clone()) + .unwrap(); + + // Write parquet with custom metadata in schema + let mut file = tempfile().unwrap(); + let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap(); + writer.write(&record_batch).unwrap(); + writer.close().unwrap(); + + let all_fields = ["a", "b", "c", "d", "e"]; + // (leaf indices in mask, expected names in output schema all fields) + let projections = [ + (vec![], vec![]), + (vec![0], vec!["a"]), + (vec![0, 1], vec!["a", "b"]), + (vec![0, 1, 2], vec!["a", "b", "c", "d"]), + (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]), + ]; + + // Ensure we're consistent for each of these projections + for (indices, expected_projected_names) in projections { + let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| { + // Builder schema should preserve all fields and metadata + assert_eq!(get_all_field_names(&builder), all_fields); + assert_eq!(builder.metadata, metadata); + // Reader & batch schema should show only projected fields, and no metadata + assert_eq!(get_all_field_names(&reader), expected_projected_names); + assert_eq!(reader.metadata, HashMap::default()); + assert_eq!(get_all_field_names(&batch), expected_projected_names); + assert_eq!(batch.metadata, HashMap::default()); + }; + + let builder = + ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap(); + let sync_builder_schema = builder.schema().clone(); + let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone()); + let mut reader = builder.with_projection(mask).build().unwrap(); + let sync_reader_schema = reader.schema(); + let batch = reader.next().unwrap().unwrap(); + let sync_batch_schema = batch.schema(); + assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema); + + // asynchronous should be same + let file = tokio::fs::File::from(file.try_clone().unwrap()); + let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap(); + let async_builder_schema = builder.schema().clone(); + let mask = ProjectionMask::leaves(builder.parquet_schema(), indices); + let mut reader = builder.with_projection(mask).build().unwrap(); + let async_reader_schema = reader.schema().clone(); + let batch = reader.next().await.unwrap().unwrap(); + let async_batch_schema = batch.schema(); + assert_schemas( + async_builder_schema, + async_reader_schema, + async_batch_schema, + ); + } + } + #[tokio::test] async fn test_get_row_group_column_bloom_filter_with_length() { // convert to new parquet file with bloom_filter_length diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index adfcd6390720..6c712ead625c 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -607,10 +607,7 @@ fn parse_v1_level( } Encoding::BIT_PACKED => { let bit_width = num_required_bits(max_level as u64); - let num_bytes = ceil( - (num_buffered_values as usize * bit_width as usize) as i64, - 8, - ) as usize; + let num_bytes = ceil(num_buffered_values as usize * bit_width as usize, 8); Ok((num_bytes, buf.slice(..num_bytes))) } _ => Err(general_err!("invalid level encoding: {}", encoding)), diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index e92a502689a3..531af4bd461e 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -680,32 +680,28 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone())); - // We only truncate if the data is represented as binary - match self.descr.physical_type() { - Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { - self.column_index_builder.append( - null_page, - self.truncate_min_value( - self.props.column_index_truncate_length(), - stat.min_bytes(), - ) - .0, - self.truncate_max_value( - self.props.column_index_truncate_length(), - stat.max_bytes(), - ) - .0, - self.page_metrics.num_page_nulls as i64, - ); - } - _ => { - self.column_index_builder.append( - null_page, - stat.min_bytes().to_vec(), - stat.max_bytes().to_vec(), - self.page_metrics.num_page_nulls as i64, - ); - } + if self.can_truncate_value() { + self.column_index_builder.append( + null_page, + self.truncate_min_value( + self.props.column_index_truncate_length(), + stat.min_bytes(), + ) + .0, + self.truncate_max_value( + self.props.column_index_truncate_length(), + stat.max_bytes(), + ) + .0, + self.page_metrics.num_page_nulls as i64, + ); + } else { + self.column_index_builder.append( + null_page, + stat.min_bytes().to_vec(), + stat.max_bytes().to_vec(), + self.page_metrics.num_page_nulls as i64, + ); } } } @@ -715,6 +711,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .append_row_count(self.page_metrics.num_buffered_rows as i64); } + /// Determine if we should allow truncating min/max values for this column's statistics + fn can_truncate_value(&self) -> bool { + match self.descr.physical_type() { + // Don't truncate for Float16 and Decimal because their sort order is different + // from that of FIXED_LEN_BYTE_ARRAY sort order. + // So truncation of those types could lead to inaccurate min/max statistics + Type::FIXED_LEN_BYTE_ARRAY + if !matches!( + self.descr.logical_type(), + Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16) + ) => + { + true + } + Type::BYTE_ARRAY => true, + // Truncation only applies for fba/binary physical types + _ => false, + } + } + fn truncate_min_value(&self, truncation_length: Option, data: &[u8]) -> (Vec, bool) { truncation_length .filter(|l| data.len() > *l) @@ -748,19 +764,22 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls; - let page_statistics = match (values_data.min_value, values_data.max_value) { - (Some(min), Some(max)) => { - update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); - update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); - Some(ValueStatistics::new( - Some(min), - Some(max), - None, - self.page_metrics.num_page_nulls, - false, - )) - } - _ => None, + let page_statistics = if let (Some(min), Some(max)) = + (values_data.min_value, values_data.max_value) + { + // Update chunk level statistics + update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); + update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); + + (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new( + Some(min), + Some(max), + None, + self.page_metrics.num_page_nulls, + false, + )) + } else { + None }; // update column and offset index @@ -948,7 +967,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .with_min_is_exact(!did_truncate_min), ) } - Statistics::FixedLenByteArray(stats) if stats.has_min_max_set() => { + Statistics::FixedLenByteArray(stats) + if (stats.has_min_max_set() && self.can_truncate_value()) => + { let (min, did_truncate_min) = self.truncate_min_value( self.props.statistics_truncate_length(), stats.min_bytes(), @@ -2713,6 +2734,82 @@ mod tests { } } + #[test] + fn test_float16_min_max_no_truncation() { + // Even if we set truncation to occur at 1 byte, we should not truncate for Float16 + let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1)); + let props = Arc::new(builder.build()); + let page_writer = get_test_page_writer(); + let mut writer = get_test_float16_column_writer(page_writer, props); + + let expected_value = f16::PI.to_le_bytes().to_vec(); + let data = vec![ByteArray::from(expected_value.clone()).into()]; + writer.write_batch(&data, None, None).unwrap(); + writer.flush_data_pages().unwrap(); + + let r = writer.close().unwrap(); + + // stats should still be written + // ensure bytes weren't truncated for column index + let column_index = r.column_index.unwrap(); + let column_index_min_bytes = column_index.min_values[0].as_slice(); + let column_index_max_bytes = column_index.max_values[0].as_slice(); + assert_eq!(expected_value, column_index_min_bytes); + assert_eq!(expected_value, column_index_max_bytes); + + // ensure bytes weren't truncated for statistics + let stats = r.metadata.statistics().unwrap(); + assert!(stats.has_min_max_set()); + if let Statistics::FixedLenByteArray(stats) = stats { + let stats_min_bytes = stats.min_bytes(); + let stats_max_bytes = stats.max_bytes(); + assert_eq!(expected_value, stats_min_bytes); + assert_eq!(expected_value, stats_max_bytes); + } else { + panic!("expecting Statistics::FixedLenByteArray"); + } + } + + #[test] + fn test_decimal_min_max_no_truncation() { + // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal + let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1)); + let props = Arc::new(builder.build()); + let page_writer = get_test_page_writer(); + let mut writer = + get_test_decimals_column_writer::(page_writer, 0, 0, props); + + let expected_value = vec![ + 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8, + 231u8, 90u8, 0u8, 0u8, + ]; + let data = vec![ByteArray::from(expected_value.clone()).into()]; + writer.write_batch(&data, None, None).unwrap(); + writer.flush_data_pages().unwrap(); + + let r = writer.close().unwrap(); + + // stats should still be written + // ensure bytes weren't truncated for column index + let column_index = r.column_index.unwrap(); + let column_index_min_bytes = column_index.min_values[0].as_slice(); + let column_index_max_bytes = column_index.max_values[0].as_slice(); + assert_eq!(expected_value, column_index_min_bytes); + assert_eq!(expected_value, column_index_max_bytes); + + // ensure bytes weren't truncated for statistics + let stats = r.metadata.statistics().unwrap(); + assert!(stats.has_min_max_set()); + if let Statistics::FixedLenByteArray(stats) = stats { + let stats_min_bytes = stats.min_bytes(); + let stats_max_bytes = stats.max_bytes(); + assert_eq!(expected_value, stats_min_bytes); + assert_eq!(expected_value, stats_max_bytes); + } else { + panic!("expecting Statistics::FixedLenByteArray"); + } + } + #[test] fn test_statistics_truncating_byte_array() { let page_writer = get_test_page_writer(); @@ -3421,7 +3518,7 @@ mod tests { values: &[FixedLenByteArray], ) -> ValueStatistics { let page_writer = get_test_page_writer(); - let mut writer = get_test_float16_column_writer(page_writer); + let mut writer = get_test_float16_column_writer(page_writer, Default::default()); writer.write_batch(values, None, None).unwrap(); let metadata = writer.close().unwrap().metadata; @@ -3434,9 +3531,10 @@ mod tests { fn get_test_float16_column_writer( page_writer: Box, + props: WriterPropertiesPtr, ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> { let descr = Arc::new(get_test_float16_column_descr(0, 0)); - let column_writer = get_column_writer(descr, Default::default(), page_writer); + let column_writer = get_column_writer(descr, props, page_writer); get_typed_column_writer::(column_writer) } diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 5807f6b9c527..5d91c1e53d0f 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -230,7 +230,7 @@ impl RleEncoder { self.bit_writer.put_vlq_int(indicator_value as u64); self.bit_writer.put_aligned( self.current_value, - bit_util::ceil(self.bit_width as i64, 8) as usize, + bit_util::ceil(self.bit_width as usize, 8), ); self.num_buffered_values = 0; self.repeat_count = 0; @@ -524,8 +524,8 @@ impl RleDecoder { self.bit_packed_left = ((indicator_value >> 1) * 8) as u32; } else { self.rle_left = (indicator_value >> 1) as u32; - let value_width = bit_util::ceil(self.bit_width as i64, 8); - self.current_value = bit_reader.get_aligned::(value_width as usize); + let value_width = bit_util::ceil(self.bit_width as usize, 8); + self.current_value = bit_reader.get_aligned::(value_width); assert!(self.current_value.is_some()); } true diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 2b9f261d9f42..f0b75f302552 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -323,6 +323,19 @@ impl SerializedFileWriter { None => Some(self.kv_metadatas.clone()), }; + // We only include ColumnOrder for leaf nodes. + // Currently only supported ColumnOrder is TypeDefinedOrder so we set this + // for all leaf nodes. + // Even if the column has an undefined sort order, such as INTERVAL, this + // is still technically the defined TYPEORDER so it should still be set. + let column_orders = (0..self.schema_descr().num_columns()) + .map(|_| parquet::ColumnOrder::TYPEORDER(parquet::TypeDefinedOrder {})) + .collect(); + // This field is optional, perhaps in cases where no min/max fields are set + // in any Statistics or ColumnIndex object in the whole file. + // But for simplicity we always set this field. + let column_orders = Some(column_orders); + let file_metadata = parquet::FileMetaData { num_rows, row_groups, @@ -330,7 +343,7 @@ impl SerializedFileWriter { version: self.props.writer_version().as_num(), schema: types::to_thrift(self.schema.as_ref())?, created_by: Some(self.props.created_by().to_owned()), - column_orders: None, + column_orders, encryption_algorithm: None, footer_signing_key_metadata: None, }; @@ -738,7 +751,9 @@ mod tests { use bytes::Bytes; use std::fs::File; - use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type}; + use crate::basic::{ + ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type, + }; use crate::column::page::{Page, PageReader}; use crate::column::reader::get_typed_column_reader; use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; @@ -851,6 +866,78 @@ mod tests { assert_eq!(reader.get_row_iter(None).unwrap().count(), 0); } + #[test] + fn test_file_writer_column_orders_populated() { + let file = tempfile::tempfile().unwrap(); + + let schema = Arc::new( + types::Type::group_type_builder("schema") + .with_fields(vec![ + Arc::new( + types::Type::primitive_type_builder("col1", Type::INT32) + .build() + .unwrap(), + ), + Arc::new( + types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY) + .with_converted_type(ConvertedType::INTERVAL) + .with_length(12) + .build() + .unwrap(), + ), + Arc::new( + types::Type::group_type_builder("nested") + .with_repetition(Repetition::REQUIRED) + .with_fields(vec![ + Arc::new( + types::Type::primitive_type_builder( + "col3", + Type::FIXED_LEN_BYTE_ARRAY, + ) + .with_logical_type(Some(LogicalType::Float16)) + .with_length(2) + .build() + .unwrap(), + ), + Arc::new( + types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY) + .with_logical_type(Some(LogicalType::String)) + .build() + .unwrap(), + ), + ]) + .build() + .unwrap(), + ), + ]) + .build() + .unwrap(), + ); + + let props = Default::default(); + let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(); + writer.close().unwrap(); + + let reader = SerializedFileReader::new(file).unwrap(); + + // only leaves + let expected = vec![ + // INT32 + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), + // INTERVAL + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED), + // Float16 + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), + // String + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED), + ]; + let actual = reader.metadata().file_metadata().column_orders(); + + assert!(actual.is_some()); + let actual = actual.unwrap(); + assert_eq!(*actual, expected); + } + #[test] fn test_file_writer_with_metadata() { let file = tempfile::tempfile().unwrap();