Skip to content

Commit

Permalink
fix unique expression check for projection
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Aug 12, 2024
1 parent 3b55265 commit 03b9b19
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 35 deletions.
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,7 @@ fn add_group_by_exprs_from_dependencies(
Ok(group_expr)
}
/// Errors if one or more expressions have equal names.
pub(crate) fn validate_unique_names<'a>(
pub fn validate_unique_names<'a>(
node_name: &str,
expressions: impl IntoIterator<Item = &'a Expr>,
) -> Result<()> {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,10 @@ pub fn exprlist_len(
)?
.into_iter()
.collect::<HashSet<Column>>();
Ok(get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded).len())
Ok(
get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded)
.len(),
)
}
_ => Ok(1),
})
Expand Down
35 changes: 6 additions & 29 deletions datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,19 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

use crate::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_common::{Column, Result};
use datafusion_expr::builder::validate_unique_names;
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem};
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, find_base_plan,
};
use datafusion_expr::{Expr, LogicalPlan, Projection, SubqueryAlias};

use crate::AnalyzerRule;

#[derive(Default)]
pub struct ExpandWildcardRule {}

Expand All @@ -55,12 +53,10 @@ fn expand_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Projection(Projection { expr, input, .. }) => {
let projected_expr = expand_exprlist(&input, expr)?;
validate_unique_names("Projections", projected_expr.iter())?;
Ok(Transformed::yes(
Projection::try_new(
to_unique_names(projected_expr.iter())?,
Arc::clone(&input),
)
.map(LogicalPlan::Projection)?,
Projection::try_new(projected_expr, Arc::clone(&input))
.map(LogicalPlan::Projection)?,
))
}
// Teh schema of the plan should also be updated if the child plan is transformed.
Expand All @@ -73,25 +69,6 @@ fn expand_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
}
}

fn to_unique_names<'a>(
expressions: impl IntoIterator<Item = &'a Expr>,
) -> Result<Vec<Expr>> {
let mut unique_names = HashMap::new();
let mut unique_expr = vec![];
expressions
.into_iter()
.enumerate()
.try_for_each(|(position, expr)| {
let name = expr.schema_name().to_string();
if let Entry::Vacant(e) = unique_names.entry(name) {
e.insert((position, expr));
unique_expr.push(expr.to_owned());
}
Ok::<(), DataFusionError>(())
})?;
Ok(unique_expr)
}

fn expand_exprlist(input: &LogicalPlan, expr: Vec<Expr>) -> Result<Vec<Expr>> {
let mut projected_expr = vec![];
let input = find_base_plan(input);
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ strum = { version = "0.26.1", features = ["derive"] }
ctor = { workspace = true }
datafusion-functions = { workspace = true, default-features = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-optimizer = { workspace = true }
env_logger = { workspace = true }
paste = "^1.0"
rstest = { workspace = true }
12 changes: 10 additions & 2 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ use sqlparser::ast::{

use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
ScalarValue,
ScalarValue, TableReference,
};
use datafusion_expr::expr::InList;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr::{InList, WildcardOptions};
use datafusion_expr::{
lit, Between, BinaryExpr, Cast, Expr, ExprSchemable, GetFieldAccess, Like, Literal,
Operator, TryCast,
Expand Down Expand Up @@ -661,6 +661,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
not_impl_err!("AnyOp not supported by ExprPlanner: {binary_expr:?}")
}
SQLExpr::Wildcard => Ok(Expr::Wildcard {
qualifier: None,
options: WildcardOptions::default(),
}),
SQLExpr::QualifiedWildcard(object_name) => Ok(Expr::Wildcard {
qualifier: Some(TableReference::from(object_name.to_string())),
options: WildcardOptions::default(),
}),
_ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
}
}
Expand Down
14 changes: 12 additions & 2 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ use datafusion_sql::{
planner::{ParserOptions, SqlToRel},
};

use datafusion_common::config::ConfigOptions;
use datafusion_functions::core::planner::CoreFunctionPlanner;
use datafusion_functions_aggregate::{
approx_median::approx_median_udaf, count::count_udaf, min_max::max_udaf,
min_max::min_udaf,
};
use datafusion_functions_aggregate::{average::avg_udaf, grouping::grouping_udaf};
use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use datafusion_optimizer::Analyzer;
use rstest::rstest;
use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};

Expand Down Expand Up @@ -641,7 +644,7 @@ fn select_wildcard_with_repeated_column() {
let sql = "SELECT *, age FROM person";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"Error during planning: Projections require unique expression names but the expression \"person.age\" at position 3 and \"person.age\" at position 8 have the same name. Consider aliasing (\"AS\") one of them.",
"expand_wildcard_rule\ncaused by\nError during planning: Projections require unique expression names but the expression \"person.age\" at position 3 and \"person.age\" at position 8 have the same name. Consider aliasing (\"AS\") one of them.",
err.strip_backtrace()
);
}
Expand Down Expand Up @@ -2775,7 +2778,14 @@ fn logical_plan_with_dialect_and_options(
let planner = SqlToRel::new_with_options(&context, options);
let result = DFParser::parse_sql_with_dialect(sql, dialect);
let mut ast = result?;
planner.statement_to_plan(ast.pop_front().unwrap())
let plan = planner.statement_to_plan(ast.pop_front().unwrap())?;
let options = ConfigOptions::default();
// apply rule to expand the wildcard expression
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]).execute_and_check(
plan,
&options,
|_, _| {},
)
}

fn make_udf(name: &'static str, args: Vec<DataType>, return_type: DataType) -> ScalarUDF {
Expand Down

0 comments on commit 03b9b19

Please sign in to comment.