fix: Check shape for *_horizontal functions (#20130)
coastalwhite authored Dec 3, 2024
1 parent cf3b47f commit cc05ff2
Showing 15 changed files with 359 additions and 268 deletions.
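
The methods in this file are reduced to thin wrappers around a new crate::series::arithmetic::horizontal module (one of the other changed files, not shown here), which is presumably where the shape check named in the commit title lives. A minimal sketch of what such a validation could look like, assuming the hypothetical helper name ensure_same_height and the ShapeMismatch error variant; the real check may also let unit-height columns broadcast:

use polars_core::prelude::*;
use polars_error::polars_bail;

fn ensure_same_height(columns: &[Column]) -> PolarsResult<()> {
    // Hypothetical helper: require every input column to share one height.
    let mut heights = columns.iter().map(|c| c.len());
    let Some(first) = heights.next() else { return Ok(()) };
    for h in heights {
        if h != first {
            polars_bail!(
                ShapeMismatch: "horizontal reduction got columns of differing heights: {} and {}",
                first, h
            );
        }
    }
    Ok(())
}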
181 changes: 7 additions & 174 deletions crates/polars-core/src/frame/mod.rs
@@ -1,6 +1,4 @@
//! DataFrame module.
#[cfg(feature = "zip_with")]
use std::borrow::Cow;
use std::sync::OnceLock;
use std::{mem, ops};

@@ -13,6 +11,7 @@ use crate::chunked_array::metadata::MetadataFlags;
#[cfg(feature = "algorithm_group_by")]
use crate::chunked_array::ops::unique::is_unique_helper;
use crate::prelude::*;
use crate::series::arithmetic::horizontal as series_horizontal;
#[cfg(feature = "row_hash")]
use crate::utils::split_df;
use crate::utils::{slice_offsets, try_get_supertype, Container, NoNull};
@@ -38,11 +37,8 @@ use polars_utils::pl_str::PlSmallStr;
use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;

use crate::chunked_array::cast::CastOptions;
#[cfg(feature = "row_hash")]
use crate::hashing::_df_rows_to_hashes_threaded_vertical;
#[cfg(feature = "zip_with")]
use crate::prelude::min_max_binary::min_max_binary_columns;
use crate::prelude::sort::{argsort_multiple_row_fmt, prepare_arg_sort};
use crate::series::IsSorted;
use crate::POOL;
@@ -2798,186 +2794,23 @@ impl DataFrame {
/// Aggregate the columns horizontally to their min values.
#[cfg(feature = "zip_with")]
pub fn min_horizontal(&self) -> PolarsResult<Option<Column>> {
let min_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, true);

match self.columns.len() {
0 => Ok(None),
1 => Ok(Some(self.columns[0].clone())),
2 => min_fn(&self.columns[0], &self.columns[1]).map(Some),
_ => {
// the try_reduce_with is a bit slower in parallelism,
// but I don't think it matters here as we parallelize over columns, not over elements
POOL.install(|| {
self.columns
.par_iter()
.map(|s| Ok(Cow::Borrowed(s)))
.try_reduce_with(|l, r| min_fn(&l, &r).map(Cow::Owned))
// we can unwrap the option, because we are certain there is a column
// we started this operation on 3 columns
.unwrap()
.map(|cow| Some(cow.into_owned()))
})
},
}
series_horizontal::min_horizontal(&self.columns)
}

/// Aggregate the columns horizontally to their max values.
#[cfg(feature = "zip_with")]
pub fn max_horizontal(&self) -> PolarsResult<Option<Column>> {
let max_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, false);

match self.columns.len() {
0 => Ok(None),
1 => Ok(Some(self.columns[0].clone())),
2 => max_fn(&self.columns[0], &self.columns[1]).map(Some),
_ => {
// the try_reduce_with is a bit slower in parallelism,
// but I don't think it matters here as we parallelize over columns, not over elements
POOL.install(|| {
self.columns
.par_iter()
.map(|s| Ok(Cow::Borrowed(s)))
.try_reduce_with(|l, r| max_fn(&l, &r).map(Cow::Owned))
// we can unwrap the option, because we are certain there is a column
// we started this operation on 3 columns
.unwrap()
.map(|cow| Some(cow.into_owned()))
})
},
}
series_horizontal::max_horizontal(&self.columns)
}
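
The two wrappers above keep their behavior apart from the added validation; a brief usage sketch, assuming the zip_with feature is enabled and using a made-up frame:

use polars_core::df;
use polars_core::prelude::*;

fn min_max_demo() -> PolarsResult<()> {
    let df = df!("a" => &[1i32, 5, 3], "b" => &[4i32, 2, 3])?;
    // Row-wise minimum across all columns; an empty frame would yield Ok(None).
    let min = df.min_horizontal()?.expect("frame has columns");
    assert_eq!(min.len(), df.height());
    // Row-wise maximum mirrors the same signature and now also checks shapes.
    let max = df.max_horizontal()?.expect("frame has columns");
    assert_eq!(max.len(), df.height());
    Ok(())
}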

/// Sum all values horizontally across columns.
pub fn sum_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult<Option<Series>> {
let apply_null_strategy =
|s: Series, null_strategy: NullStrategy| -> PolarsResult<Series> {
if let NullStrategy::Ignore = null_strategy {
// if has nulls
if s.null_count() > 0 {
return s.fill_null(FillNullStrategy::Zero);
}
}
Ok(s)
};

let sum_fn =
|acc: Series, s: Series, null_strategy: NullStrategy| -> PolarsResult<Series> {
let acc: Series = apply_null_strategy(acc, null_strategy)?;
let s = apply_null_strategy(s, null_strategy)?;
// This will do owned arithmetic and can be mutable
std::ops::Add::add(acc, s)
};

let non_null_cols = self
.materialized_column_iter()
.filter(|x| x.dtype() != &DataType::Null)
.collect::<Vec<_>>();

match non_null_cols.len() {
0 => {
if self.columns.is_empty() {
Ok(None)
} else {
// all columns are null dtype, so result is null dtype
Ok(Some(self.columns[0].as_materialized_series().clone()))
}
},
1 => Ok(Some(apply_null_strategy(
if non_null_cols[0].dtype() == &DataType::Boolean {
non_null_cols[0].cast(&DataType::UInt32)?
} else {
non_null_cols[0].clone()
},
null_strategy,
)?)),
2 => sum_fn(
non_null_cols[0].clone(),
non_null_cols[1].clone(),
null_strategy,
)
.map(Some),
_ => {
// the try_reduce_with is a bit slower in parallelism,
// but I don't think it matters here as we parallelize over columns, not over elements
let out = POOL.install(|| {
non_null_cols
.into_par_iter()
.cloned()
.map(Ok)
.try_reduce_with(|l, r| sum_fn(l, r, null_strategy))
// We can unwrap because we started with at least 3 columns, so we always get a Some
.unwrap()
});
out.map(Some)
},
}
pub fn sum_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult<Option<Column>> {
series_horizontal::sum_horizontal(&self.columns, null_strategy)
}
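
The removed body above spells out the semantics the new helper needs to preserve: under NullStrategy::Ignore each column's nulls are filled with zero before the pairwise additions, a lone Boolean column is cast to UInt32, and a frame whose columns are all Null-typed keeps the Null dtype. A small usage sketch under the assumption that those semantics carry over, with the result now a Column rather than a Series:

use polars_core::df;
use polars_core::prelude::*;

fn sum_demo() -> PolarsResult<()> {
    let df = df!(
        "a" => &[Some(1i32), None, Some(3)],
        "b" => &[Some(4i32), Some(2), None]
    )?;
    // Ignore: nulls count as zero, so the row sums are [5, 2, 3].
    let ignored = df.sum_horizontal(NullStrategy::Ignore)?.unwrap();
    assert_eq!(ignored.null_count(), 0);
    // Propagate: any null in a row makes that row's sum null.
    let propagated = df.sum_horizontal(NullStrategy::Propagate)?.unwrap();
    assert_eq!(propagated.null_count(), 2);
    Ok(())
}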

/// Compute the mean of all numeric values horizontally across columns.
pub fn mean_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult<Option<Series>> {
let (numeric_columns, non_numeric_columns): (Vec<_>, Vec<_>) =
self.columns.iter().partition(|s| {
let dtype = s.dtype();
dtype.is_numeric() || dtype.is_decimal() || dtype.is_bool() || dtype.is_null()
});

if !non_numeric_columns.is_empty() {
let col = non_numeric_columns.first().cloned();
polars_bail!(
InvalidOperation: "'horizontal_mean' expects numeric expressions, found {:?} (dtype={})",
col.unwrap().name(),
col.unwrap().dtype(),
);
}
let columns = numeric_columns.into_iter().cloned().collect::<Vec<_>>();
match columns.len() {
0 => Ok(None),
1 => Ok(Some(match columns[0].dtype() {
dt if dt != &DataType::Float32 && !dt.is_decimal() => columns[0]
.as_materialized_series()
.cast(&DataType::Float64)?,
_ => columns[0].as_materialized_series().clone(),
})),
_ => {
let numeric_df = unsafe { DataFrame::_new_no_checks_impl(self.height(), columns) };
let sum = || numeric_df.sum_horizontal(null_strategy);
let null_count = || {
numeric_df
.par_materialized_column_iter()
.map(|s| {
s.is_null()
.cast_with_options(&DataType::UInt32, CastOptions::NonStrict)
})
.reduce_with(|l, r| {
let l = l?;
let r = r?;
let result = std::ops::Add::add(&l, &r)?;
PolarsResult::Ok(result)
})
// we can unwrap the option, because we are certain there is a column
// we started this operation on 2 columns
.unwrap()
};

let (sum, null_count) = POOL.install(|| rayon::join(sum, null_count));
let sum = sum?;
let null_count = null_count?;

// value lengths: len - null_count
let value_length: UInt32Chunked =
(numeric_df.width().sub(&null_count)).u32().unwrap().clone();

// make sure that we do not divide by zero
// by replacing with None
let value_length = value_length
.set(&value_length.equal(0), None)?
.into_series()
.cast(&DataType::Float64)?;

sum.map(|sum| std::ops::Div::div(&sum, &value_length))
.transpose()
},
}
pub fn mean_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult<Option<Column>> {
series_horizontal::mean_horizontal(&self.columns, null_strategy)
}
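
The removed mean implementation above makes the formula explicit: the horizontal mean is the horizontal sum divided by the per-row count of non-null values (width minus the row's null count), with zero counts replaced by null to avoid dividing by zero, and the result produced as Float64 (a single Float32 or decimal column is kept as-is); non-numeric columns raise InvalidOperation. A short usage sketch, assuming the rewritten helper keeps those semantics:

use polars_core::df;
use polars_core::prelude::*;

fn mean_demo() -> PolarsResult<()> {
    let df = df!("a" => &[Some(1i32), None], "b" => &[Some(3i32), None])?;
    // Row 0: (1 + 3) / 2 = 2.0; row 1 has no values, so its count is nulled
    // instead of dividing by zero and its mean is null.
    let mean = df.mean_horizontal(NullStrategy::Ignore)?.unwrap();
    assert_eq!(mean.dtype(), &DataType::Float64);
    assert_eq!(mean.null_count(), 1);
    Ok(())
}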

/// Pipe different functions/closure operations that work on a DataFrame together.