Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cubestore): Limit pushdown isn’t working for wrapped select with aliasing #9034

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8001,6 +8001,18 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
.exec_query("CREATE TABLE foo.pushdown_where_group2 (a int, b int, c int) index ind1 (a, b, c) index ind2 (c, b)")
.await
.unwrap();
service
.exec_query("CREATE TABLE foo.pushdown_where_group2_with_alias (a_alias int, b_alias int, c_alias int) index ind1 (a_alias, b_alias, c_alias) index ind2 (c_alias, b_alias)")
.await
.unwrap();
service
.exec_query("CREATE TABLE foo.pushdown_where_group3_with_alias (a_alias int, b_alias int, c_alias int) index ind1 (c_alias, b_alias)")
.await
.unwrap();
service
.exec_query("CREATE TABLE foo.pushdown_where_group4_with_alias (a_alias_2 int, b_alias_2 int, c_alias_2 int) index ind1 (c_alias_2, b_alias_2)")
.await
.unwrap();
service
.exec_query(
"INSERT INTO foo.pushdown_where_group1
Expand Down Expand Up @@ -8279,6 +8291,80 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
]),
]
);

// ====================================
assert_limit_pushdown(
&service,
"SELECT a, b, c FROM (
SELECT a, b, c FROM foo.pushdown_where_group1
UNION ALL
SELECT a_alias a, b_alias b, c_alias c FROM foo.pushdown_where_group2_with_alias
) as `tb`
ORDER BY 3 DESC
LIMIT 10",
Some("ind2"),
true,
true,
)
.await
.unwrap();

// ====================================
assert_limit_pushdown(
&service,
"SELECT a, b, c FROM (
SELECT a, b, c FROM foo.pushdown_where_group1
UNION ALL
SELECT a_alias a, b_alias b, c_alias c FROM foo.pushdown_where_group2_with_alias
) as `tb`
WHERE b = 20
ORDER BY 1 DESC, 3 DESC
LIMIT 3",
Some("ind1"),
true,
true,
)
.await
.unwrap();

// ====================================
// TODO: theses cases still don't use an optimal index
// Filters outside the index are a priority right now.
// The second problem is that ORDER BY does not affect the score when selecting an index
// assert_limit_pushdown(
// &service,
// "SELECT a, b, c FROM (
// SELECT a_alias a, b_alias b, c_alias c FROM foo.pushdown_where_group3_with_alias
// UNION ALL
// SELECT a_alias_2 a, b_alias_2 b, c_alias_2 c FROM foo.pushdown_where_group4_with_alias
// ) as `tb`
// WHERE a = 20
// ORDER BY 3 DESC
// LIMIT 3",
// Some("ind1"),
// true,
// true,
// )
// .await
// .unwrap();

// ====================================
// assert_limit_pushdown(
// &service,
// "SELECT a, b, c FROM (
// SELECT a_alias a, b_alias b, c_alias c FROM foo.pushdown_where_group3_with_alias
// UNION ALL
// SELECT a_alias_2 a, b_alias_2 b, c_alias_2 c FROM foo.pushdown_where_group4_with_alias
// ) as `tb`
// WHERE a > 20
// ORDER BY 3 DESC
// LIMIT 3",
// Some("ind1"),
// true,
// true,
// )
// .await
// .unwrap();
}
async fn limit_pushdown_without_group_resort(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA foo").await.unwrap();
Expand Down
84 changes: 64 additions & 20 deletions rust/cubestore/cubestore/src/queryplanner/planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,28 @@ impl PlanRewriter for CollectConstraints {
order_col_names: current_context.order_col_names.clone(),
})
}
LogicalPlan::Projection { expr, .. } => {
let alias_to_column = get_alias_to_column(expr);

if let Some(order_col_names) = &current_context.order_col_names {
let names: Vec<String> = order_col_names
.iter()
.map(|k| {
alias_to_column
.get(k)
.map_or_else(|| k.clone(), |v| v.name.clone())
})
.collect();

if !names.is_empty() {
return Some(current_context.update_order_col_names(names));
} else {
return None;
}
}

None
}
LogicalPlan::Sort { expr, input, .. } => {
let (names, _) = sort_to_column_names(expr, input);

Expand Down Expand Up @@ -606,26 +628,15 @@ fn extract_column_name(expr: &Expr) -> Option<String> {
}
}

///Try to get original column namse from if underlined projection or aggregates contains columns aliases
fn get_original_name(may_be_alias: &String, input: &LogicalPlan) -> String {
fn get_name(exprs: &Vec<Expr>, may_be_alias: &String) -> String {
let expr = exprs.iter().find(|&expr| match expr {
Expr::Alias(_, name) => name == may_be_alias,
_ => false,
});
if let Some(expr) = expr {
if let Some(original_name) = extract_column_name(expr) {
return original_name;
}
fn get_alias_to_column(expr: &Vec<Expr>) -> HashMap<String, logical_plan::Column> {
let mut alias_to_column = HashMap::new();
expr.iter().for_each(|e| {
if let Expr::Alias(box Expr::Column(c), alias) = e {
alias_to_column.insert(alias.clone(), c.clone());
}
may_be_alias.clone()
}
match input {
LogicalPlan::Projection { expr, .. } => get_name(expr, may_be_alias),
LogicalPlan::Filter { input, .. } => get_original_name(may_be_alias, input),
LogicalPlan::Aggregate { group_expr, .. } => get_name(group_expr, may_be_alias),
_ => may_be_alias.clone(),
}
});

alias_to_column
}

fn sort_to_column_names(sort_exprs: &Vec<Expr>, input: &LogicalPlan) -> (Vec<String>, bool) {
Expand All @@ -642,7 +653,7 @@ fn sort_to_column_names(sort_exprs: &Vec<Expr>, input: &LogicalPlan) -> (Vec<Str
}
match expr.as_ref() {
Expr::Column(c) => {
res.push(get_original_name(&c.name, input));
res.push(c.name.clone());
}
_ => {
return (Vec::new(), true);
Expand Down Expand Up @@ -755,6 +766,39 @@ impl PlanRewriter for ChooseIndex<'_> {

fn enter_node(&mut self, n: &LogicalPlan, context: &Self::Context) -> Option<Self::Context> {
match n {
LogicalPlan::Projection { expr, .. } => {
let alias_to_column = get_alias_to_column(expr);

let new_single_value_filtered_cols = context
.single_value_filtered_cols
.iter()
.map(|name| {
alias_to_column
.get(name)
.map_or_else(|| name.clone(), |col| col.name.clone())
})
.collect();

let mut new_context =
context.update_single_value_filtered_cols(new_single_value_filtered_cols);

if let Some(sort) = &new_context.sort {
let names: Vec<String> = sort
.iter()
.map(|k| {
alias_to_column
.get(k)
.map_or_else(|| k.clone(), |col| col.name.clone())
})
.collect();

if !names.is_empty() {
new_context = new_context.update_sort(names, context.sort_is_asc);
}
}

Some(new_context)
}
LogicalPlan::Limit { n, .. } => Some(context.update_limit(Some(*n))),
LogicalPlan::Skip { n, .. } => {
if let Some(limit) = context.limit {
Expand Down
Loading