Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Aug 13, 2024
1 parent 28278d1 commit 5e24932
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 71 deletions.
50 changes: 31 additions & 19 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,28 +978,40 @@ impl GroupingSet {

#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
pub struct WildcardOptions {
pub opt_ilike: Option<IlikeSelectItem>,
pub opt_exclude: Option<ExcludeSelectItem>,
pub opt_except: Option<ExceptSelectItem>,
pub opt_replace: Option<PlannedReplaceSelectItem>,
pub opt_rename: Option<RenameSelectItem>,
pub ilike: Option<IlikeSelectItem>,
pub exclude: Option<ExcludeSelectItem>,
pub except: Option<ExceptSelectItem>,
pub replace: Option<PlannedReplaceSelectItem>,
pub rename: Option<RenameSelectItem>,
}

impl WildcardOptions {
pub fn with_replace(self, replace: PlannedReplaceSelectItem) -> Self {
WildcardOptions {
ilike: self.ilike,
exclude: self.exclude,
except: self.except,
replace: Some(replace),
rename: self.rename,
}
}
}

impl Display for WildcardOptions {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
if let Some(ilike) = &self.opt_ilike {
if let Some(ilike) = &self.ilike {
write!(f, " {ilike}")?;
}
if let Some(exclude) = &self.opt_exclude {
if let Some(exclude) = &self.exclude {
write!(f, " {exclude}")?;
}
if let Some(except) = &self.opt_except {
if let Some(except) = &self.except {
write!(f, " {except}")?;
}
if let Some(replace) = &self.opt_replace {
if let Some(replace) = &self.replace {
write!(f, " {replace}")?;
}
if let Some(rename) = &self.opt_rename {
if let Some(rename) = &self.rename {
write!(f, " {rename}")?;
}
Ok(())
Expand All @@ -1008,7 +1020,7 @@ impl Display for WildcardOptions {

#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
pub struct PlannedReplaceSelectItem {
pub items: Vec<Box<ReplaceSelectElement>>,
pub items: Vec<ReplaceSelectElement>,
pub planned_expressions: Vec<Expr>,
}

Expand All @@ -1021,7 +1033,7 @@ impl Display for PlannedReplaceSelectItem {
}

impl PlannedReplaceSelectItem {
pub fn items(&self) -> &[Box<ReplaceSelectElement>] {
pub fn items(&self) -> &[ReplaceSelectElement] {
&self.items
}

Expand Down Expand Up @@ -2986,11 +2998,11 @@ mod test {
None,
None,
Some(PlannedReplaceSelectItem {
items: vec![Box::new(ReplaceSelectElement {
items: vec![ReplaceSelectElement {
expr: ast::Expr::Identifier(Ident::from("c1")),
column_name: Ident::from("a1"),
as_keyword: false
})],
}],
planned_expressions: vec![]
}),
None
Expand Down Expand Up @@ -3024,11 +3036,11 @@ mod test {
opt_rename: Option<RenameSelectItem>,
) -> WildcardOptions {
WildcardOptions {
opt_ilike,
opt_exclude,
opt_except,
opt_replace,
opt_rename,
ilike: opt_ilike,
exclude: opt_exclude,
except: opt_except,
replace: opt_replace,
rename: opt_rename,
}
}
}
44 changes: 20 additions & 24 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ pub fn expand_wildcard(
})
.collect::<HashSet<_>>();
let excluded_columns = if let Some(WildcardOptions {
opt_exclude,
opt_except,
exclude: opt_exclude,
except: opt_except,
..
}) = wildcard_options
{
Expand Down Expand Up @@ -436,8 +436,8 @@ pub fn expand_qualified_wildcard(
DFSchema::try_from_qualified_schema(qualifier.clone(), &qualified_schema)?
.with_functional_dependencies(projected_func_dependencies)?;
let excluded_columns = if let Some(WildcardOptions {
opt_exclude,
opt_except,
exclude: opt_exclude,
except: opt_except,
..
}) = wildcard_options
{
Expand Down Expand Up @@ -739,35 +739,31 @@ pub fn exprlist_to_fields<'a>(
Expr::Wildcard { qualifier, options } => match qualifier {
None => {
let excluded: Vec<String> = get_excluded_columns(
options.opt_exclude.as_ref(),
options.opt_except.as_ref(),
options.exclude.as_ref(),
options.except.as_ref(),
wildcard_schema,
None,
)?
.into_iter()
.map(|c| c.flat_name())
.collect();
Ok::<_, DataFusionError>(
(0..wildcard_schema.fields().len())
.filter_map(|i| {
let (qualifier, field) =
wildcard_schema.qualified_field(i);
let flat_name = qualifier
.map(|t| format!("{}.{}", t, field.name()))
.unwrap_or(field.name().clone());
if excluded.contains(&flat_name) {
None
} else {
Some((qualifier.cloned(), Arc::new(field.to_owned())))
}
wildcard_schema
.field_names()
.iter()
.enumerate()
.filter(|(_, s)| !excluded.contains(s))
.map(|(i, _)| wildcard_schema.qualified_field(i))
.map(|(qualifier, f)| {
(qualifier.cloned(), Arc::new(f.to_owned()))
})
.collect::<Vec<_>>(),
)
}
Some(qualifier) => {
let excluded: Vec<String> = get_excluded_columns(
options.opt_exclude.as_ref(),
options.opt_except.as_ref(),
options.exclude.as_ref(),
options.except.as_ref(),
wildcard_schema,
Some(qualifier),
)?
Expand Down Expand Up @@ -826,8 +822,8 @@ pub fn exprlist_len(
options,
} => {
let excluded = get_excluded_columns(
options.opt_exclude.as_ref(),
options.opt_except.as_ref(),
options.exclude.as_ref(),
options.except.as_ref(),
wildcard_schema.unwrap_or(schema),
None,
)?
Expand All @@ -843,8 +839,8 @@ pub fn exprlist_len(
options,
} => {
let excluded = get_excluded_columns(
options.opt_exclude.as_ref(),
options.opt_except.as_ref(),
options.exclude.as_ref(),
options.except.as_ref(),
wildcard_schema.unwrap_or(schema),
Some(qualifier),
)?
Expand Down
12 changes: 4 additions & 8 deletions datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult};
use datafusion_common::{Column, Result};
use datafusion_expr::builder::validate_unique_names;
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem};
use datafusion_expr::expr::PlannedReplaceSelectItem;
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, find_base_plan,
};
Expand Down Expand Up @@ -83,7 +83,7 @@ fn expand_exprlist(input: &LogicalPlan, expr: Vec<Expr>) -> Result<Vec<Expr>> {
)?;
// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
let replaced = if let Some(replace) = options.opt_replace {
let replaced = if let Some(replace) = options.replace {
replace_columns(expanded, replace)?
} else {
expanded
Expand All @@ -94,7 +94,7 @@ fn expand_exprlist(input: &LogicalPlan, expr: Vec<Expr>) -> Result<Vec<Expr>> {
expand_wildcard(input.schema(), input, Some(&options))?;
// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
let replaced = if let Some(replace) = options.opt_replace {
let replaced = if let Some(replace) = options.replace {
replace_columns(expanded, replace)?
} else {
expanded
Expand Down Expand Up @@ -149,11 +149,7 @@ fn replace_columns(
.zip(replace.expressions().iter())
.find(|(item, _)| item.column_name.value == *name)
{
*expr = Expr::Alias(Alias {
expr: Box::new(new_expr.clone()),
relation: None,
name: name.clone(),
})
*expr = new_expr.clone().alias(name.clone())
}
}
}
Expand Down
27 changes: 11 additions & 16 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context: &mut PlannerContext,
options: WildcardAdditionalOptions,
) -> Result<WildcardOptions> {
let planned_option = WildcardOptions {
ilike: options.opt_ilike,
exclude: options.opt_exclude,
except: options.opt_except,
replace: None,
rename: options.opt_rename,
};
if let Some(replace) = options.opt_replace {
let replace_expr = replace
.items
Expand All @@ -639,25 +646,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.clone())
})
.collect::<Result<Vec<_>>>()?;
let planned_option = PlannedReplaceSelectItem {
items: replace.items,
let planned_replace = PlannedReplaceSelectItem {
items: replace.items.into_iter().map(|i| *i).collect(),
planned_expressions: replace_expr,
};
Ok(WildcardOptions {
opt_ilike: options.opt_ilike,
opt_exclude: options.opt_exclude,
opt_except: options.opt_except,
opt_replace: Some(planned_option),
opt_rename: options.opt_rename,
})
Ok(planned_option.with_replace(planned_replace))
} else {
Ok(WildcardOptions {
opt_ilike: options.opt_ilike,
opt_exclude: options.opt_exclude,
opt_except: options.opt_except,
opt_replace: None,
opt_rename: options.opt_rename,
})
Ok(planned_option)
}
}

Expand Down
4 changes: 0 additions & 4 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3929,10 +3929,6 @@ b 1 3
a 1 4
b 5 5

# expected failed message:
# DataFusion error: expand_wildcard_rule
# caused by
# Schema error: No field named aggregate_test_100.c1. Valid fields are rn.
statement error
SELECT *
FROM (SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY c1) as rn
Expand Down

0 comments on commit 5e24932

Please sign in to comment.