Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add rw_rate_limit system catalog #19466

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions e2e_test/source_inline/fs/posix_fs.slt
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,36 @@ create materialized view diamonds_mv as select * from diamonds_source;
sleep 1s

# no output due to rate limit
query TTTT rowsort
statement count 0
select * from diamonds;
----

query TTTT rowsort

statement count 0
select * from diamonds_mv;


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----
diamonds FS_FETCH {FS_FETCH} 0
diamonds SOURCE {SOURCE} 0
diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0
diamonds_mv SOURCE {SOURCE} 0

statement ok
ALTER TABLE diamonds SET source_rate_limit TO DEFAULT;

statement ok
ALTER source diamonds_source SET source_rate_limit TO DEFAULT;

sleep 10s
query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----
diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0
diamonds_mv SOURCE {SOURCE} 0


sleep 3s

query TTTT rowsort
select * from diamonds;
Expand All @@ -63,6 +78,23 @@ select * from diamonds;
1.28 Good J 63.1
1.3 Fair E 64.7


statement count 0
select * from diamonds_mv;



statement ok
ALTER SOURCE diamonds_source SET source_rate_limit TO DEFAULT;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----


sleep 3s

query TTTT rowsort
select * from diamonds_mv;
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,38 @@ select * from rl_mv3;
----
0

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 0
rl_mv2 SOURCE {SOURCE} 0
rl_mv3 SOURCE {SOURCE} 0

############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
statement count 0
alter source kafka_source set source_rate_limit to 1000;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 1000
rl_mv2 SOURCE {SOURCE} 1000
rl_mv3 SOURCE {SOURCE} 1000

skipif in-memory
query I
statement count 0
alter source kafka_source set source_rate_limit to default;

# rate limit becomes None
query T
select count(*) from rw_rate_limit;
----
0

skipif in-memory
sleep 3s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,26 @@ SELECT progress from rw_ddl_progress;
----
0 rows consumed

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 0
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0


############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

statement ok
alter source kafka_source set source_rate_limit to 1000;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0

sleep 3s

query I
Expand All @@ -114,17 +129,34 @@ LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000;
^


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0


statement ok
alter materialized view rl_mv2 set backfill_rate_limit = 2000;


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 2000

sleep 3s

query ?
query T
select * from rl_mv2;
----
2000



############## Cleanup

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ select count(*) from kafka_source;
############## Alter source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
statement ok
alter table kafka_source set source_rate_limit to 1000;

skipif in-memory
Expand Down
14 changes: 14 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ service StreamManagerService {
rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
rpc Recover(RecoverRequest) returns (RecoverResponse);
rpc ListRateLimits(ListRateLimitsRequest) returns (ListRateLimitsResponse);
}

// Below for cluster service.
Expand Down Expand Up @@ -862,3 +863,16 @@ message GetClusterLimitsResponse {
service ClusterLimitService {
rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse);
}

message ListRateLimitsRequest {}

message ListRateLimitsResponse {
message RateLimitInfo {
uint32 fragment_id = 1;
uint32 job_id = 2;
uint32 fragment_type_mask = 3;
uint32 rate_limit = 4;
string node_name = 5;
}
repeated RateLimitInfo rate_limits = 1;
}
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,8 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024;
FRAGMENT_TYPE_FLAG_SNAPSHOT_BACKFILL_STREAM_SCAN = 2048;
// Note: this flag is not available in old fragments, so only suitable for debugging purpose.
FRAGMENT_TYPE_FLAG_FS_FETCH = 4096;
}

// The streaming context associated with a stream plan
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod rw_indexes;
mod rw_internal_tables;
mod rw_materialized_views;
mod rw_meta_snapshot;
mod rw_rate_limit;
mod rw_relation_info;
mod rw_relations;
mod rw_schemas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct RwFragment {
max_parallelism: i32,
}

fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
let mut result = vec![];
for i in 0..32 {
let bit = 1 << i;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

use super::rw_fragments::extract_fragment_type_flag;
use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;

#[derive(Fields)]
#[primary_key(fragment_id, node_name)]
struct RwRateLimit {
fragment_id: i32,
fragment_type: Vec<String>,
node_name: String,
table_id: i32,
rate_limit: i32,
}

#[system_catalog(table, "rw_catalog.rw_rate_limit")]
async fn read_rw_rate_limit(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRateLimit>> {
let rate_limits = reader.meta_client.list_rate_limits().await?;

Ok(rate_limits
.into_iter()
.map(|info| RwRateLimit {
fragment_id: info.fragment_id as i32,
fragment_type: extract_fragment_type_flag(info.fragment_type_mask)
.into_iter()
.flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_"))
.map(|s| s.into())
.collect(),
table_id: info.job_id as i32,
rate_limit: info.rate_limit as i32,
node_name: info.node_name,
})
.collect())
}
7 changes: 7 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus};
Expand Down Expand Up @@ -125,6 +126,8 @@ pub trait FrontendMetaClient: Send + Sync {
async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -300,4 +303,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
self.0.get_cluster_limits().await
}

async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
self.0.list_rate_limits().await
}
}
4 changes: 4 additions & 0 deletions src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,10 @@ fn build_fragment(
current_fragment.requires_singleton = true;
}

NodeBody::StreamFsFetch(_) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32;
}

_ => {}
};

Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
Expand Down Expand Up @@ -1065,6 +1066,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
Ok(vec![])
}

async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
Ok(vec![])
}
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,4 +433,16 @@ impl StreamManagerService for StreamServiceImpl {

Ok(Response::new(ListActorSplitsResponse { actor_splits }))
}

async fn list_rate_limits(
&self,
_request: Request<ListRateLimitsRequest>,
) -> Result<Response<ListRateLimitsResponse>, Status> {
let rate_limits = self
.metadata_manager
.catalog_controller
.list_rate_limits()
.await?;
Ok(Response::new(ListRateLimitsResponse { rate_limits }))
}
}
Loading
Loading