Skip to content

Commit

Permalink
feat: support rw_rate_limit
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Nov 20, 2024
1 parent 1bcbc14 commit 425261d
Show file tree
Hide file tree
Showing 15 changed files with 318 additions and 17 deletions.
52 changes: 45 additions & 7 deletions e2e_test/source_inline/fs/posix_fs.slt
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,38 @@ create materialized view diamonds_mv as select * from diamonds_source;
sleep 1s

# no output due to rate limit
query TTTT rowsort
statement count 0
select * from diamonds;
----

query TTTT rowsort

statement count 0
select * from diamonds_mv;


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----
diamonds FS_FETCH {FS_FETCH} 0
diamonds SOURCE {SOURCE} 0
diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0
diamonds_mv SOURCE {SOURCE} 0

statement ok
ALTER TABLE diamonds SET source_rate_limit TO DEFAULT;
ALTER TABLE diamonds SET source_rate_limit TO 114;


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----
diamonds FS_FETCH {FS_FETCH} 114
diamonds SOURCE {SOURCE} 114
diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0
diamonds_mv SOURCE {SOURCE} 0


statement ok
ALTER source diamonds_source SET source_rate_limit TO DEFAULT;

sleep 10s

query TTTT rowsort
select * from diamonds;
Expand All @@ -63,6 +80,27 @@ select * from diamonds;
1.28 Good J 63.1
1.3 Fair E 64.7


statement count 0
select * from diamonds_mv;



statement ok
ALTER SOURCE diamonds_source SET source_rate_limit TO 514;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----
diamonds FS_FETCH {FS_FETCH} 114
diamonds SOURCE {SOURCE} 114
diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 514
diamonds_mv SOURCE {SOURCE} 514


sleep 10s

query TTTT rowsort
select * from diamonds_mv;
----
Expand Down
26 changes: 24 additions & 2 deletions e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,38 @@ select * from rl_mv3;
----
0

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 0
rl_mv2 SOURCE {SOURCE} 0
rl_mv3 SOURCE {SOURCE} 0

############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
statement count 0
alter source kafka_source set source_rate_limit to 1000;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 1000
rl_mv2 SOURCE {SOURCE} 1000
rl_mv3 SOURCE {SOURCE} 1000

skipif in-memory
query I
statement count 0
alter source kafka_source set source_rate_limit to default;

# rate limit becomes None
query T
select count(*) from rw_rate_limit;
----
0

skipif in-memory
sleep 3s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,26 @@ SELECT progress from rw_ddl_progress;
----
0 rows consumed

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 0
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0


############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

statement ok
alter source kafka_source set source_rate_limit to 1000;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0

sleep 3s

query I
Expand All @@ -114,17 +129,34 @@ LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000;
^


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0


statement ok
alter materialized view rl_mv2 set backfill_rate_limit = 2000;


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 2000

sleep 3s

query ?
query T
select * from rl_mv2;
----
2000



############## Cleanup

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ select count(*) from kafka_source;
############## Alter source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
statement ok
alter table kafka_source set source_rate_limit to 1000;

skipif in-memory
Expand Down
14 changes: 14 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ service StreamManagerService {
rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
rpc Recover(RecoverRequest) returns (RecoverResponse);
rpc ListRateLimits(ListRateLimitsRequest) returns (ListRateLimitsResponse);
}

// Below for cluster service.
Expand Down Expand Up @@ -862,3 +863,16 @@ message GetClusterLimitsResponse {
service ClusterLimitService {
rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse);
}

message ListRateLimitsRequest {}

message ListRateLimitsResponse {
message RateLimitInfo {
uint32 fragment_id = 1;
uint32 job_id = 2;
uint32 fragment_type_mask = 3;
uint32 rate_limit = 4;
string node_name = 5;
}
repeated RateLimitInfo rate_limits = 1;
}
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,8 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024;
FRAGMENT_TYPE_FLAG_SNAPSHOT_BACKFILL_STREAM_SCAN = 2048;
// Note: this flag is not available in old fragments, so only suitable for debugging purpose.
FRAGMENT_TYPE_FLAG_FS_FETCH = 4096;
}

// The streaming context associated with a stream plan
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod rw_indexes;
mod rw_internal_tables;
mod rw_materialized_views;
mod rw_meta_snapshot;
mod rw_rate_limit;
mod rw_relation_info;
mod rw_relations;
mod rw_schemas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct RwFragment {
max_parallelism: i32,
}

fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
let mut result = vec![];
for i in 0..32 {
let bit = 1 << i;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

use super::rw_fragments::extract_fragment_type_flag;
use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;

#[derive(Fields)]
struct RwFragmentRateLimit {
#[primary_key]
fragment_id: i32,
fragment_type: Vec<String>,
node_name: String,
table_id: i32,
rate_limit: i32,
}

#[system_catalog(table, "rw_catalog.rw_rate_limit")]
async fn read_rw_rate_limit(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragmentRateLimit>> {
let rate_limits = reader.meta_client.list_rate_limits().await?;

Ok(rate_limits
.into_iter()
.map(|info| RwFragmentRateLimit {
fragment_id: info.fragment_id as i32,
fragment_type: extract_fragment_type_flag(info.fragment_type_mask)
.into_iter()
.flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_"))
.map(|s| s.into())
.collect(),
table_id: info.job_id as i32,
rate_limit: info.rate_limit as i32,
node_name: info.node_name,
})
.collect())
}
7 changes: 7 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus};
Expand Down Expand Up @@ -125,6 +126,8 @@ pub trait FrontendMetaClient: Send + Sync {
async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;

async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -300,4 +303,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
self.0.get_cluster_limits().await
}

async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
self.0.list_rate_limits().await
}
}
5 changes: 5 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
Expand Down Expand Up @@ -1065,6 +1066,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
Ok(vec![])
}

async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
Ok(vec![])
}
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,4 +433,16 @@ impl StreamManagerService for StreamServiceImpl {

Ok(Response::new(ListActorSplitsResponse { actor_splits }))
}

async fn list_rate_limits(
&self,
_request: Request<ListRateLimitsRequest>,
) -> Result<Response<ListRateLimitsResponse>, Status> {
let rate_limits = self
.metadata_manager
.catalog_controller
.list_rate_limits()
.await?;
Ok(Response::new(ListRateLimitsResponse { rate_limits }))
}
}
Loading

0 comments on commit 425261d

Please sign in to comment.