Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
fix ci
  • Loading branch information
xxhZs committed Jun 19, 2024
1 parent 027adea commit ab6422a
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 14 deletions.
2 changes: 1 addition & 1 deletion e2e_test/streaming/changed_log.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22 from sub1

statement ok
create materialized view mv5 as with sub1 as changelog from t1, sub2 as changelog from t2
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.op as op1, sub2.op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.change_log_op as op1, sub2.change_log_op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv6 as with sub as changelog from t3 select * from sub;
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/binder/relation/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::binder::bind_context::RecursiveUnion;
use crate::binder::statement::RewriteExprsRecursive;
use crate::binder::{BoundQuery, Relation, ShareId};
use crate::error::{ErrorCode, Result};
use crate::optimizer::plan_node::generic::{CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID};

/// Share a relation during binding and planning.
/// It could be used to share a (recursive) CTE, a source, a view and so on.
Expand Down Expand Up @@ -73,14 +74,14 @@ impl BoundShareInput {
false,
Field::with_name(
risingwave_common::types::DataType::Int16,
"op".to_string(),
CHANGE_LOG_OP.to_string(),
),
),
(
true,
Field::with_name(
risingwave_common::types::DataType::Serial,
"_changelog_row_id".to_string(),
_CHANGE_LOG_ROW_ID.to_string(),
),
),
])
Expand Down
14 changes: 13 additions & 1 deletion src/frontend/src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ use risingwave_sqlparser::ast::{

use super::bind_context::{Clause, ColumnBinding};
use super::statement::RewriteExprsRecursive;
use super::UNNAMED_COLUMN;
use super::{BoundShareInput, UNNAMED_COLUMN};
use crate::binder::{Binder, Relation};
use crate::catalog::check_valid_column_name;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{CorrelatedId, Depth, Expr as _, ExprImpl, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::generic::CHANGE_LOG_OP;
use crate::utils::group_by::GroupBy;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -282,6 +283,17 @@ impl Binder {
})
.collect::<Result<Vec<Field>>>()?;

if let Some(Relation::Share(bound)) = &from {
if matches!(bound.input, BoundShareInput::ChangeLog(_))
&& fields.iter().filter(|&x| x.name.eq(CHANGE_LOG_OP)).count() > 1
{
return Err(ErrorCode::BindError(
"The source table of changelog cannot have `change_log_op`, please rename it first".to_string()
)
.into());
}
}

Ok(BoundSelect {
distinct,
select_items,
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::ColIndexMappingRewriteExt;
use crate::OptimizerContextRef;

pub const CHANGE_LOG_OP: &str = "change_log_op";
pub const _CHANGE_LOG_ROW_ID: &str = "_change_log_row_id";
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChangeLog<PlanRef> {
pub input: PlanRef,
Expand Down Expand Up @@ -57,13 +59,13 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for ChangeLog<PlanRef> {
if self.need_op {
fields.push(Field::with_name(
risingwave_common::types::DataType::Int16,
"op",
CHANGE_LOG_OP,
));
}
if self.need_change_log_row_id {
fields.push(Field::with_name(
risingwave_common::types::DataType::Serial,
"_change_log_row_id",
_CHANGE_LOG_ROW_ID,
));
}
Schema::new(fields)
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use super::expr_visitable::ExprVisitable;
use super::generic::GenericPlanRef;
use super::generic::{GenericPlanRef, CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID};
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
Expand Down Expand Up @@ -101,10 +101,10 @@ impl ColPrunable for LogicalChangeLog {
.iter()
.filter_map(|a| {
if let Some(f) = fields.get(*a) {
if f.name == "op" {
if f.name == CHANGE_LOG_OP {
need_op = true;
None
} else if f.name == "_change_log_row_id" {
} else if f.name == _CHANGE_LOG_ROW_ID {
need_change_log_row_id = true;
None
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ impl ToStream for LogicalScan {
&self,
ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
match self.base.stream_key().is_none() && ctx.get_with_stream_key(){
match self.base.stream_key().is_none() && ctx.get_with_stream_key() {
true => {
let mut col_ids = HashSet::new();

Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::{Explain, TableCatalog};
use crate::TableCatalog;

/// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted
/// to stream scan + merge node (for upstream materialize) + batch table scan when converting to `MView`
Expand Down Expand Up @@ -240,9 +240,7 @@ impl StreamTableScan {

let stream_key = self
.stream_key()
.unwrap_or_else(|| {
&[]
})
.unwrap_or(&[])
.iter()
.map(|x| *x as u32)
.collect_vec();
Expand Down

0 comments on commit ab6422a

Please sign in to comment.