Skip to content

Commit

Permalink
fix: fix alter rate limit for mv on fs source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Nov 20, 2024
1 parent 3993c63 commit 1bcbc14
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
4 changes: 4 additions & 0 deletions src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,10 @@ fn build_fragment(
current_fragment.requires_singleton = true;
}

NodeBody::StreamFsFetch(_) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32;
}

_ => {}
};

Expand Down
10 changes: 6 additions & 4 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1332,9 +1332,11 @@ impl CatalogController {
});
}
if is_fs_source {
// scan all fragments for StreamFsFetch node if using fs connector
// in older versions, there's no fragment type flag for `FsFetch` node,
// so we just scan all fragments for StreamFsFetch node if using fs connector
visit_stream_node(stream_node, |node| {
if let PbNodeBody::StreamFsFetch(node) = node {
*fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32;
if let Some(node_inner) = &mut node.node_inner
&& node_inner.source_id == source_id as u32
{
Expand All @@ -1352,9 +1354,10 @@ impl CatalogController {
"source id should be used by at least one fragment"
);
let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
for (id, _, stream_node) in fragments {
for (id, fragment_type_mask, stream_node) in fragments {
fragment::ActiveModel {
fragment_id: Set(id),
fragment_type_mask: Set(fragment_type_mask),
stream_node: Set(StreamNode::from(&stream_node)),
..Default::default()
}
Expand Down Expand Up @@ -1409,8 +1412,7 @@ impl CatalogController {

fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
let mut found = false;
if (*fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0)
|| (*fragment_type_mask & PbFragmentTypeFlag::SourceScan as i32 != 0)
if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() as i32 != 0
{
visit_stream_node(stream_node, |node| match node {
PbNodeBody::StreamCdcScan(node) => {
Expand Down
19 changes: 19 additions & 0 deletions src/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,25 @@ impl stream_plan::StreamNode {
}
}

impl stream_plan::FragmentTypeFlag {
/// Fragments that may be affected by `BACKFILL_RATE_LIMIT`.
pub fn backfill_rate_limit_fragments() -> i32 {
stream_plan::FragmentTypeFlag::SourceScan as i32
| stream_plan::FragmentTypeFlag::StreamScan as i32
}

/// Fragments that may be affected by `SOURCE_RATE_LIMIT`.
/// Note: for `FsFetch`, old fragments don't have this flag set, so don't use this to check.
pub fn source_rate_limit_fragments() -> i32 {
stream_plan::FragmentTypeFlag::Source as i32 | stream_plan::FragmentTypeFlag::FsFetch as i32
}

/// Note: this doesn't include `FsFetch` created in old versions.
pub fn rate_limit_fragments() -> i32 {
Self::backfill_rate_limit_fragments() | Self::source_rate_limit_fragments()
}
}

impl catalog::StreamSourceInfo {
/// Refer to [`Self::cdc_source_job`] for details.
pub fn is_shared(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3462,7 +3462,7 @@ impl Parser<'_> {
} else if let Some(rate_limit) = self.parse_alter_source_rate_limit(false)? {
AlterSourceOperation::SetSourceRateLimit { rate_limit }
} else {
return self.expected("SCHEMA after SET");
return self.expected("SCHEMA or SOURCE_RATE_LIMIT after SET");
}
} else if self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) {
let format_encode = self.parse_schema()?.unwrap();
Expand Down

0 comments on commit 1bcbc14

Please sign in to comment.