Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jun 18, 2024
1 parent 027adea commit 0e6ee46
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 13 deletions.
5 changes: 3 additions & 2 deletions src/frontend/src/binder/relation/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::binder::bind_context::RecursiveUnion;
use crate::binder::statement::RewriteExprsRecursive;
use crate::binder::{BoundQuery, Relation, ShareId};
use crate::error::{ErrorCode, Result};
use crate::optimizer::plan_node::generic::{CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID};

/// Share a relation during binding and planning.
/// It could be used to share a (recursive) CTE, a source, a view and so on.
Expand Down Expand Up @@ -73,14 +74,14 @@ impl BoundShareInput {
false,
Field::with_name(
risingwave_common::types::DataType::Int16,
"op".to_string(),
CHANGE_LOG_OP.to_string(),
),
),
(
true,
Field::with_name(
risingwave_common::types::DataType::Serial,
"_changelog_row_id".to_string(),
_CHANGE_LOG_ROW_ID.to_string(),
),
),
])
Expand Down
14 changes: 13 additions & 1 deletion src/frontend/src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ use risingwave_sqlparser::ast::{

use super::bind_context::{Clause, ColumnBinding};
use super::statement::RewriteExprsRecursive;
use super::UNNAMED_COLUMN;
use super::{BoundShareInput, UNNAMED_COLUMN};
use crate::binder::{Binder, Relation};
use crate::catalog::check_valid_column_name;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{CorrelatedId, Depth, Expr as _, ExprImpl, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::generic::CHANGE_LOG_OP;
use crate::utils::group_by::GroupBy;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -282,6 +283,17 @@ impl Binder {
})
.collect::<Result<Vec<Field>>>()?;

if let Some(Relation::Share(bound)) = &from {
if matches!(bound.input, BoundShareInput::ChangeLog(_))
&& fields.iter().filter(|&x| x.name.eq(CHANGE_LOG_OP)).count() > 1
{
return Err(ErrorCode::BindError(
"The source table of changelog cannot have `change_log_op`, please rename it first".to_string()
)
.into());
}
}

Ok(BoundSelect {
distinct,
select_items,
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::ColIndexMappingRewriteExt;
use crate::OptimizerContextRef;

pub const CHANGE_LOG_OP: &str = "change_log_op";
pub const _CHANGE_LOG_ROW_ID: &str = "_change_log_row_id";
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChangeLog<PlanRef> {
pub input: PlanRef,
Expand Down Expand Up @@ -57,13 +59,13 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for ChangeLog<PlanRef> {
if self.need_op {
fields.push(Field::with_name(
risingwave_common::types::DataType::Int16,
"op",
CHANGE_LOG_OP,
));
}
if self.need_change_log_row_id {
fields.push(Field::with_name(
risingwave_common::types::DataType::Serial,
"_change_log_row_id",
_CHANGE_LOG_ROW_ID,
));
}
Schema::new(fields)
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use super::expr_visitable::ExprVisitable;
use super::generic::GenericPlanRef;
use super::generic::{GenericPlanRef, CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID};
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
Expand Down Expand Up @@ -101,10 +101,10 @@ impl ColPrunable for LogicalChangeLog {
.iter()
.filter_map(|a| {
if let Some(f) = fields.get(*a) {
if f.name == "op" {
if f.name == CHANGE_LOG_OP {
need_op = true;
None
} else if f.name == "_change_log_row_id" {
} else if f.name == _CHANGE_LOG_ROW_ID {
need_change_log_row_id = true;
None
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ impl ToStream for LogicalScan {
&self,
ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
match self.base.stream_key().is_none() && ctx.get_with_stream_key(){
match self.base.stream_key().is_none() && ctx.get_with_stream_key() {
true => {
let mut col_ids = HashSet::new();

Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::{Explain, TableCatalog};
use crate::TableCatalog;

/// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted
/// to stream scan + merge node (for upstream materialize) + batch table scan when converting to `MView`
Expand Down Expand Up @@ -240,9 +240,7 @@ impl StreamTableScan {

let stream_key = self
.stream_key()
.unwrap_or_else(|| {
&[]
})
.unwrap_or(&[])
.iter()
.map(|x| *x as u32)
.collect_vec();
Expand Down

0 comments on commit 0e6ee46

Please sign in to comment.