From e5ba3d1708dea171f7a0548b2ca165db143c5ecd Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Wed, 23 Aug 2023 22:29:08 -0500
Subject: [PATCH] feat: rewrite the dist analyzer (#2238)

* it works!

Signed-off-by: Ruihang Xia

* clean up

Signed-off-by: Ruihang Xia

* add documents

Signed-off-by: Ruihang Xia

* remove unstable timestamp from sqlness test

Signed-off-by: Ruihang Xia

* rename rewriter struct

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 src/query/src/dist_plan.rs                    |   1 -
 src/query/src/dist_plan/analyzer.rs           | 333 ++++++++----------
 src/query/src/dist_plan/utils.rs              |  45 ---
 .../distributed/explain/join_10_tables.result | 206 +++++++++++
 .../distributed/explain/join_10_tables.sql    |  53 +++
 .../optimizer/filter_push_down.result         |  34 +-
 6 files changed, 438 insertions(+), 234 deletions(-)
 delete mode 100644 src/query/src/dist_plan/utils.rs
 create mode 100644 tests/cases/distributed/explain/join_10_tables.result
 create mode 100644 tests/cases/distributed/explain/join_10_tables.sql

diff --git a/src/query/src/dist_plan.rs b/src/query/src/dist_plan.rs
index ca1480c1bd82..830f3b14cf72 100644
--- a/src/query/src/dist_plan.rs
+++ b/src/query/src/dist_plan.rs
@@ -16,7 +16,6 @@ mod analyzer;
 mod commutativity;
 mod merge_scan;
 mod planner;
-mod utils;
 
 pub use analyzer::DistPlannerAnalyzer;
 pub use merge_scan::MergeScanLogicalPlan;
diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs
index 2ace7f2ea907..8ee7a59f821c 100644
--- a/src/query/src/dist_plan/analyzer.rs
+++ b/src/query/src/dist_plan/analyzer.rs
@@ -12,12 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::{Arc, Mutex};
-
 use datafusion::datasource::DefaultTableSource;
+use datafusion::error::Result as DfResult;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
-use datafusion_expr::{Extension, LogicalPlan};
+use datafusion_common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter};
+use datafusion_expr::LogicalPlan;
 use datafusion_optimizer::analyzer::AnalyzerRule;
 use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
 use table::metadata::TableType;
@@ -27,7 +26,6 @@ use crate::dist_plan::commutativity::{
     partial_commutative_transformer, Categorizer, Commutativity,
 };
 use crate::dist_plan::merge_scan::MergeScanLogicalPlan;
-use crate::dist_plan::utils;
 
 pub struct DistPlannerAnalyzer;
 
@@ -41,233 +39,196 @@ impl AnalyzerRule for DistPlannerAnalyzer {
         plan: LogicalPlan,
         _config: &ConfigOptions,
     ) -> datafusion_common::Result<LogicalPlan> {
-        // (1) transform up merge scan
-        let mut visitor = CommutativeVisitor::new();
-        let _ = plan.visit(&mut visitor)?;
-        let state = ExpandState::new();
-        let plan = plan.transform_down(&|plan| Self::expand(plan, &visitor, &state))?;
-
-        // (2) remove placeholder merge scan
-        let plan = plan.transform(&Self::remove_placeholder_merge_scan)?;
-
-        Ok(plan)
+        let mut rewriter = PlanRewriter::default();
+        plan.rewrite(&mut rewriter)
     }
 }
 
-impl DistPlannerAnalyzer {
-    /// Add [MergeScanLogicalPlan] before the table scan
-    #[allow(dead_code)]
-    fn add_merge_scan(plan: LogicalPlan) -> datafusion_common::Result<Transformed<LogicalPlan>> {
-        Ok(match plan {
-            LogicalPlan::TableScan(table_scan) => {
-                let ext_plan = LogicalPlan::Extension(Extension {
-                    node: Arc::new(MergeScanLogicalPlan::new(
-                        LogicalPlan::TableScan(table_scan),
-                        true,
-                    )),
-                });
-                Transformed::Yes(ext_plan)
-            }
-            _ => Transformed::No(plan),
-        })
-    }
-
-    /// Remove placeholder [MergeScanLogicalPlan]
-    fn remove_placeholder_merge_scan(
-        plan: LogicalPlan,
-    ) -> datafusion_common::Result<Transformed<LogicalPlan>> {
-        Ok(match &plan {
-            LogicalPlan::Extension(extension)
-                if extension.node.name() == MergeScanLogicalPlan::name() =>
-            {
-                let merge_scan = extension
-                    .node
-                    .as_any()
-                    .downcast_ref::<MergeScanLogicalPlan>()
-                    .unwrap();
-                if merge_scan.is_placeholder() {
-                    Transformed::Yes(merge_scan.input().clone())
-                } else {
-                    Transformed::No(plan)
-                }
-            }
-            _ => Transformed::No(plan),
-        })
-    }
-
-    /// Expand stages on the stop node
-    fn expand(
-        mut plan: LogicalPlan,
-        visitor: &CommutativeVisitor,
-        state: &ExpandState,
-    ) -> datafusion_common::Result<Transformed<LogicalPlan>> {
-        if state.is_transformed() {
-            // only transform once
-            return Ok(Transformed::No(plan));
-        }
-        if let Some(stop_node) = visitor.stop_node && utils::hash_plan(&plan) != stop_node {
-            // only act with the stop node or the root (the first node seen by this closure) if no stop node
-            return Ok(Transformed::No(plan));
-        }
-
-        if visitor.stop_node.is_some() {
-            // insert merge scan between the stop node and its child
-            let children = plan.inputs();
-            let mut new_children = Vec::with_capacity(children.len());
-            for child in children {
-                let mut new_child =
-                    MergeScanLogicalPlan::new(child.clone(), false).into_logical_plan();
-                // expand stages
-                for new_stage in &visitor.next_stage {
-                    new_child = new_stage.with_new_inputs(&[new_child])?
-                }
-                new_children.push(new_child);
-            }
-            plan = plan.with_new_inputs(&new_children)?;
-        } else {
-            // otherwise add merge scan as the new root
-            plan = MergeScanLogicalPlan::new(plan, false).into_logical_plan();
-            // expand stages
-            for new_stage in &visitor.next_stage {
-                plan = new_stage.with_new_inputs(&[plan])?
-            }
-        }
-
-        state.set_transformed();
-        Ok(Transformed::Yes(plan))
-    }
-}
+/// Status of the rewriter to mark if the current pass is expanded
+#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+enum RewriterStatus {
+    #[default]
+    Unexpanded,
+    Expanded,
+}
 
-struct ExpandState {
-    transformed: Mutex<bool>,
+#[derive(Debug, Default)]
+struct PlanRewriter {
+    /// Current level in the tree
+    level: usize,
+    /// Simulated stack for the `rewrite` recursion
+    stack: Vec<(LogicalPlan, usize)>,
+    /// Stages to be expanded
+    stage: Vec<LogicalPlan>,
+    status: RewriterStatus,
+    /// Partition columns of the table in current pass
+    partition_cols: Option<Vec<String>>,
 }
 
-impl ExpandState {
-    pub fn new() -> Self {
-        Self {
-            transformed: Mutex::new(false),
-        }
-    }
-
-    pub fn is_transformed(&self) -> bool {
-        *self.transformed.lock().unwrap()
+impl PlanRewriter {
+    fn get_parent(&self) -> Option<&LogicalPlan> {
+        // level starts from 1, it's safe to minus by 1
+        self.stack
+            .iter()
+            .rev()
+            .find(|(_, level)| *level == self.level - 1)
+            .map(|(node, _)| node)
     }
 
-    /// Set the state to transformed
-    pub fn set_transformed(&self) {
-        *self.transformed.lock().unwrap() = true;
-    }
-}
-
-#[derive(Debug)]
-struct CommutativeVisitor {
-    next_stage: Vec<LogicalPlan>,
-    // hash of the stop node
-    stop_node: Option<u64>,
-    /// Partition columns of current visiting table
-    current_partition_cols: Option<Vec<String>>,
-}
-
-impl TreeNodeVisitor for CommutativeVisitor {
-    type N = LogicalPlan;
-
-    fn pre_visit(&mut self, plan: &LogicalPlan) -> datafusion_common::Result<VisitRecursion> {
-        // find the first merge scan and stop traversing down
-        // todo: check if it works for join
-        Ok(match plan {
-            LogicalPlan::TableScan(table_scan) => {
-                // TODO(ruihang): spawn a sub visitor to retrieve partition columns
-                if let Some(source) = table_scan
-                    .source
-                    .as_any()
-                    .downcast_ref::<DefaultTableSource>()
-                {
-                    if let Some(provider) = source
-                        .table_provider
-                        .as_any()
-                        .downcast_ref::<DfTableProviderAdapter>()
-                    {
-                        if provider.table().table_type() == TableType::Base {
-                            let info = provider.table().table_info();
-                            let partition_key_indices = info.meta.partition_key_indices.clone();
-                            let schema = info.meta.schema.clone();
-                            let partition_cols = partition_key_indices
-                                .into_iter()
-                                .map(|index| schema.column_name_by_index(index).to_string())
-                                .collect::<Vec<String>>();
-                            self.current_partition_cols = Some(partition_cols);
-                        }
-                    }
-                }
-                VisitRecursion::Continue
-            }
-            _ => VisitRecursion::Continue,
-        })
-    }
-
-    fn post_visit(&mut self, plan: &LogicalPlan) -> datafusion_common::Result<VisitRecursion> {
+    /// Return true if should stop and expand. The input plan is the parent node of the current node
+    fn should_expand(&mut self, plan: &LogicalPlan) -> bool {
         if DFLogicalSubstraitConvertor.encode(plan).is_err() {
             common_telemetry::info!(
                 "substrait error: {:?}",
                 DFLogicalSubstraitConvertor.encode(plan)
             );
-            self.stop_node = Some(utils::hash_plan(plan));
-            return Ok(VisitRecursion::Stop);
+            return true;
         }
 
         match Categorizer::check_plan(plan) {
             Commutativity::Commutative => {}
             Commutativity::PartialCommutative => {
                 if let Some(plan) = partial_commutative_transformer(plan) {
-                    self.next_stage.push(plan)
+                    self.stage.push(plan)
                 }
             }
             Commutativity::ConditionalCommutative(transformer) => {
                 if let Some(transformer) = transformer
                 && let Some(plan) = transformer(plan) {
-                    self.next_stage.push(plan)
+                    self.stage.push(plan)
                 }
             },
             Commutativity::TransformedCommutative(transformer) => {
                 if let Some(transformer) = transformer
                 && let Some(plan) = transformer(plan) {
-                    self.next_stage.push(plan)
+                    self.stage.push(plan)
                 }
             },
-            Commutativity::CheckPartition => {
-                if let Some(partition_cols) = &self.current_partition_cols
-                && partition_cols.is_empty() {
-                    // no partition columns, and can be encoded: skip
-                    return Ok(VisitRecursion::Continue);
-                } else {
-                    self.stop_node = Some(utils::hash_plan(plan));
-                    return Ok(VisitRecursion::Stop);
-                }
-            },
-            Commutativity::NonCommutative
+            Commutativity::CheckPartition
+            | Commutativity::NonCommutative
             | Commutativity::Unimplemented
             | Commutativity::Unsupported => {
-                self.stop_node = Some(utils::hash_plan(plan));
-                return Ok(VisitRecursion::Stop);
+                return true;
             }
         }
 
-        Ok(VisitRecursion::Continue)
+        false
+    }
+
+    fn is_expanded(&self) -> bool {
+        self.status == RewriterStatus::Expanded
+    }
+
+    fn set_expanded(&mut self) {
+        self.status = RewriterStatus::Expanded;
+    }
+
+    fn set_unexpanded(&mut self) {
+        self.status = RewriterStatus::Unexpanded;
+    }
+
+    fn maybe_set_partitions(&mut self, plan: &LogicalPlan) {
+        if self.partition_cols.is_some() {
+            // only need to set once
+            return;
+        }
+
+        if let LogicalPlan::TableScan(table_scan) = plan {
+            if let Some(source) = table_scan
+                .source
+                .as_any()
+                .downcast_ref::<DefaultTableSource>()
+            {
+                if let Some(provider) = source
+                    .table_provider
+                    .as_any()
+                    .downcast_ref::<DfTableProviderAdapter>()
+                {
+                    if provider.table().table_type() == TableType::Base {
+                        let info = provider.table().table_info();
+                        let partition_key_indices = info.meta.partition_key_indices.clone();
+                        let schema = info.meta.schema.clone();
+                        let partition_cols = partition_key_indices
+                            .into_iter()
+                            .map(|index| schema.column_name_by_index(index).to_string())
+                            .collect::<Vec<String>>();
+                        self.partition_cols = Some(partition_cols);
+                    }
+                }
+            }
+        }
+    }
+
+    /// pop one stack item and reduce the level by 1
+    fn pop_stack(&mut self) {
+        self.level -= 1;
+        self.stack.pop();
     }
 }
 
-impl CommutativeVisitor {
-    pub fn new() -> Self {
-        Self {
-            next_stage: vec![],
-            stop_node: None,
-            current_partition_cols: None,
-        }
-    }
-}
+impl TreeNodeRewriter for PlanRewriter {
+    type N = LogicalPlan;
+
+    /// descend
+    fn pre_visit<'a>(&'a mut self, node: &'a Self::N) -> DfResult<RewriteRecursion> {
+        self.level += 1;
+        self.stack.push((node.clone(), self.level));
+        // descending will clear the stage
+        self.stage.clear();
+        self.set_unexpanded();
+        self.partition_cols = None;
+        Ok(RewriteRecursion::Continue)
+    }
+
+    /// ascend
+    ///
+    /// Be sure to call `pop_stack` before returning
+    fn mutate(&mut self, node: Self::N) -> DfResult<Self::N> {
+        // only expand once on each ascending
+        if self.is_expanded() {
+            self.pop_stack();
+            return Ok(node);
+        }
+
+        self.maybe_set_partitions(&node);
+
+        let Some(parent) = self.get_parent() else {
+            // add merge scan as the new root
+            let mut node = MergeScanLogicalPlan::new(node, false).into_logical_plan();
+            // expand stages
+            for new_stage in self.stage.drain(..) {
+                node = new_stage.with_new_inputs(&[node])?
+            }
+            self.set_expanded();
+
+            self.pop_stack();
+            return Ok(node);
+        };
+
+        // TODO(ruihang): avoid this clone
+        if self.should_expand(&parent.clone()) {
+            // TODO(ruihang): does this work for nodes with multiple children?
+            // replace the current node with the expanded one
+            let mut node = MergeScanLogicalPlan::new(node, false).into_logical_plan();
+            // expand stages
+            for new_stage in self.stage.drain(..) {
+                node = new_stage.with_new_inputs(&[node])?
+            }
+            self.set_expanded();
+
+            self.pop_stack();
+            return Ok(node);
+        }
+
+        self.pop_stack();
+        Ok(node)
+    }
+}
 
 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
     use datafusion::datasource::DefaultTableSource;
     use datafusion_expr::{avg, col, lit, Expr, LogicalPlanBuilder};
     use table::table::adapter::DfTableProviderAdapter;
diff --git a/src/query/src/dist_plan/utils.rs b/src/query/src/dist_plan/utils.rs
deleted file mode 100644
index 78d2a85b1e56..000000000000
--- a/src/query/src/dist_plan/utils.rs
+++ /dev/null
@@ -1,45 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::hash::{Hash, Hasher};
-
-use ahash::AHasher;
-use datafusion_expr::LogicalPlan;
-
-/// Calculate u64 hash for a [LogicalPlan].
-pub fn hash_plan(plan: &LogicalPlan) -> u64 { - let mut hasher = AHasher::default(); - plan.hash(&mut hasher); - hasher.finish() -} - -#[cfg(test)] -mod test { - use datafusion_expr::LogicalPlanBuilder; - - use super::*; - - #[test] - fn hash_two_plan() { - let plan1 = LogicalPlanBuilder::empty(false).build().unwrap(); - let plan2 = LogicalPlanBuilder::empty(false) - .explain(false, false) - .unwrap() - .build() - .unwrap(); - - assert_eq!(hash_plan(&plan1), hash_plan(&plan1)); - assert_ne!(hash_plan(&plan1), hash_plan(&plan2)); - } -} diff --git a/tests/cases/distributed/explain/join_10_tables.result b/tests/cases/distributed/explain/join_10_tables.result new file mode 100644 index 000000000000..23ba1f31fec6 --- /dev/null +++ b/tests/cases/distributed/explain/join_10_tables.result @@ -0,0 +1,206 @@ +create table t_1 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +create table t_2 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +create table t_3 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +create table t_4 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +create table t_5 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +create table t_6 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +create table t_7 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +create table t_8 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +create table t_9 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +create table t_10 (ts timestamp time index, vin string, val int); + +Affected Rows: 0 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +explain +select * +from + t_1 + INNER JOIN t_2 ON t_2.ts = t_1.ts + AND t_2.vin = t_1.vin + INNER JOIN t_3 ON t_3.ts = t_2.ts + AND t_3.vin = t_2.vin + INNER JOIN t_4 ON t_4.ts = t_3.ts + AND t_4.vin = t_3.vin + INNER JOIN t_5 ON t_5.ts = t_4.ts + AND t_5.vin = t_4.vin + INNER JOIN t_6 ON t_6.ts = t_5.ts + AND t_6.vin = t_5.vin + INNER JOIN t_7 ON t_7.ts = t_6.ts + AND t_7.vin = t_6.vin + INNER JOIN t_8 ON t_8.ts = t_7.ts + AND t_8.vin = t_7.vin + INNER JOIN t_9 ON t_9.ts = t_8.ts + AND t_9.vin = t_8.vin + INNER JOIN t_10 ON t_10.ts = t_9.ts + AND t_10.vin = t_9.vin +where + t_1.vin is not null +order by t_1.ts desc +limit 1; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Limit: skip=0, fetch=1_| +|_|_Sort: t_1.ts DESC NULLS FIRST, fetch=1_| +|_|_Inner Join: t_9.ts = t_10.ts, t_9.vin = t_10.vin_| +|_|_Inner Join: t_8.ts = t_9.ts, t_8.vin = t_9.vin_| +|_|_Inner Join: t_7.ts = t_8.ts, t_7.vin = t_8.vin_| +|_|_Inner Join: t_6.ts = t_7.ts, t_6.vin = t_7.vin_| +|_|_Inner Join: t_5.ts = t_6.ts, t_5.vin = t_6.vin_| +|_|_Inner Join: t_4.ts = t_5.ts, t_4.vin = t_5.vin_| +|_|_Inner Join: t_3.ts = t_4.ts, t_3.vin = t_4.vin_| +|_|_Inner Join: t_2.ts = t_3.ts, t_2.vin = t_3.vin_| +|_|_Inner Join: t_1.ts = t_2.ts, t_1.vin = t_2.vin_| +|_|_Filter: t_1.vin IS NOT NULL_| +|_|_MergeScan [is_placeholder=false]_| +|_|_Filter: t_2.vin IS NOT NULL_| +|_|_MergeScan [is_placeholder=false]_| +|_|_MergeScan [is_placeholder=false]_| +|_|_MergeScan [is_placeholder=false]_| +|_|_MergeScan [is_placeholder=false]_| +|_|_MergeScan [is_placeholder=false]_| +|_|_MergeScan [is_placeholder=false]_| +|_|_MergeScan [is_placeholder=false]_| +|_|_MergeScan 
[is_placeholder=false]_| +|_|_MergeScan [is_placeholder=false]_| +| physical_plan | GlobalLimitExec: skip=0, fetch=1_| +|_|_SortPreservingMergeExec: [ts@0 DESC], fetch=1_| +|_|_SortExec: fetch=1, expr=[ts@0 DESC]_| +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_FilterExec: vin@1 IS NOT NULL_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_FilterExec: vin@1 IS NOT NULL_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED +|_|_MergeScanExec: peers=[REDACTED +|_|_| ++-+-+ + +drop table t_1; + +Affected Rows: 1 + +drop table t_2; + +Affected Rows: 1 + +drop table t_3; + +Affected Rows: 1 + +drop table t_4; + +Affected Rows: 1 + +drop table t_5; + +Affected Rows: 1 + +drop table t_6; + +Affected Rows: 1 + +drop table t_7; + +Affected Rows: 1 + +drop table t_8; + +Affected Rows: 1 + +drop table t_9; + +Affected Rows: 1 + +drop table t_10; + +Affected Rows: 1 + diff --git a/tests/cases/distributed/explain/join_10_tables.sql b/tests/cases/distributed/explain/join_10_tables.sql new file mode 100644 index 000000000000..6f756ade6bb1 --- /dev/null +++ b/tests/cases/distributed/explain/join_10_tables.sql @@ -0,0 +1,53 @@ +create table t_1 (ts timestamp time index, vin string, val int); +create table t_2 (ts timestamp time index, vin string, val int); +create table t_3 (ts 
timestamp time index, vin string, val int); +create table t_4 (ts timestamp time index, vin string, val int); +create table t_5 (ts timestamp time index, vin string, val int); +create table t_6 (ts timestamp time index, vin string, val int); +create table t_7 (ts timestamp time index, vin string, val int); +create table t_8 (ts timestamp time index, vin string, val int); +create table t_9 (ts timestamp time index, vin string, val int); +create table t_10 (ts timestamp time index, vin string, val int); + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +explain +select * +from + t_1 + INNER JOIN t_2 ON t_2.ts = t_1.ts + AND t_2.vin = t_1.vin + INNER JOIN t_3 ON t_3.ts = t_2.ts + AND t_3.vin = t_2.vin + INNER JOIN t_4 ON t_4.ts = t_3.ts + AND t_4.vin = t_3.vin + INNER JOIN t_5 ON t_5.ts = t_4.ts + AND t_5.vin = t_4.vin + INNER JOIN t_6 ON t_6.ts = t_5.ts + AND t_6.vin = t_5.vin + INNER JOIN t_7 ON t_7.ts = t_6.ts + AND t_7.vin = t_6.vin + INNER JOIN t_8 ON t_8.ts = t_7.ts + AND t_8.vin = t_7.vin + INNER JOIN t_9 ON t_9.ts = t_8.ts + AND t_9.vin = t_8.vin + INNER JOIN t_10 ON t_10.ts = t_9.ts + AND t_10.vin = t_9.vin +where + t_1.vin is not null +order by t_1.ts desc +limit 1; + +drop table t_1; +drop table t_2; +drop table t_3; +drop table t_4; +drop table t_5; +drop table t_6; +drop table t_7; +drop table t_8; +drop table t_9; +drop table t_10; diff --git a/tests/cases/distributed/optimizer/filter_push_down.result b/tests/cases/distributed/optimizer/filter_push_down.result index 40884864ef0c..94ab90c33ded 100644 --- a/tests/cases/distributed/optimizer/filter_push_down.result +++ b/tests/cases/distributed/optimizer/filter_push_down.result @@ -56,7 +56,22 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=1 WHERE i1.i> SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i IS NOT NULL ORDER BY 2; -Error: 1003(Internal), This feature is not implemented: Unsupported expression: IsNotNull(Column(Column { relation: Some(Full { catalog: "greptime", schema: "public", table: "integers" }), name: "i" })) ++---+---+ +| i | i | ++---+---+ +| 1 | 1 | +| 2 | 1 | +| 3 | 1 | +| | 1 | +| 1 | 2 | +| 2 | 2 | +| 3 | 2 | +| | 2 | +| 1 | 3 | +| 2 | 3 | +| 3 | 3 | +| | 3 | ++---+---+ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i>1 ORDER BY 2; @@ -75,7 +90,22 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i> SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE CASE WHEN i2.i IS NULL THEN False ELSE True END ORDER BY 2; -Error: 1003(Internal), This feature is not implemented: Unsupported expression: IsNotNull(Column(Column { relation: Some(Full { catalog: "greptime", schema: "public", table: "integers" }), name: "i" })) ++---+---+ +| i | i | ++---+---+ +| 1 | 1 | +| 2 | 1 | +| 3 | 1 | +| | 1 | +| 1 | 2 | +| 2 | 2 | +| 3 | 2 | +| | 2 | +| 1 | 3 | +| 2 | 3 | +| 3 | 3 | +| | 3 | ++---+---+ SELECT DISTINCT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i IS NULL ORDER BY 1;
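
Note on the traversal in analyzer.rs: `PlanRewriter` relies on `TreeNodeRewriter` calling `pre_visit` on the way down and `mutate` on the way back up, and keeps a simulated `(node, level)` stack so that `mutate`, which only receives the node itself, can still look up that node's parent and ask `should_expand` about it. The sketch below isolates just that bookkeeping in a self-contained toy: `Node` stands in for `LogicalPlan`, and the `Node`/`Rewriter` names and the `println!` trace are illustrative only, not part of the patch or of DataFusion's API.

    // A toy tree node standing in for LogicalPlan.
    #[derive(Debug, Clone)]
    struct Node {
        name: &'static str,
        children: Vec<Node>,
    }

    #[derive(Default)]
    struct Rewriter {
        level: usize,
        // Mirrors PlanRewriter::stack: ancestors of the node being processed.
        stack: Vec<(&'static str, usize)>,
    }

    impl Rewriter {
        // Mirrors PlanRewriter::get_parent: the parent is the deepest stack
        // entry exactly one level above the current node.
        fn get_parent(&self) -> Option<&'static str> {
            self.stack
                .iter()
                .rev()
                .find(|(_, level)| *level == self.level - 1)
                .map(|(name, _)| *name)
        }

        fn rewrite(&mut self, node: Node) -> Node {
            // descend: push the node and bump the level, as in `pre_visit`
            self.level += 1;
            self.stack.push((node.name, self.level));

            let children: Vec<Node> = node
                .children
                .into_iter()
                .map(|child| self.rewrite(child))
                .collect();

            // ascend: the parent is still visible on the simulated stack, as in
            // `mutate`, which is where the real rewriter decides whether to wrap
            // the subtree in a MergeScan and re-apply the collected stages
            println!("{} (parent: {:?})", node.name, self.get_parent());

            // ascend cleanup, as in `pop_stack`
            self.level -= 1;
            self.stack.pop();
            Node { name: node.name, children }
        }
    }

    fn main() {
        let plan = Node {
            name: "Projection",
            children: vec![Node {
                name: "Filter",
                children: vec![Node { name: "TableScan", children: vec![] }],
            }],
        };
        Rewriter::default().rewrite(plan);
        // prints, bottom-up:
        // TableScan (parent: Some("Filter"))
        // Filter (parent: Some("Projection"))
        // Projection (parent: None)
    }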