Skip to content

Commit

Permalink
implement preserve_wildcard_expression in ExprPlanner
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Jul 26, 2024
1 parent 01dc3f9 commit b2577eb
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 1 deletion.
29 changes: 28 additions & 1 deletion datafusion/core/tests/user_defined/expr_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use arrow_array::{ArrayRef, RecordBatch, StringArray};
use std::sync::Arc;

use datafusion::common::{assert_batches_eq, DFSchema};
Expand Down Expand Up @@ -62,6 +62,10 @@ impl ExprPlanner for MyCustomPlanner {
_ => Ok(PlannerResult::Original(expr)),
}
}

fn preserve_wildcard_expression(&self) -> bool {
true
}
}

async fn plan_and_collect(sql: &str) -> Result<Vec<RecordBatch>> {
Expand Down Expand Up @@ -121,3 +125,26 @@ async fn test_question_filter() {
let expected = ["+---+", "| a |", "+---+", "| 1 |", "+---+"];
assert_batches_eq!(&expected, &actual);
}

#[tokio::test]
async fn test_keep_wildcard() -> Result<()> {
let mut ctx = SessionContext::new();
ctx.register_expr_planner(Arc::new(MyCustomPlanner))?;
ctx.register_batch("t", t_batch())?;

let actual = ctx.sql("select * from t").await?.into_optimized_plan()?;
let actual = format!("{:?}", actual);
let expected = "Projection: *\n TableScan: t projection=[]";
assert_eq!(&expected, &actual);

let actual = ctx.sql("select t.* from t").await?.into_optimized_plan()?;
let actual = format!("{:?}", actual);
let expected = "Projection: t.*\n TableScan: t projection=[]";
assert_eq!(&expected, &actual);
Ok(())
}

fn t_batch() -> RecordBatch {
let c1: ArrayRef = Arc::new(StringArray::from_iter_values(["a", "b", "c"]));
RecordBatch::try_from_iter(vec![("c1", c1)]).unwrap()
}
11 changes: 11 additions & 0 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ pub trait ExprPlanner: Send + Sync {
"Default planner compound identifier hasn't been implemented for ExprPlanner"
)
}

/// Disable the expansion of wildcard expressions when selecting items.
/// This is used to keep the wildcard expression in the logical plan.
///
/// Note:
/// Physical Planner doesn't support plan the wildcard expression.
/// If the wildcard expression is kept in the logical plan,
/// it should be transformed to the list of columns before physical planning.
fn preserve_wildcard_expression(&self) -> bool {
false
}
}

/// An operator with two arguments to plan
Expand Down
23 changes: 23 additions & 0 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if empty_from {
return plan_err!("SELECT * with no tables specified is not valid");
}
let preserve_wildcard_expression = self
.context_provider
.get_expr_planners()
.iter()
.any(|planner| planner.preserve_wildcard_expression());

if preserve_wildcard_expression {
return Ok(vec![Expr::Wildcard { qualifier: None }]);
}

// do not expand from outer schema
let expanded_exprs =
expand_wildcard(plan.schema().as_ref(), plan, Some(&options))?;
Expand All @@ -610,6 +620,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SelectItem::QualifiedWildcard(object_name, options) => {
Self::check_wildcard_options(&options)?;
let qualifier = idents_to_table_reference(object_name.0, false)?;

let preserve_wildcard_expression = self
.context_provider
.get_expr_planners()
.iter()
.any(|planner| planner.preserve_wildcard_expression());

if preserve_wildcard_expression {
return Ok(vec![Expr::Wildcard {
qualifier: Some(qualifier),
}]);
}

// do not expand from outer schema
let expanded_exprs = expand_qualified_wildcard(
&qualifier,
Expand Down

0 comments on commit b2577eb

Please sign in to comment.