From 18e8c700e50487ada9ac31a4dcfbeef3c8bc6a42 Mon Sep 17 00:00:00 2001 From: LinFeng Date: Fri, 22 Sep 2023 10:07:46 +0800 Subject: [PATCH] feat: limit grpc message size (#2459) * feat: add two grpc config options Those options are for: * Limit receiving(decoding) message size * Limit sending(enoding) message size * test: add integration tests for message size limit --- src/client/src/client.rs | 18 +++++-- src/common/grpc/src/channel_manager.rs | 19 ++++--- src/datanode/src/config.rs | 9 ++++ src/datanode/src/server.rs | 7 ++- src/frontend/src/server.rs | 7 ++- src/frontend/src/service_config/grpc.rs | 9 ++++ src/servers/src/grpc.rs | 39 ++++++++++----- tests-integration/src/cluster.rs | 1 + tests-integration/src/test_util.rs | 15 +++++- tests-integration/tests/grpc.rs | 66 ++++++++++++++++++++++++- tests-integration/tests/http.rs | 2 + 11 files changed, 162 insertions(+), 30 deletions(-) diff --git a/src/client/src/client.rs b/src/client/src/client.rs index ada1ae92c56a..c5457e58741f 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -138,8 +138,12 @@ impl Client { Ok((addr, channel)) } - fn max_grpc_message_size(&self) -> usize { - self.inner.channel_manager.config().max_message_size + fn max_grpc_recv_message_size(&self) -> usize { + self.inner.channel_manager.config().max_recv_message_size + } + + fn max_grpc_send_message_size(&self) -> usize { + self.inner.channel_manager.config().max_send_message_size } pub(crate) fn make_flight_client(&self) -> Result { @@ -147,7 +151,8 @@ impl Client { Ok(FlightClient { addr, client: FlightServiceClient::new(channel) - .max_decoding_message_size(self.max_grpc_message_size()), + .max_decoding_message_size(self.max_grpc_recv_message_size()) + .max_encoding_message_size(self.max_grpc_send_message_size()), }) } @@ -155,13 +160,16 @@ impl Client { let (_, channel) = self.find_channel()?; Ok(DatabaseClient { inner: GreptimeDatabaseClient::new(channel) - .max_decoding_message_size(self.max_grpc_message_size()), + .max_decoding_message_size(self.max_grpc_recv_message_size()) + .max_encoding_message_size(self.max_grpc_send_message_size()), }) } pub(crate) fn raw_region_client(&self) -> Result> { let (_, channel) = self.find_channel()?; - Ok(PbRegionClient::new(channel).max_decoding_message_size(self.max_grpc_message_size())) + Ok(PbRegionClient::new(channel) + .max_decoding_message_size(self.max_grpc_recv_message_size()) + .max_encoding_message_size(self.max_grpc_send_message_size())) } pub fn make_prometheus_gateway_client(&self) -> Result> { diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index 1102ac0fd303..f52177e2890d 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -31,7 +31,8 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10; pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10; -pub const DEFAULT_MAX_GRPC_MESSAGE_SIZE: usize = 512 * 1024 * 1024; +pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024; +pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024; lazy_static! { static ref ID: AtomicU64 = AtomicU64::new(0); @@ -248,9 +249,10 @@ pub struct ChannelConfig { pub tcp_keepalive: Option, pub tcp_nodelay: bool, pub client_tls: Option, - // Max gRPC message size - // TODO(dennis): make it configurable - pub max_message_size: usize, + // Max gRPC receiving(decoding) message size + pub max_recv_message_size: usize, + // Max gRPC sending(encoding) message size + pub max_send_message_size: usize, } impl Default for ChannelConfig { @@ -269,7 +271,8 @@ impl Default for ChannelConfig { tcp_keepalive: None, tcp_nodelay: true, client_tls: None, - max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, } } } @@ -534,7 +537,8 @@ mod tests { tcp_keepalive: None, tcp_nodelay: true, client_tls: None, - max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, }, default_cfg ); @@ -577,7 +581,8 @@ mod tests { client_cert_path: "some_cert_path".to_string(), client_key_path: "some_key_path".to_string(), }), - max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, }, cfg ); diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 61001f2e39d1..b3835e6e2e84 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -18,6 +18,9 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_config::WalConfig; +use common_grpc::channel_manager::{ + DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, +}; pub use common_procedure::options::ProcedureConfig; use common_telemetry::logging::LoggingOptions; use file_engine::config::EngineConfig as FileEngineConfig; @@ -324,6 +327,10 @@ pub struct DatanodeOptions { pub rpc_addr: String, pub rpc_hostname: Option, pub rpc_runtime_size: usize, + // Max gRPC receiving(decoding) message size + pub rpc_max_recv_message_size: usize, + // Max gRPC sending(encoding) message size + pub rpc_max_send_message_size: usize, pub heartbeat: HeartbeatOptions, pub http: HttpOptions, pub meta_client: Option, @@ -344,6 +351,8 @@ impl Default for DatanodeOptions { rpc_addr: "127.0.0.1:3001".to_string(), rpc_hostname: None, rpc_runtime_size: 8, + rpc_max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + rpc_max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, http: HttpOptions::default(), meta_client: None, wal: WalConfig::default(), diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 71c050dc0bbc..1847dc4c992a 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -16,7 +16,7 @@ use std::net::SocketAddr; use std::sync::Arc; use futures::future; -use servers::grpc::GrpcServer; +use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; @@ -39,9 +39,14 @@ impl Services { let flight_handler = Some(Arc::new(region_server.clone()) as _); let region_server_handler = Some(Arc::new(region_server.clone()) as _); let runtime = region_server.runtime(); + let grpc_config = GrpcServerConfig { + max_recv_message_size: opts.rpc_max_recv_message_size, + max_send_message_size: opts.rpc_max_send_message_size, + }; Ok(Self { grpc_server: GrpcServer::new( + Some(grpc_config), None, None, flight_handler, diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index d96178f1d3ad..5a61c3b48834 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -22,7 +22,7 @@ use common_runtime::Builder as RuntimeBuilder; use common_telemetry::info; use servers::configurator::ConfiguratorRef; use servers::error::Error::InternalIo; -use servers::grpc::GrpcServer; +use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; @@ -69,7 +69,12 @@ impl Services { .context(error::RuntimeResourceSnafu)?, ); + let grpc_config = GrpcServerConfig { + max_recv_message_size: opts.max_recv_message_size, + max_send_message_size: opts.max_send_message_size, + }; let grpc_server = GrpcServer::new( + Some(grpc_config), Some(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())), Some(instance.clone()), None, diff --git a/src/frontend/src/service_config/grpc.rs b/src/frontend/src/service_config/grpc.rs index 92d6ea771710..e0a64565015d 100644 --- a/src/frontend/src/service_config/grpc.rs +++ b/src/frontend/src/service_config/grpc.rs @@ -12,12 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_grpc::channel_manager::{ + DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, +}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct GrpcOptions { pub addr: String, pub runtime_size: usize, + // Max gRPC receiving(decoding) message size + pub max_recv_message_size: usize, + // Max gRPC sending(encoding) message size + pub max_send_message_size: usize, } impl Default for GrpcOptions { @@ -25,6 +32,8 @@ impl Default for GrpcOptions { Self { addr: "127.0.0.1:4001".to_string(), runtime_size: 8, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, } } } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 5d5582de7faf..69ea1943a6fa 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -33,7 +33,9 @@ use arrow_flight::flight_service_server::FlightService; use arrow_flight::flight_service_server::FlightServiceServer; use async_trait::async_trait; use auth::UserProviderRef; -use common_grpc::channel_manager::DEFAULT_MAX_GRPC_MESSAGE_SIZE; +use common_grpc::channel_manager::{ + DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, +}; use common_runtime::Runtime; use common_telemetry::logging::info; use common_telemetry::{error, warn}; @@ -82,21 +84,24 @@ pub struct GrpcServer { /// Grpc Server configuration #[derive(Debug, Clone)] pub struct GrpcServerConfig { - // Max gRPC message size - // TODO(dennis): make it configurable - pub max_message_size: usize, + // Max gRPC receiving(decoding) message size + pub max_recv_message_size: usize, + // Max gRPC sending(encoding) message size + pub max_send_message_size: usize, } impl Default for GrpcServerConfig { fn default() -> Self { Self { - max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, } } } impl GrpcServer { pub fn new( + config: Option, query_handler: Option, prometheus_handler: Option, flight_handler: Option, @@ -110,7 +115,7 @@ impl GrpcServer { let region_server_handler = region_server_handler .map(|handler| RegionServerRequestHandler::new(handler, runtime.clone())); Self { - config: GrpcServerConfig::default(), + config: config.unwrap_or_default(), shutdown_tx: Mutex::new(None), user_provider, serve_state: Mutex::new(None), @@ -201,7 +206,8 @@ impl Server for GrpcServer { } async fn start(&self, addr: SocketAddr) -> Result { - let max_message_size = self.config.max_message_size; + let max_recv_message_size = self.config.max_recv_message_size; + let max_send_message_size = self.config.max_send_message_size; let (tx, rx) = oneshot::channel(); let (listener, addr) = { let mut shutdown_tx = self.shutdown_tx.lock().await; @@ -227,7 +233,8 @@ impl Server for GrpcServer { if let Some(database_handler) = &self.database_handler { builder = builder.add_service( GreptimeDatabaseServer::new(DatabaseService::new(database_handler.clone())) - .max_decoding_message_size(max_message_size), + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size), ) } if let Some(prometheus_handler) = &self.prometheus_handler { @@ -237,18 +244,24 @@ impl Server for GrpcServer { if let Some(flight_handler) = &self.flight_handler { builder = builder.add_service( FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone())) - .max_decoding_message_size(max_message_size), + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size), ) } else { // TODO(ruihang): this is a temporary workaround before region server is ready. - builder = builder.add_service(FlightServiceServer::new(FlightCraftWrapper( - self.database_handler.clone().unwrap(), - ))) + builder = builder.add_service( + FlightServiceServer::new(FlightCraftWrapper( + self.database_handler.clone().unwrap(), + )) + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size), + ) } if let Some(region_server_handler) = &self.region_server_handler { builder = builder.add_service( RegionServer::new(region_server_handler.clone()) - .max_decoding_message_size(max_message_size), + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size), ); } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 9bdb11f90b00..013eeb681ec6 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -272,6 +272,7 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) { let flight_handler = Some(Arc::new(datanode.region_server()) as _); let region_server_handler = Some(Arc::new(datanode.region_server()) as _); let grpc_server = GrpcServer::new( + None, None, None, flight_handler, diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index a6c0cf72fb65..e3370836eb2f 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -39,7 +39,7 @@ use object_store::test_util::TempFolder; use object_store::ObjectStore; use secrecy::ExposeSecret; use servers::grpc::greptime_handler::GreptimeRequestHandler; -use servers::grpc::GrpcServer; +use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::{HttpOptions, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; @@ -423,13 +423,22 @@ pub async fn setup_grpc_server( store_type: StorageType, name: &str, ) -> (String, TestGuard, Arc) { - setup_grpc_server_with_user_provider(store_type, name, None).await + setup_grpc_server_with(store_type, name, None, None).await } pub async fn setup_grpc_server_with_user_provider( store_type: StorageType, name: &str, user_provider: Option, +) -> (String, TestGuard, Arc) { + setup_grpc_server_with(store_type, name, user_provider, None).await +} + +pub async fn setup_grpc_server_with( + store_type: StorageType, + name: &str, + user_provider: Option, + grpc_config: Option, ) -> (String, TestGuard, Arc) { let instance = setup_standalone_instance(name, store_type).await; @@ -447,7 +456,9 @@ pub async fn setup_grpc_server_with_user_provider( user_provider.clone(), runtime.clone(), )); + let fe_grpc_server = Arc::new(GrpcServer::new( + grpc_config, Some(ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone())), Some(fe_instance_ref.clone()), Some(flight_handler), diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 7f2f20d87ec8..3d766a5a37f0 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -24,10 +24,11 @@ use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE}; use common_query::Output; use common_recordbatch::RecordBatches; +use servers::grpc::GrpcServerConfig; use servers::http::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse}; use servers::server::Server; use tests_integration::test_util::{ - setup_grpc_server, setup_grpc_server_with_user_provider, StorageType, + setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType, }; #[macro_export] @@ -64,6 +65,9 @@ macro_rules! grpc_tests { test_auto_create_table, test_insert_and_select, test_dbname, + test_grpc_message_size_ok, + test_grpc_message_size_limit_recv, + test_grpc_message_size_limit_send, test_grpc_auth, test_health_check, test_prom_gateway_query, @@ -115,6 +119,66 @@ pub async fn test_dbname(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_grpc_message_size_ok(store_type: StorageType) { + let config = GrpcServerConfig { + max_recv_message_size: 1024, + max_send_message_size: 1024, + }; + let (addr, mut guard, fe_grpc_server) = + setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await; + + let grpc_client = Client::with_urls(vec![addr]); + let db = Database::new_with_dbname( + format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME), + grpc_client, + ); + db.sql("show tables;").await.unwrap(); + let _ = fe_grpc_server.shutdown().await; + guard.remove_all().await; +} + +pub async fn test_grpc_message_size_limit_send(store_type: StorageType) { + let config = GrpcServerConfig { + max_recv_message_size: 1024, + max_send_message_size: 50, + }; + let (addr, mut guard, fe_grpc_server) = + setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await; + + let grpc_client = Client::with_urls(vec![addr]); + let db = Database::new_with_dbname( + format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME), + grpc_client, + ); + let err_msg = db.sql("show tables;").await.unwrap_err().to_string(); + assert!(err_msg.contains("message length too large"), "{}", err_msg); + let _ = fe_grpc_server.shutdown().await; + guard.remove_all().await; +} + +pub async fn test_grpc_message_size_limit_recv(store_type: StorageType) { + let config = GrpcServerConfig { + max_recv_message_size: 10, + max_send_message_size: 1024, + }; + let (addr, mut guard, fe_grpc_server) = + setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await; + + let grpc_client = Client::with_urls(vec![addr]); + let db = Database::new_with_dbname( + format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME), + grpc_client, + ); + let err_msg = db.sql("show tables;").await.unwrap_err().to_string(); + assert!( + err_msg.contains("Operation was attempted past the valid range"), + "{}", + err_msg + ); + let _ = fe_grpc_server.shutdown().await; + guard.remove_all().await; +} + pub async fn test_grpc_auth(store_type: StorageType) { let user_provider = user_provider_from_option( &"static_user_provider:cmd:greptime_user=greptime_pwd".to_string(), diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c186dab8ba7f..0317482da7f4 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -610,6 +610,8 @@ node_id = 0 require_lease_before_startup = true rpc_addr = "127.0.0.1:3001" rpc_runtime_size = 8 +rpc_max_recv_message_size = 536870912 +rpc_max_send_message_size = 536870912 enable_telemetry = true [heartbeat]