feat: add kafka backfill frontend #15602

Merged
34 commits merged on Apr 2, 2024
Changes from 25 commits

Commits (34)
9c44201
fix: deny create MV on shared CDC source
xxchan Mar 12, 2024
55fde18
feat: add kafka backfill frontend
xxchan Jan 2, 2024
2285c1f
add config for easy of testing
xxchan Feb 28, 2024
6aa8502
cleanup silly issues
xxchan Mar 13, 2024
4c7ae69
fix shared cdc source
xxchan Mar 13, 2024
2daa0a8
fix test
xxchan Mar 13, 2024
8d996f4
rename has_streaming_job -> is_shared
xxchan Mar 15, 2024
82afb30
refine add_partition_offset_cols
xxchan Mar 15, 2024
5abbb11
rename the plan SourceBackfill -> SourceScan
xxchan Mar 15, 2024
fef7611
rename upstream_mview_actors->upstream_root_actors
xxchan Mar 15, 2024
8805238
refine source_manager
xxchan Mar 18, 2024
d028469
refine rewrite_inner, unif CdcFilter and SourceBackfill
xxchan Mar 18, 2024
cc8cb41
Merge branch 'main' into xxchan/backfill-fe
xxchan Mar 18, 2024
4f850de
Merge branch 'main' into xxchan/backfill-fe
xxchan Mar 19, 2024
31b34f0
rename reuseable source -> shared source
xxchan Mar 19, 2024
87c7cad
Merge branch 'main' into xxchan/backfill-fe
xxchan Mar 25, 2024
27db3f6
fix compile
xxchan Mar 25, 2024
9f8f533
fix cdc table stuck. due to track id
xxchan Mar 25, 2024
78a783f
fix
xxchan Mar 25, 2024
06027ca
fix pg_settings order
xxchan Mar 25, 2024
fa521f0
really fix tracking_progress_actor_ids
xxchan Mar 25, 2024
58034cc
- no need for a separated LogicalSourceScan node.
xxchan Mar 26, 2024
18ea279
add (very simple) e2e tests
xxchan Mar 26, 2024
658cb52
fix executor bug: should ignore backfilled rows when the partition is…
xxchan Mar 26, 2024
231b93c
support scaling source backfill
xxchan Mar 26, 2024
0339b38
Revert "support scaling source backfill" (For a new PR)
xxchan Mar 28, 2024
17e3779
refine
xxchan Mar 28, 2024
2e5c1c0
fix
xxchan Mar 28, 2024
144d7ae
Merge remote-tracking branch 'origin/main' into xxchan/backfill-fe
xxchan Mar 31, 2024
e174c12
don't deprecate cdc_source_job for compatibility
xxchan Apr 1, 2024
98cfb72
minor fix
xxchan Apr 1, 2024
f169dec
proto: rename cdc_source_job without renaming
xxchan Apr 2, 2024
aafba45
rename backfillable -> shareable
xxchan Apr 2, 2024
a3accdb
Merge remote-tracking branch 'origin/main' into xxchan/backfill-fe
xxchan Apr 2, 2024
114 changes: 57 additions & 57 deletions e2e_test/batch/catalog/pg_settings.slt.part
@@ -1,63 +1,63 @@
query TT
SELECT context, name FROM pg_catalog.pg_settings ORDER BY (context, name);
----
internal block_size_kb
internal bloom_false_positive
internal data_directory
internal parallel_compact_size_mb
internal sstable_size_mb
internal state_store
postmaster backup_storage_directory
postmaster backup_storage_url
postmaster barrier_interval_ms
postmaster checkpoint_frequency
postmaster enable_tracing
postmaster max_concurrent_creating_streaming_jobs
postmaster pause_on_next_bootstrap
user application_name
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
user bytea_output
user client_encoding
user client_min_messages
user create_compaction_group_for_mv
user datestyle
user extra_float_digits
user idle_in_transaction_session_timeout
user intervalstyle
user lock_timeout
user max_split_range_gap
user query_epoch
user query_mode
user row_security
user rw_batch_enable_lookup_join
user rw_batch_enable_sort_agg
user rw_enable_join_ordering
user rw_enable_reusable_source
user rw_enable_share_plan
user rw_enable_two_phase_agg
user rw_force_split_distinct_agg
user rw_force_two_phase_agg
user rw_implicit_flush
user rw_streaming_allow_jsonb_in_stream_key
user rw_streaming_enable_bushy_join
user rw_streaming_enable_delta_join
user rw_streaming_over_window_cache_policy
user search_path
user server_encoding
user server_version
user server_version_num
user sink_decouple
user standard_conforming_strings
user statement_timeout
user streaming_enable_arrangement_backfill
user streaming_parallelism
user streaming_rate_limit
user synchronize_seqscans
user timezone
user transaction_isolation
user visibility_mode
internal block_size_kb
internal bloom_false_positive
internal data_directory
internal parallel_compact_size_mb
internal sstable_size_mb
internal state_store
postmaster backup_storage_directory
postmaster backup_storage_url
postmaster barrier_interval_ms
postmaster checkpoint_frequency
postmaster enable_tracing
postmaster max_concurrent_creating_streaming_jobs
postmaster pause_on_next_bootstrap
user application_name
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
user bytea_output
user client_encoding
user client_min_messages
user create_compaction_group_for_mv
user datestyle
user extra_float_digits
user idle_in_transaction_session_timeout
user intervalstyle
user lock_timeout
user max_split_range_gap
user query_epoch
user query_mode
user row_security
user rw_batch_enable_lookup_join
user rw_batch_enable_sort_agg
user rw_enable_join_ordering
user rw_enable_share_plan
user rw_enable_shared_source
user rw_enable_two_phase_agg
user rw_force_split_distinct_agg
user rw_force_two_phase_agg
user rw_implicit_flush
user rw_streaming_allow_jsonb_in_stream_key
user rw_streaming_enable_bushy_join
user rw_streaming_enable_delta_join
user rw_streaming_over_window_cache_policy
user search_path
user server_encoding
user server_version
user server_version_num
user sink_decouple
user standard_conforming_strings
user statement_timeout
user streaming_parallelism
user streaming_rate_limit
user streaming_use_arrangement_backfill
user synchronize_seqscans
user timezone
user transaction_isolation
user visibility_mode

query TT
SELECT * FROM pg_catalog.pg_settings where name='dummy';
7 changes: 7 additions & 0 deletions e2e_test/source/README.md
@@ -0,0 +1,7 @@
Tests in this directory need some prior setup.

See also `ci/scripts/e2e-source-test.sh` and `scripts/source`.

## Kafka

`scripts/source/test_data` contains the data. The filename convention is `<topic_name>.<n_partitions>` (e.g., `kafka_4_partition_topic.4` holds the data for a topic with 4 partitions).
58 changes: 58 additions & 0 deletions e2e_test/source/basic/kafka_shared_source.slt
@@ -0,0 +1,58 @@
control substitution on

statement ok
SET rw_enable_shared_source TO true;

statement ok
create source s0 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic',
properties.bootstrap.server = '${KAFKA_BOOTSTRAP_SERVER:message_queue:29092}',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
create materialized view mv_1 as select * from s0;

statement ok
SET rw_enable_shared_source TO false;

statement ok
create materialized view mv_2 as select * from s0;

statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 1s

query IT rowsort
select v1, v2 from s0;
----
1 1
2 22
3 333
4 4444

query IT rowsort
select v1, v2 from mv_1;
----
1 1
2 22
3 333
4 4444

query IT rowsort
select v1, v2 from mv_2;
----
1 1
2 22
3 333
4 4444

# TODO: add more data to the topic and re-check the data. Currently there's no good test infra to do this...
# To test the correctness of source backfill, we would need to keep producing data for a while, so that the
# materialized view goes through the backfill stage and then switches to the forward stage.

statement ok
drop source s0 cascade;
2 changes: 1 addition & 1 deletion e2e_test/source/cdc/cdc.share_stream.slt
@@ -23,7 +23,7 @@ create source mysql_mytest with (
server.id = '5601'
);

statement error Should not create MATERIALIZED VIEW directly on shared CDC source.
statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source
create materialized view mv as select * from mysql_mytest;

statement error The upstream table name must contain database name prefix*
16 changes: 9 additions & 7 deletions proto/catalog.proto
@@ -62,16 +62,18 @@ message StreamSourceInfo {
SchemaRegistryNameStrategy name_strategy = 10;
optional string key_message_name = 11;
plan_common.ExternalTableDesc external_table = 12;
// Whether the stream source has a streaming job.
// Whether the stream source is a cdc source streaming job.
// We need this field to differentiate the cdc source job, because CDC source is a singleton fragment, while Kafka source is not.
// If we have other singleton source jobs in the future, we may consider deprecating this field and add a new field `is_distributed`.
bool cdc_source_job = 13;
// Whether the stream source is a shared source (it has a streaming job).
// This is related with [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72).
// Currently, the following sources have streaming jobs:
// - Direct CDC sources (mysql & postgresql)
// - Direct CDC sources (mysql & postgresql). Backwards-compat note: for old CDC jobs it's `None`; for new CDC jobs it's `Some(true)`.
// - MQ sources (Kafka, Pulsar, Kinesis, etc.)
bool has_streaming_job = 13;
// Only used when `has_streaming_job` is `true`.
// If `false`, `requires_singleton` will be set in the stream plan.
bool is_distributed = 15;
reserved "cdc_source_job"; // deprecated
//
// **Should also test `cdc_source_job` for backwards compatibility. Use `is_shared_compatible()` instead of this field directly.**
Review comment (Member): Looks error-prone to me. I would rather just reuse cdc_source_job even without renaming.

Reply (Member, Author): 😄 Sure, that's acceptable to me.

Reply (Member): Better to reuse and rename it 😄

optional bool is_shared = 15;
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;
}
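For readers following the compatibility note above, here is a minimal sketch of how an `is_shared_compatible()` accessor (the method name comes from the comment in the diff) could combine the new `is_shared` field with the legacy `cdc_source_job` flag. The struct below is a reduced stand-in for the generated `StreamSourceInfo`, not the actual implementation.

```rust
// Minimal standalone sketch (NOT the generated prost code): `StreamSourceInfo`
// is reduced to the two fields discussed above, to show how a backwards-
// compatible accessor could combine them.
struct StreamSourceInfo {
    /// Legacy flag set by old CDC source jobs (field 13 above).
    cdc_source_job: bool,
    /// New flag (field 15 above); `None` for jobs created before it existed.
    is_shared: Option<bool>,
}

impl StreamSourceInfo {
    /// Old CDC jobs never set `is_shared`, so fall back to `cdc_source_job`.
    fn is_shared_compatible(&self) -> bool {
        self.is_shared.unwrap_or(self.cdc_source_job)
    }
}

fn main() {
    let old_cdc_job = StreamSourceInfo { cdc_source_job: true, is_shared: None };
    let new_kafka_source = StreamSourceInfo { cdc_source_job: false, is_shared: Some(true) };
    assert!(old_cdc_job.is_shared_compatible());
    assert!(new_kafka_source.is_shared_compatible());
}
```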
5 changes: 3 additions & 2 deletions proto/stream_plan.proto
@@ -206,7 +206,7 @@
}

message SourceBackfillNode {
uint32 source_id = 1;
uint32 upstream_source_id = 1;

Check failure on line 209 in proto/stream_plan.proto (GitHub Actions / Check breaking changes in Protobuf files): Field "1" with name "upstream_source_id" on message "SourceBackfillNode" changed option "json_name" from "sourceId" to "upstreamSourceId".

Check failure on line 209 in proto/stream_plan.proto (GitHub Actions / Check breaking changes in Protobuf files): Field "1" on message "SourceBackfillNode" changed name from "source_id" to "upstream_source_id".

optional uint32 row_id_index = 2;
repeated plan_common.ColumnCatalog columns = 3;
catalog.StreamSourceInfo info = 4;
@@ -876,13 +876,14 @@
FRAGMENT_TYPE_FLAG_MVIEW = 2;
FRAGMENT_TYPE_FLAG_SINK = 4;
FRAGMENT_TYPE_FLAG_NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead.
// Include StreamScan and StreamCdcScan
FRAGMENT_TYPE_FLAG_STREAM_SCAN = 16;
FRAGMENT_TYPE_FLAG_BARRIER_RECV = 32;
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512;
FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL = 1024;
FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024;

Check failure on line 886 in proto/stream_plan.proto (GitHub Actions / Check breaking changes in Protobuf files): Enum value "1024" on enum "FragmentTypeFlag" changed name from "FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL" to "FRAGMENT_TYPE_FLAG_SOURCE_SCAN".
}

// The streaming context associated with a stream plan
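Since the flag values above are powers of two, a fragment's type is stored as a bitmask that can carry several flags at once. The short sketch below illustrates that composition; representing the mask as a plain `u32` is an assumption for illustration, and the constants simply mirror the proto enum values.

```rust
// Sketch of how the power-of-two values above are meant to compose: a fragment's
// type mask ORs together every flag that applies, and membership is tested with
// a bitwise AND. The plain-u32 mask is an illustrative assumption.
const FRAGMENT_TYPE_FLAG_MVIEW: u32 = 2;
const FRAGMENT_TYPE_FLAG_CDC_FILTER: u32 = 256;
const FRAGMENT_TYPE_FLAG_SOURCE_SCAN: u32 = 1024;

fn has_flag(fragment_type_mask: u32, flag: u32) -> bool {
    fragment_type_mask & flag != 0
}

fn main() {
    let mask = FRAGMENT_TYPE_FLAG_MVIEW | FRAGMENT_TYPE_FLAG_SOURCE_SCAN;
    assert!(has_flag(mask, FRAGMENT_TYPE_FLAG_SOURCE_SCAN));
    assert!(!has_flag(mask, FRAGMENT_TYPE_FLAG_CDC_FILTER));
}
```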
5 changes: 5 additions & 0 deletions scripts/source/README.md
@@ -0,0 +1,5 @@
This folder contains scripts to prepare data for testing sources.

## Kafka

`scripts/source/test_data` contains the data. The filename convention is `<topic_name>.<n_partitions>`.
1 change: 0 additions & 1 deletion src/common/src/lib.rs
@@ -43,7 +43,6 @@
#![feature(negative_impls)]
#![feature(bound_map)]
#![feature(array_methods)]
#![feature(btree_cursors)]
#![feature(register_tool)]
#![register_tool(rw)]

7 changes: 5 additions & 2 deletions src/common/src/session_config/mod.rs
@@ -243,9 +243,12 @@ pub struct ConfigMap {
#[parameter(default = false)]
background_ddl: bool,

/// Enable reusable source. Currently only for Kafka.
/// Enable shared source. Currently only for Kafka.
///
/// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEW`s on the source
/// will forward data from that same source streaming job, and also backfill prior data from the external source.
#[parameter(default = false)]
rw_enable_reusable_source: bool,
rw_enable_shared_source: bool,

/// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
#[parameter(default = SERVER_ENCODING)]
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -25,6 +25,7 @@ arrow-array = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
assert_matches = "1"
async-nats = "0.33"
async-trait = "0.1"
auto_enums = { version = "0.8", features = ["futures03"] }
19 changes: 18 additions & 1 deletion src/connector/src/parser/additional_columns.rs
@@ -185,6 +185,11 @@ pub fn build_additional_column_catalog(
Ok(catalog)
}

/// Utility function for adding partition and offset columns to `columns`, if they are not already specified by the user.
///
/// ## Returns
/// - `columns_exist`: whether (1) the `partition`/`file` column and (2) the `offset` column are already included in `columns`.
/// - `additional_columns`: the `ColumnCatalog`s for the `partition`/`file` and `offset` columns.
pub fn add_partition_offset_cols(
columns: &[ColumnCatalog],
connector_name: &str,
@@ -224,10 +229,22 @@
.collect()
};
assert_eq!(additional_columns.len(), 2);
use risingwave_pb::plan_common::additional_column::ColumnType;
assert_matches::assert_matches!(
additional_columns[0].column_desc.additional_column,
AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
}
);
assert_matches::assert_matches!(
additional_columns[1].column_desc.additional_column,
AdditionalColumn {
column_type: Some(ColumnType::Offset(_)),
}
);

// Check if partition/file/offset columns are included explicitly.
for col in columns {
use risingwave_pb::plan_common::additional_column::ColumnType;
match col.column_desc.additional_column {
AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
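As a rough illustration of how the two return values described in the doc comment might be consumed, here is a self-contained sketch. The simplified `Column` type, the column names, and the `[bool; 2]`/`[Column; 2]` return shape are assumptions for illustration, not the real `ColumnCatalog` API.

```rust
// Standalone sketch of the caller-side pattern the doc comment implies: keep the
// user-declared columns and append the generated partition/offset columns only
// when they were not declared explicitly.
#[derive(Clone, Debug)]
struct Column {
    name: String,
}

/// Simplified stand-in for `add_partition_offset_cols`: for the partition/file
/// column and the offset column, report whether each already exists and return
/// the catalog entries that would otherwise be added.
fn add_partition_offset_cols(columns: &[Column]) -> ([bool; 2], [Column; 2]) {
    let partition = Column { name: "_rw_partition".into() };
    let offset = Column { name: "_rw_offset".into() };
    let exists = |c: &Column| columns.iter().any(|u| u.name == c.name);
    ([exists(&partition), exists(&offset)], [partition, offset])
}

fn main() {
    let mut columns = vec![Column { name: "v1".into() }];
    let (columns_exist, additional_columns) = add_partition_offset_cols(&columns);
    for (exists, col) in columns_exist.into_iter().zip(additional_columns) {
        if !exists {
            columns.push(col); // only add what the user did not declare
        }
    }
    println!("{columns:?}");
}
```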
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
@@ -148,7 +148,7 @@ where
};
self.table_schema = table_schema;
if let Some(info) = source.info.as_ref() {
self.is_cdc_source_job = info.has_streaming_job;
self.is_cdc_source_job = info.is_shared_compatible();
}
}

2 changes: 1 addition & 1 deletion src/connector/src/with_options.rs
@@ -102,7 +102,7 @@ pub trait WithPropertiesExt: Get + Sized {
connector.contains("-cdc")
}

fn is_shared_cdc_source(&self) -> bool {
fn is_backfillable_cdc_connector(&self) -> bool {
self.is_cdc_connector() && CdcTableType::from_properties(self).can_backfill()
}
