Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick fix: changelog #18785 + #19029 + #19030 to branch release-2.1 #19098

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions e2e_test/ddl/alter_rename.slt
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,43 @@ DROP TABLE t2;

statement ok
DROP TABLE t_as_1;

# BEGIN references in changelog

statement ok
CREATE TABLE a (a0 int);

statement ok
CREATE TABLE b (b0 int);

statement ok
CREATE MATERIALIZED VIEW mv AS
WITH a_log AS changelog FROM a,
b_log AS changelog FROM b
SELECT 'a' AS tbl, * FROM a_log
UNION ALL
SELECT 'b', * FROM b_log;

query T
SELECT definition FROM rw_materialized_views WHERE name = 'mv';
----
CREATE MATERIALIZED VIEW mv AS WITH a_log AS changelog from a, b_log AS changelog from b SELECT 'a' AS tbl, * FROM a_log UNION ALL SELECT 'b', * FROM b_log

statement ok
ALTER TABLE a RENAME TO c;

query T
SELECT definition FROM rw_materialized_views WHERE name = 'mv';
----
CREATE MATERIALIZED VIEW mv AS WITH a_log AS changelog from "c", b_log AS changelog from b SELECT 'a' AS tbl, * FROM a_log UNION ALL SELECT 'b', * FROM b_log

statement ok
DROP MATERIALIZED VIEW mv;

statement ok
DROP TABLE b;

statement ok
DROP TABLE c;

# END references in changelog
3 changes: 2 additions & 1 deletion e2e_test/streaming/changelog.slt
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ create table t2 (v1 int, v2 int);
statement ok
create table t3 (v1 int primary key, v2 int);

# test public.t1 due to https://github.com/risingwavelabs/risingwave/issues/18747
statement ok
create materialized view mv1 as with sub as changelog from t1 select * from sub;
create materialized view mv1 as with sub as changelog from public.t1 select * from sub;

statement ok
create materialized view mv2 as with sub as changelog from t2 select * from sub;
Expand Down
9 changes: 3 additions & 6 deletions src/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_sqlparser::ast::{
Cte, CteInner, Expr, Fetch, ObjectName, OrderByExpr, Query, SetExpr, SetOperator, Value, With,
Cte, CteInner, Expr, Fetch, OrderByExpr, Query, SetExpr, SetOperator, Value, With,
};
use thiserror_ext::AsReport;

Expand Down Expand Up @@ -350,11 +350,8 @@ impl Binder {
}
CteInner::ChangeLog(from_table_name) => {
self.push_context();
let from_table_relation = self.bind_relation_by_name(
ObjectName::from(vec![from_table_name]),
None,
None,
)?;
let from_table_relation =
self.bind_relation_by_name(from_table_name.clone(), None, None)?;
self.pop_context()?;
self.context.cte_to_relation.insert(
table_name,
Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/controller/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,11 @@ impl QueryRewriter<'_> {
for cte_table in &mut with.cte_tables {
match &mut cte_table.cte_inner {
risingwave_sqlparser::ast::CteInner::Query(query) => self.visit_query(query),
risingwave_sqlparser::ast::CteInner::ChangeLog(from) => {
*from = Ident::new_unchecked(self.to)
risingwave_sqlparser::ast::CteInner::ChangeLog(name) => {
let idx = name.0.len() - 1;
if name.0[idx].real_value() == self.from {
name.0[idx] = Ident::with_quote_unchecked('"', self.to);
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/sqlparser/src/ast/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ impl fmt::Display for Cte {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.cte_inner {
CteInner::Query(query) => write!(f, "{} AS ({})", self.alias, query)?,
CteInner::ChangeLog(ident) => {
write!(f, "{} AS changelog from {}", self.alias, ident.value)?
CteInner::ChangeLog(obj_name) => {
write!(f, "{} AS changelog from {}", self.alias, obj_name)?
}
}
Ok(())
Expand All @@ -347,7 +347,7 @@ impl fmt::Display for Cte {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CteInner {
Query(Query),
ChangeLog(Ident),
ChangeLog(ObjectName),
}

/// One item of the comma-separated list following `SELECT`
Expand Down
2 changes: 1 addition & 1 deletion src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4133,7 +4133,7 @@ impl Parser<'_> {
parser_err!("Expected 'changelog' but found '{}'", changelog);
}
self.expect_keyword(Keyword::FROM)?;
Ok(CteInner::ChangeLog(self.parse_identifier()?))
Ok(CteInner::ChangeLog(self.parse_object_name()?))
}
}

Expand Down
64 changes: 0 additions & 64 deletions src/sqlparser/tests/sqlparser_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2836,70 +2836,6 @@ fn parse_ctes() {
}
}

#[test]
fn parse_changelog_ctes() {
let cte_sqls = vec!["foo", "bar"];
let with = &format!(
"WITH a AS changelog from {}, b AS changelog from {} SELECT foo + bar FROM a, b",
cte_sqls[0], cte_sqls[1]
);

fn assert_changelog_ctes(expected: &[&str], sel: &Query) {
for (i, exp) in expected.iter().enumerate() {
let Cte { alias, cte_inner } = &sel.with.as_ref().unwrap().cte_tables[i];
if let CteInner::ChangeLog(from) = cte_inner {
assert_eq!(*exp, from.to_string());
assert_eq!(
if i == 0 {
Ident::new_unchecked("a")
} else {
Ident::new_unchecked("b")
},
alias.name
);
assert!(alias.columns.is_empty());
} else {
panic!("expected CteInner::ChangeLog")
}
}
}

// Top-level CTE
assert_changelog_ctes(&cte_sqls, &verified_query(with));
// CTE in a subquery
let sql = &format!("SELECT ({})", with);
let select = verified_only_select(sql);
match expr_from_projection(only(&select.projection)) {
Expr::Subquery(ref subquery) => {
assert_changelog_ctes(&cte_sqls, subquery.as_ref());
}
_ => panic!("expected subquery"),
}
// CTE in a derived table
let sql = &format!("SELECT * FROM ({})", with);
let select = verified_only_select(sql);
match only(select.from).relation {
TableFactor::Derived { subquery, .. } => {
assert_changelog_ctes(&cte_sqls, subquery.as_ref())
}
_ => panic!("expected derived table"),
}
// CTE in a view
let sql = &format!("CREATE VIEW v AS {}", with);
match verified_stmt(sql) {
Statement::CreateView { query, .. } => assert_changelog_ctes(&cte_sqls, &query),
_ => panic!("expected CREATE VIEW"),
}
// CTE in a CTE...
let sql = &format!("WITH outer_cte AS ({}) SELECT * FROM outer_cte", with);
let select = verified_query(sql);
if let CteInner::Query(query) = &only(&select.with.unwrap().cte_tables).cte_inner {
assert_changelog_ctes(&cte_sqls, query);
} else {
panic!("expected CteInner::Query")
}
}

#[test]
fn parse_cte_renamed_columns() {
let sql = "WITH cte (col1, col2) AS (SELECT foo, bar FROM baz) SELECT * FROM cte";
Expand Down
6 changes: 6 additions & 0 deletions src/sqlparser/tests/testdata/select.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,9 @@
- input: select date t; -- A column "date" aliased to "t"
formatted_sql: SELECT date AS t
formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [ExprWithAlias { expr: Identifier(Ident { value: "date", quote_style: None }), alias: Ident { value: "t", quote_style: None } }], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
- input: |-
WITH a_log AS changelog from a,
"B_log" AS changelog from public."B"
SELECT a0 + b0 FROM a_log, "B_log"
formatted_sql: WITH a_log AS changelog from a, "B_log" AS changelog from public."B" SELECT a0 + b0 FROM a_log, "B_log"
formatted_ast: 'Query(Query { with: Some(With { recursive: false, cte_tables: [Cte { alias: TableAlias { name: Ident { value: "a_log", quote_style: None }, columns: [] }, cte_inner: ChangeLog(ObjectName([Ident { value: "a", quote_style: None }])) }, Cte { alias: TableAlias { name: Ident { value: "B_log", quote_style: Some(''"'') }, columns: [] }, cte_inner: ChangeLog(ObjectName([Ident { value: "public", quote_style: None }, Ident { value: "B", quote_style: Some(''"'') }])) }] }), body: Select(Select { distinct: All, projection: [UnnamedExpr(BinaryOp { left: Identifier(Ident { value: "a0", quote_style: None }), op: Plus, right: Identifier(Ident { value: "b0", quote_style: None }) })], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "a_log", quote_style: None }]), alias: None, as_of: None }, joins: [] }, TableWithJoins { relation: Table { name: ObjectName([Ident { value: "B_log", quote_style: Some(''"'') }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
Loading