From ab6422a163e3316472102d2d399e4d5c5d88d246 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Tue, 18 Jun 2024 18:49:54 +0800
Subject: [PATCH] fmt fix ci

---
 e2e_test/streaming/changed_log.slt                    |  2 +-
 src/frontend/src/binder/relation/share.rs             |  5 +++--
 src/frontend/src/binder/select.rs                     | 14 +++++++++++++-
 .../src/optimizer/plan_node/generic/change_log.rs     |  6 ++++--
 .../src/optimizer/plan_node/logical_change_log.rs     |  6 +++---
 .../src/optimizer/plan_node/logical_scan.rs           |  2 +-
 .../src/optimizer/plan_node/stream_table_scan.rs      |  6 ++----
 7 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/e2e_test/streaming/changed_log.slt b/e2e_test/streaming/changed_log.slt
index 199cbf381313..4b5adede244a 100644
--- a/e2e_test/streaming/changed_log.slt
+++ b/e2e_test/streaming/changed_log.slt
@@ -25,7 +25,7 @@ select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22 from sub1
 
 statement ok
 create materialized view mv5 as with sub1 as changelog from t1, sub2 as changelog from t2
-select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.op as op1, sub2.op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;
+select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.change_log_op as op1, sub2.change_log_op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;
 
 statement ok
 create materialized view mv6 as with sub as changelog from t3 select * from sub;
diff --git a/src/frontend/src/binder/relation/share.rs b/src/frontend/src/binder/relation/share.rs
index a7450096cd97..0c5ea7627cc5 100644
--- a/src/frontend/src/binder/relation/share.rs
+++ b/src/frontend/src/binder/relation/share.rs
@@ -20,6 +20,7 @@ use crate::binder::bind_context::RecursiveUnion;
 use crate::binder::statement::RewriteExprsRecursive;
 use crate::binder::{BoundQuery, Relation, ShareId};
 use crate::error::{ErrorCode, Result};
+use crate::optimizer::plan_node::generic::{CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID};
 
 /// Share a relation during binding and planning.
 /// It could be used to share a (recursive) CTE, a source, a view and so on.
@@ -73,14 +74,14 @@ impl BoundShareInput {
                     false,
                     Field::with_name(
                         risingwave_common::types::DataType::Int16,
-                        "op".to_string(),
+                        CHANGE_LOG_OP.to_string(),
                     ),
                 ),
                 (
                     true,
                     Field::with_name(
                         risingwave_common::types::DataType::Serial,
-                        "_changelog_row_id".to_string(),
+                        _CHANGE_LOG_ROW_ID.to_string(),
                     ),
                 ),
             ])
diff --git a/src/frontend/src/binder/select.rs b/src/frontend/src/binder/select.rs
index d9848ed76973..4c6097ba9e9f 100644
--- a/src/frontend/src/binder/select.rs
+++ b/src/frontend/src/binder/select.rs
@@ -25,11 +25,12 @@ use risingwave_sqlparser::ast::{
 
 use super::bind_context::{Clause, ColumnBinding};
 use super::statement::RewriteExprsRecursive;
-use super::UNNAMED_COLUMN;
+use super::{BoundShareInput, UNNAMED_COLUMN};
 use crate::binder::{Binder, Relation};
 use crate::catalog::check_valid_column_name;
 use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{CorrelatedId, Depth, Expr as _, ExprImpl, ExprType, FunctionCall, InputRef};
+use crate::optimizer::plan_node::generic::CHANGE_LOG_OP;
 use crate::utils::group_by::GroupBy;
 
 #[derive(Debug, Clone)]
@@ -282,6 +283,17 @@ impl Binder {
             })
             .collect::<Result<Vec<Field>>>()?;
 
+        if let Some(Relation::Share(bound)) = &from {
+            if matches!(bound.input, BoundShareInput::ChangeLog(_))
+                && fields.iter().filter(|&x| x.name.eq(CHANGE_LOG_OP)).count() > 1
+            {
+                return Err(ErrorCode::BindError(
+                    "The source table of changelog cannot have `change_log_op`, please rename it first".to_string()
+                )
+                .into());
+            }
+        }
+
         Ok(BoundSelect {
             distinct,
             select_items,
diff --git a/src/frontend/src/optimizer/plan_node/generic/change_log.rs b/src/frontend/src/optimizer/plan_node/generic/change_log.rs
index cb4d6df05b1a..e505a451d824 100644
--- a/src/frontend/src/optimizer/plan_node/generic/change_log.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/change_log.rs
@@ -23,6 +23,8 @@ use crate::optimizer::property::FunctionalDependencySet;
 use crate::utils::ColIndexMappingRewriteExt;
 use crate::OptimizerContextRef;
 
+pub const CHANGE_LOG_OP: &str = "change_log_op";
+pub const _CHANGE_LOG_ROW_ID: &str = "_change_log_row_id";
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct ChangeLog {
     pub input: PlanRef,
@@ -57,13 +59,13 @@ impl GenericPlanNode for ChangeLog {
         if self.need_op {
             fields.push(Field::with_name(
                 risingwave_common::types::DataType::Int16,
-                "op",
+                CHANGE_LOG_OP,
             ));
         }
         if self.need_change_log_row_id {
             fields.push(Field::with_name(
                 risingwave_common::types::DataType::Serial,
-                "_change_log_row_id",
+                _CHANGE_LOG_ROW_ID,
             ));
         }
         Schema::new(fields)
diff --git a/src/frontend/src/optimizer/plan_node/logical_change_log.rs b/src/frontend/src/optimizer/plan_node/logical_change_log.rs
index 5b8ed3a5e85f..fe4f920e92fa 100644
--- a/src/frontend/src/optimizer/plan_node/logical_change_log.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_change_log.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use super::expr_visitable::ExprVisitable;
-use super::generic::GenericPlanRef;
+use super::generic::{GenericPlanRef, CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID};
 use super::utils::impl_distill_by_unit;
 use super::{
     gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
@@ -101,10 +101,10 @@ impl ColPrunable for LogicalChangeLog {
             .iter()
             .filter_map(|a| {
                 if let Some(f) = fields.get(*a) {
-                    if f.name == "op" {
+                    if f.name == CHANGE_LOG_OP {
                         need_op = true;
                         None
-                    } else if f.name == "_change_log_row_id" {
+                    } else if f.name == _CHANGE_LOG_ROW_ID {
                         need_change_log_row_id = true;
                         None
                     } else {
diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs
index 565e3082c004..d4dd87b50834 100644
--- a/src/frontend/src/optimizer/plan_node/logical_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -538,7 +538,7 @@ impl ToStream for LogicalScan {
         &self,
         ctx: &mut RewriteStreamContext,
     ) -> Result<(PlanRef, ColIndexMapping)> {
-        match self.base.stream_key().is_none() && ctx.get_with_stream_key(){
+        match self.base.stream_key().is_none() && ctx.get_with_stream_key() {
             true => {
                 let mut col_ids = HashSet::new();
 
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 3993cdf9a359..b72cc5474036 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -34,7 +34,7 @@ use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
 use crate::optimizer::property::{Distribution, DistributionDisplay};
 use crate::scheduler::SchedulerResult;
 use crate::stream_fragmenter::BuildFragmentGraphState;
-use crate::{Explain, TableCatalog};
+use crate::TableCatalog;
 
 /// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted
 /// to stream scan + merge node (for upstream materialize) + batch table scan when converting to `MView`
@@ -240,9 +240,7 @@ impl StreamTableScan {
 
         let stream_key = self
             .stream_key()
-            .unwrap_or_else(|| {
-                &[]
-            })
+            .unwrap_or(&[])
             .iter()
             .map(|x| *x as u32)
             .collect_vec();
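
A minimal usage sketch of the renamed changelog op column, mirroring the updated
e2e test above. The table definition here is an assumption for illustration; the
real test creates t1 (with columns v1 and v2) earlier in changed_log.slt:

    statement ok
    create table t1 (v1 int, v2 int);

    statement ok
    create materialized view mv_op_example as with sub as changelog from t1
    select sub.v1, sub.v2, sub.change_log_op from sub;

With the new bind-time check in select.rs, binding such a changelog fails with
"The source table of changelog cannot have `change_log_op`, please rename it first"
when the source table itself already exposes a column named change_log_op.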