diff --git a/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt b/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt index b1a4cd0836349..08afa5d1988a7 100644 --- a/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt +++ b/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt @@ -38,6 +38,22 @@ mysql --protocol=tcp -u root mytest -e " ); INSERT INTO mysql_types_test VALUES ( False, 0, null, null, -8388608, -2147483647, 9223372036854775806, -10.0, -9999.999999, -10000.0, 'c', 'd', '', '', '1001-01-01', '-838:59:59.000000', '2000-01-01 00:00:00.000000', null, 'happy', '[1,2]'); INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]'); + CREATE TABLE IF NOT EXISTS test_default( + id int, + name varchar(255) DEFAULT 'default_name', + age int DEFAULT 18, + v1 real DEFAULT 1.1, + v2 double precision DEFAULT 2.2, + v3 decimal(5,2) DEFAULT 3.3, + v4 boolean DEFAULT false, + v5 date DEFAULT '2020-01-01', + v6 time DEFAULT '12:34:56', + v7 timestamp DEFAULT '2020-01-01 12:34:56', + v8 datetime DEFAULT '2020-01-01 12:34:56', + v9 VARCHAR(255) DEFAULT (UUID()), + PRIMARY KEY (id) + ); + INSERT INTO test_default(id) VALUES (1),(2); " statement ok @@ -70,6 +86,30 @@ HINT: Please define the schema manually statement ok ALTER SYSTEM SET license_key TO DEFAULT; +statement ok +create table test_default (*) from mysql_source table 'mytest.test_default'; + +sleep 3s + +query TTTTTTTTTTTTT +SELECT count(*) FROM test_default; +---- +2 + +statement ok +insert into test_default(id) values (4),(5); + +statement ok +FLUSH; + + +# uuid() default expression is not supported +query TTTTTTTTTTTTTT +SELECT * FROM test_default where id>=4 order by id; +---- +4 default_name 18 1.1 2.2 3.3 0 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 2020-01-01 12:34:56 NULL +5 default_name 18 1.1 2.2 3.3 0 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 2020-01-01 12:34:56 NULL + statement ok create table rw_customers (*) from mysql_source table 'mytest.customers'; diff --git a/e2e_test/source/cdc_inline/auto_schema_map_pg.slt b/e2e_test/source/cdc_inline/auto_schema_map_pg.slt index 8183a617293b0..be220347225bb 100644 --- a/e2e_test/source/cdc_inline/auto_schema_map_pg.slt +++ b/e2e_test/source/cdc_inline/auto_schema_map_pg.slt @@ -45,6 +45,23 @@ psql -c " INSERT INTO postgres_types_test VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,sad}'); INSERT INTO postgres_types_test VALUES ( False, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'a_varchar', 'DEADBEEF'::bytea, '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', to_jsonb('hello'::text), '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'a_varchar'], ARRAY[NULL, 'DEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, to_jsonb('hello'::text)], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]); INSERT INTO postgres_types_test VALUES ( False, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + CREATE TABLE IF NOT EXISTS test_default( + id int, + name varchar(255) DEFAULT 'default_name', + age int DEFAULT 18, + v1 real DEFAULT 1.1, + v2 double precision DEFAULT 2.2, + v3 numeric DEFAULT 3.3, + v4 boolean DEFAULT false, + v5 date DEFAULT '2020-01-01', + v6 time DEFAULT '12:34:56', + v7 timestamp DEFAULT '2020-01-01 12:34:56', + v8 timestamptz DEFAULT '2020-01-01 12:34:56+00', + v9 interval DEFAULT '1 day', + v10 jsonb DEFAULT '{}', + PRIMARY KEY (id) + ); + INSERT INTO test_default(id,name,age) VALUES (1, 'name1', 20), (2, 'name2', 21), (3, 'name3', 22); " statement ok @@ -58,6 +75,26 @@ create source pg_source with ( slot.name = 'pg_slot' ); +statement ok +create table test_default (*) from pg_source table 'public.test_default'; + +sleep 3s + +statement ok +insert into test_default(id) values (4),(5); + +statement ok +FLUSH; + +query TTTTTTTTTTTTT +SELECT * from test_default order by id; +---- +1 name1 20 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} +2 name2 21 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} +3 name3 22 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} +4 default_name 18 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} +5 default_name 18 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} + statement ok create table rw_postgres_types_test (*) from pg_source table 'public.postgres_types_test'; diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index f1d1123fbfd4e..119fbd8a7b04f 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -15,6 +15,8 @@ use std::borrow::Cow; use itertools::Itertools; +use risingwave_common::types::Datum; +use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType}; use risingwave_pb::expr::ExprNode; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::{ @@ -24,6 +26,7 @@ use risingwave_pb::plan_common::{ use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET}; use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID}; use crate::types::DataType; +use crate::util::value_encoding::DatumToProtoExt; /// Column ID is the unique identifier of a column in a table. Different from table ID, column ID is /// not globally unique. @@ -144,10 +147,19 @@ impl ColumnDesc { name: impl Into, column_id: ColumnId, data_type: DataType, - default_val: DefaultColumnDesc, + snapshot_value: Datum, ) -> ColumnDesc { + let default_col = DefaultColumnDesc { + expr: Some(ExprNode { + // equivalent to `Literal::to_expr_proto` + function_type: ExprType::Unspecified as i32, + return_type: Some(data_type.to_protobuf()), + rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())), + }), + snapshot_value: Some(snapshot_value.to_protobuf()), + }; ColumnDesc { - generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_val)), + generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_col)), ..Self::named(name, column_id, data_type) } } diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index a2c5742b87d44..fe7c8dd226b50 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -18,12 +18,8 @@ use risingwave_common::types::{ DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef, ToOwnedDatum, }; -use risingwave_common::util::value_encoding::DatumToProtoExt; use risingwave_connector_codec::decoder::AccessExt; -use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType}; -use risingwave_pb::expr::ExprNode; use risingwave_pb::plan_common::additional_column::ColumnType; -use risingwave_pb::plan_common::DefaultColumnDesc; use thiserror_ext::AsReport; use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation}; @@ -31,7 +27,9 @@ use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaCh use crate::parser::schema_change::TableChangeType; use crate::parser::TransactionControl; use crate::source::cdc::build_cdc_table_id; -use crate::source::cdc::external::mysql::{mysql_type_to_rw_type, type_name_to_mysql_type}; +use crate::source::cdc::external::mysql::{ + mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type, +}; use crate::source::{ConnectorProperties, SourceColumnDesc}; // Example of Debezium JSON value: @@ -228,7 +226,20 @@ pub fn parse_schema_change( // handle default value expression, currently we only support constant expression let column_desc = match col.access_object_field("defaultValueExpression") { Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => { - let value_text = default_val_expr_str.as_string().unwrap(); + let mut value_text = default_val_expr_str.as_string().unwrap(); + // mysql timestamp is mapped to timestamptz, we use UTC timezone to + // interpret its value + if data_type == DataType::Timestamptz + && matches!(*connector_props, ConnectorProperties::MysqlCdc(_)) + { + value_text = timestamp_val_to_timestamptz(value_text.as_str()).map_err(|err| { + tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz"); + AccessError::TypeError { + expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(), + got: data_type.to_string(), + value: value_text, + }})?; + } let snapshot_value: Datum = Some( ScalarImpl::from_text(value_text.as_str(), &data_type).map_err( |err| { @@ -240,20 +251,11 @@ pub fn parse_schema_change( }}, )?, ); - // equivalent to `Literal::to_expr_proto` - let default_val_expr_node = ExprNode { - function_type: ExprType::Unspecified as i32, - return_type: Some(data_type.to_protobuf()), - rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())), - }; ColumnDesc::named_with_default_value( name, ColumnId::placeholder(), data_type, - DefaultColumnDesc { - expr: Some(default_val_expr_node), - snapshot_value: Some(snapshot_value.to_protobuf()), - }, + snapshot_value, ) } _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type), diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 59971f8761068..c45f6143431df 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use anyhow::{anyhow, Context}; +use chrono::{DateTime, NaiveDateTime}; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; @@ -25,15 +26,16 @@ use mysql_common::value::Value; use risingwave_common::bail; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, OFFSET_COLUMN_NAME}; use risingwave_common::row::OwnedRow; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, Decimal, ScalarImpl, F32}; use risingwave_common::util::iter_util::ZipEqFast; -use sea_schema::mysql::def::{ColumnKey, ColumnType}; +use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType}; use sea_schema::mysql::discovery::SchemaDiscovery; use sea_schema::mysql::query::SchemaQueryBuilder; use sea_schema::sea_query::{Alias, IntoIden}; use serde_derive::{Deserialize, Serialize}; use sqlx::mysql::MySqlConnectOptions; use sqlx::MySqlPool; +use thiserror_ext::AsReport; use crate::error::{ConnectorError, ConnectorResult}; use crate::source::cdc::external::{ @@ -112,11 +114,56 @@ impl MySqlExternalTable { let data_type = mysql_type_to_rw_type(&col.col_type)?; // column name in mysql is case-insensitive, convert to lowercase let col_name = col.name.to_lowercase(); - column_descs.push(ColumnDesc::named( - col_name.clone(), - ColumnId::placeholder(), - data_type, - )); + let column_desc = if let Some(default) = col.default { + let snapshot_value = match default { + ColumnDefault::Null => None, + ColumnDefault::Int(val) => match data_type { + DataType::Int16 => Some(ScalarImpl::Int16(val as _)), + DataType::Int32 => Some(ScalarImpl::Int32(val as _)), + DataType::Int64 => Some(ScalarImpl::Int64(val)), + _ => Err(anyhow!("unexpected default value type for integer column"))?, + }, + ColumnDefault::Real(val) => match data_type { + DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))), + DataType::Float64 => Some(ScalarImpl::Float64(val.into())), + DataType::Decimal => Some(ScalarImpl::Decimal( + Decimal::try_from(val).map_err(|err| { + anyhow!("failed to convert default value to decimal").context(err) + })?, + )), + _ => Err(anyhow!("unexpected default value type for float column"))?, + }, + ColumnDefault::String(mut val) => { + // mysql timestamp is mapped to timestamptz, we use UTC timezone to + // interpret its value + if data_type == DataType::Timestamptz { + val = timestamp_val_to_timestamptz(val.as_str())?; + } + match ScalarImpl::from_text(val.as_str(), &data_type) { + Ok(scalar) => Some(scalar), + Err(err) => { + tracing::warn!(error=%err.as_report(), "failed to parse mysql default value expression, only constant is supported"); + None + } + } + } + ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => { + tracing::warn!("MySQL CURRENT_TIMESTAMP and custom expression default value not supported"); + None + } + }; + + ColumnDesc::named_with_default_value( + col_name.clone(), + ColumnId::placeholder(), + data_type.clone(), + snapshot_value, + ) + } else { + ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type) + }; + + column_descs.push(column_desc); if matches!(col.key, ColumnKey::Primary) { pk_names.push(col_name); } @@ -141,6 +188,17 @@ impl MySqlExternalTable { } } +pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult { + let format = "%Y-%m-%d %H:%M:%S"; + let naive_datetime = NaiveDateTime::parse_from_str(value_text, format) + .map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?; + let postgres_timestamptz: DateTime = + DateTime::::from_naive_utc_and_offset(naive_datetime, chrono::Utc); + Ok(postgres_timestamptz + .format("%Y-%m-%d %H:%M:%S%:z") + .to_string()) +} + pub fn type_name_to_mysql_type(ty_name: &str) -> Option { macro_rules! column_type { ($($name:literal => $variant:ident),* $(,)?) => { @@ -149,6 +207,11 @@ pub fn type_name_to_mysql_type(ty_name: &str) -> Option { $name => Some(ColumnType::$variant(Default::default())), )* "json" => Some(ColumnType::Json), + "date" => Some(ColumnType::Date), + "bool" => Some(ColumnType::Bool), + "tinyblob" => Some(ColumnType::TinyBlob), + "mediumblob" => Some(ColumnType::MediumBlob), + "longblob" => Some(ColumnType::LongBlob), _ => None, } }; diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 9123c7451b74e..5c2e75d066b19 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -24,7 +24,7 @@ use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; use postgres_openssl::MakeTlsConnector; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::{DataType, StructType}; +use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_common::util::iter_util::ZipEqFast; use sea_schema::postgres::def::{ColumnType, TableInfo}; use sea_schema::postgres::discovery::SchemaDiscovery; @@ -123,11 +123,32 @@ impl PostgresExternalTable { let mut column_descs = vec![]; for col in &table_schema.columns { let data_type = type_to_rw_type(&col.col_type)?; - column_descs.push(ColumnDesc::named( - col.name.clone(), - ColumnId::placeholder(), - data_type, - )); + let column_desc = if let Some(ref default_expr) = col.default { + // parse the value of "column_default" field in information_schema.columns, + // non number data type will be stored as "'value'::type" + let val_text = default_expr + .0 + .split("::") + .map(|s| s.trim_matches('\'')) + .next() + .expect("default value expression"); + + match ScalarImpl::from_text(val_text, &data_type) { + Ok(scalar) => ColumnDesc::named_with_default_value( + col.name.clone(), + ColumnId::placeholder(), + data_type.clone(), + Some(scalar), + ), + Err(err) => { + tracing::warn!(error=%err.as_report(), "failed to parse postgres default value expression, only constant is supported"); + ColumnDesc::named(col.name.clone(), ColumnId::placeholder(), data_type) + } + } + } else { + ColumnDesc::named(col.name.clone(), ColumnId::placeholder(), data_type) + }; + column_descs.push(column_desc); } if table_schema.primary_key_constraints.is_empty() {