Skip to content

Commit

Permalink
fix cdc
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 26, 2024
1 parent d30ea7d commit 64cc6ce
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 20 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ python3 -m pip install --break-system-packages requests protobuf fastavro conflu
apt-get -y install jq

echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \
risedev ci-start ci-inline-source-test
risedev slt './e2e_test/source_inline/**/*.slt' -j16
risedev slt './e2e_test/source_inline/**/*.slt.serial'
Expand Down
31 changes: 12 additions & 19 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,7 @@ pub async fn bind_create_source_or_table_with_connector(
col_id_gen: &mut ColumnIdGenerator,
// `true` for "create source", `false` for "create table with connector"
is_create_source: bool,
is_shared: bool,
is_shared_non_cdc: bool,
source_rate_limit: Option<u32>,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
Expand Down Expand Up @@ -1559,7 +1559,8 @@ pub async fn bind_create_source_or_table_with_connector(
check_and_add_timestamp_column(&with_properties, &mut columns);

// For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
if is_shared {
// For shared CDC source, the schema is different. See debezium_cdc_source_schema, CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
if is_shared_non_cdc {
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&columns,
&with_properties.get_connector().unwrap(),
Expand Down Expand Up @@ -1687,14 +1688,14 @@ pub async fn handle_create_source(
let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;

let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_shareable_non_cdc_connector()
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source());
let is_shared_non_cdc = with_properties.is_shareable_non_cdc_connector()
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source();
let is_shared = create_cdc_source_job || is_shared_non_cdc;

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &format_encode)?
Expand Down Expand Up @@ -1722,7 +1723,7 @@ pub async fn handle_create_source(
stmt.include_column_options,
&mut col_id_gen,
true,
is_shared,
is_shared_non_cdc,
overwrite_options.source_rate_limit,
)
.await?;
Expand Down Expand Up @@ -1965,14 +1966,6 @@ pub mod tests {
"_rw_table_name",
Varchar,
),
(
"_rw_mysql-cdc_partition",
Varchar,
),
(
"_rw_mysql-cdc_offset",
Varchar,
),
(
"_row_id",
Serial,
Expand Down

0 comments on commit 64cc6ce

Please sign in to comment.