Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: temporary branch for IOx update (12-18-2023 to 12-20-2023) #2

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1825,8 +1825,8 @@ mod tests {
let df_results = collect(physical_plan, ctx.task_ctx()).await?;

#[rustfmt::skip]
assert_batches_sorted_eq!(
[ "+----+",
assert_batches_sorted_eq!([
"+----+",
"| id |",
"+----+",
"| 1 |",
Expand All @@ -1837,6 +1837,38 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_aggregate_alias() -> Result<()> {
let df = test_table().await?;

let df = df
// GROUP BY `c2 + 1`
.aggregate(vec![col("c2") + lit(1)], vec![])?
// SELECT `c2 + 1` as c2
.select(vec![(col("c2") + lit(1)).alias("c2")])?
// GROUP BY c2 as "c2" (alias in expr is not supported by SQL)
.aggregate(vec![col("c2").alias("c2")], vec![])?;

let df_results = df.collect().await?;

#[rustfmt::skip]
assert_batches_sorted_eq!([
"+----+",
"| c2 |",
"+----+",
"| 2 |",
"| 3 |",
"| 4 |",
"| 5 |",
"| 6 |",
"+----+",
],
&df_results
);

Ok(())
}

#[tokio::test]
async fn test_distinct() -> Result<()> {
let t = test_table().await?;
Expand Down
15 changes: 12 additions & 3 deletions datafusion/core/tests/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ async fn parquet_distinct_partition_col() -> Result<()> {
assert_eq!(min_limit, resulting_limit);

let s = ScalarValue::try_from_array(results[0].column(1), 0)?;
let month = match s {
ScalarValue::Utf8(Some(month)) => month,
s => panic!("Expected month as Utf8 found {s:?}"),
let month = match extract_as_utf(&s) {
Some(month) => month,
s => panic!("Expected month as Dict(_, Utf8) found {s:?}"),
};

let sql_on_partition_boundary = format!(
Expand All @@ -191,6 +191,15 @@ async fn parquet_distinct_partition_col() -> Result<()> {
Ok(())
}

fn extract_as_utf(v: &ScalarValue) -> Option<String> {
if let ScalarValue::Dictionary(_, v) = v {
if let ScalarValue::Utf8(v) = v.as_ref() {
return v.clone();
}
}
None
}

#[tokio::test]
async fn csv_filter_with_file_col() -> Result<()> {
let ctx = SessionContext::new();
Expand Down
58 changes: 39 additions & 19 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,27 +904,11 @@ impl LogicalPlanBuilder {
group_expr: impl IntoIterator<Item = impl Into<Expr>>,
aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self> {
let mut group_expr = normalize_cols(group_expr, &self.plan)?;
let group_expr = normalize_cols(group_expr, &self.plan)?;
let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;

// Rewrite groupby exprs according to functional dependencies
let group_by_expr_names = group_expr
.iter()
.map(|group_by_expr| group_by_expr.display_name())
.collect::<Result<Vec<_>>>()?;
let schema = self.plan.schema();
if let Some(target_indices) =
get_target_functional_dependencies(schema, &group_by_expr_names)
{
for idx in target_indices {
let field = schema.field(idx);
let expr =
Expr::Column(Column::new(field.qualifier().cloned(), field.name()));
if !group_expr.contains(&expr) {
group_expr.push(expr);
}
}
}
let group_expr =
add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?;
Aggregate::try_new(Arc::new(self.plan), group_expr, aggr_expr)
.map(LogicalPlan::Aggregate)
.map(Self::from)
Expand Down Expand Up @@ -1189,6 +1173,42 @@ pub fn build_join_schema(
schema.with_functional_dependencies(func_dependencies)
}

/// Add additional "synthetic" group by expressions based on functional
/// dependencies.
///
/// For example, if we are grouping on `[c1]`, and we know from
/// functional dependencies that column `c1` determines `c2`, this function
/// adds `c2` to the group by list.
///
/// This allows MySQL style selects like
/// `SELECT col FROM t WHERE pk = 5` if col is unique
fn add_group_by_exprs_from_dependencies(
mut group_expr: Vec<Expr>,
schema: &DFSchemaRef,
) -> Result<Vec<Expr>> {
// Names of the fields produced by the GROUP BY exprs for example, `GROUP BY
// c1 + 1` produces an output field named `"c1 + 1"`
let mut group_by_field_names = group_expr
.iter()
.map(|e| e.display_name())
.collect::<Result<Vec<_>>>()?;

if let Some(target_indices) =
get_target_functional_dependencies(schema, &group_by_field_names)
{
for idx in target_indices {
let field = schema.field(idx);
let expr =
Expr::Column(Column::new(field.qualifier().cloned(), field.name()));
let expr_name = expr.display_name()?;
if !group_by_field_names.contains(&expr_name) {
group_by_field_names.push(expr_name);
group_expr.push(expr);
}
}
}
Ok(group_expr)
}
/// Errors if one or more expressions have equal names.
pub(crate) fn validate_unique_names<'a>(
node_name: &str,
Expand Down
27 changes: 23 additions & 4 deletions datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@

use crate::aggregates::group_values::GroupValues;
use ahash::RandomState;
use arrow::compute::cast;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, Rows, SortField};
use arrow_array::ArrayRef;
use arrow_schema::SchemaRef;
use arrow_array::{Array, ArrayRef};
use arrow_schema::{DataType, SchemaRef};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::Result;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_physical_expr::EmitTo;
use hashbrown::raw::RawTable;

/// A [`GroupValues`] making use of [`Rows`]
pub struct GroupValuesRows {
/// The output schema
schema: SchemaRef,

/// Converter for the group values
row_converter: RowConverter,

Expand Down Expand Up @@ -75,6 +79,7 @@ impl GroupValuesRows {
let map = RawTable::with_capacity(0);

Ok(Self {
schema,
row_converter,
map,
map_size: 0,
Expand Down Expand Up @@ -165,7 +170,7 @@ impl GroupValues for GroupValuesRows {
.take()
.expect("Can not emit from empty rows");

let output = match emit_to {
let mut output = match emit_to {
EmitTo::All => {
let output = self.row_converter.convert_rows(&group_values)?;
group_values.clear();
Expand Down Expand Up @@ -198,6 +203,20 @@ impl GroupValues for GroupValuesRows {
}
};

// TODO: Materialize dictionaries in group keys (#7647)
for (field, array) in self.schema.fields.iter().zip(&mut output) {
let expected = field.data_type();
if let DataType::Dictionary(_, v) = expected {
let actual = array.data_type();
if v.as_ref() != actual {
return Err(DataFusionError::Internal(format!(
"Converted group rows expected dictionary of {v} got {actual}"
)));
}
*array = cast(array.as_ref(), expected)?;
}
}

self.group_values = Some(group_values);
Ok(output)
}
Expand Down
35 changes: 3 additions & 32 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::{
use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_schema::DataType;
use datafusion_common::stats::Precision;
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -254,9 +253,6 @@ pub struct AggregateExec {
limit: Option<usize>,
/// Input plan, could be a partial aggregate or the input to the aggregate
pub input: Arc<dyn ExecutionPlan>,
/// Original aggregation schema, could be different from `schema` before dictionary group
/// keys get materialized
original_schema: SchemaRef,
/// Schema after the aggregate is applied
schema: SchemaRef,
/// Input schema before any aggregation is applied. For partial aggregate this will be the
Expand Down Expand Up @@ -440,19 +436,15 @@ impl AggregateExec {
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
) -> Result<Self> {
let original_schema = create_schema(
let schema = create_schema(
&input.schema(),
&group_by.expr,
&aggr_expr,
group_by.contains_null(),
mode,
)?;

let schema = Arc::new(materialize_dict_group_keys(
&original_schema,
group_by.expr.len(),
));
let original_schema = Arc::new(original_schema);
let schema = Arc::new(schema);
AggregateExec::try_new_with_schema(
mode,
group_by,
Expand All @@ -461,7 +453,6 @@ impl AggregateExec {
input,
input_schema,
schema,
original_schema,
)
}

Expand All @@ -482,7 +473,6 @@ impl AggregateExec {
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
schema: SchemaRef,
original_schema: SchemaRef,
) -> Result<Self> {
// Reset ordering requirement to `None` if aggregator is not order-sensitive
let mut order_by_expr = aggr_expr
Expand Down Expand Up @@ -557,7 +547,6 @@ impl AggregateExec {
aggr_expr,
filter_expr,
input,
original_schema,
schema,
input_schema,
projection_mapping,
Expand Down Expand Up @@ -868,7 +857,7 @@ impl ExecutionPlan for AggregateExec {
children[0].clone(),
self.input_schema.clone(),
self.schema.clone(),
self.original_schema.clone(),
//self.original_schema.clone(),
)?;
me.limit = self.limit;
Ok(Arc::new(me))
Expand Down Expand Up @@ -975,24 +964,6 @@ fn create_schema(
Ok(Schema::new(fields))
}

/// returns schema with dictionary group keys materialized as their value types
/// The actual convertion happens in `RowConverter` and we don't do unnecessary
/// conversion back into dictionaries
fn materialize_dict_group_keys(schema: &Schema, group_count: usize) -> Schema {
let fields = schema
.fields
.iter()
.enumerate()
.map(|(i, field)| match field.data_type() {
DataType::Dictionary(_, value_data_type) if i < group_count => {
Field::new(field.name(), *value_data_type.clone(), field.is_nullable())
}
_ => Field::clone(field),
})
.collect::<Vec<_>>();
Schema::new(fields)
}

fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
let group_fields = schema.fields()[0..group_count].to_vec();
Arc::new(Schema::new(group_fields))
Expand Down
4 changes: 1 addition & 3 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,7 @@ impl GroupedHashAggregateStream {
.map(create_group_accumulator)
.collect::<Result<_>>()?;

// we need to use original schema so RowConverter in group_values below
// will do the proper coversion of dictionaries into value types
let group_schema = group_schema(&agg.original_schema, agg_group_by.expr.len());
let group_schema = group_schema(&agg_schema, agg_group_by.expr.len());
let spill_expr = group_schema
.fields
.into_iter()
Expand Down
10 changes: 5 additions & 5 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2469,11 +2469,11 @@ select max(x_dict) from value_dict group by x_dict % 2 order by max(x_dict);
query T
select arrow_typeof(x_dict) from value_dict group by x_dict;
----
Int32
Int32
Int32
Int32
Int32
Dictionary(Int64, Int32)
Dictionary(Int64, Int32)
Dictionary(Int64, Int32)
Dictionary(Int64, Int32)
Dictionary(Int64, Int32)

statement ok
drop table value
Expand Down
Loading