Skip to content

Commit

Permalink
feat: limit grpc message size (GreptimeTeam#2459)
Browse files Browse the repository at this point in the history
* feat: add two grpc config options

Those options are for:
* Limit receiving(decoding) message size
* Limit sending(enoding) message size

* test: add integration tests for message size limit
  • Loading branch information
TheWaWaR authored and paomian committed Oct 19, 2023
1 parent ebfa01b commit 18e8c70
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 30 deletions.
18 changes: 13 additions & 5 deletions src/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,30 +138,38 @@ impl Client {
Ok((addr, channel))
}

fn max_grpc_message_size(&self) -> usize {
self.inner.channel_manager.config().max_message_size
fn max_grpc_recv_message_size(&self) -> usize {
self.inner.channel_manager.config().max_recv_message_size
}

fn max_grpc_send_message_size(&self) -> usize {
self.inner.channel_manager.config().max_send_message_size
}

pub(crate) fn make_flight_client(&self) -> Result<FlightClient> {
let (addr, channel) = self.find_channel()?;
Ok(FlightClient {
addr,
client: FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_message_size()),
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()),
})
}

pub(crate) fn make_database_client(&self) -> Result<DatabaseClient> {
let (_, channel) = self.find_channel()?;
Ok(DatabaseClient {
inner: GreptimeDatabaseClient::new(channel)
.max_decoding_message_size(self.max_grpc_message_size()),
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()),
})
}

pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
let (_, channel) = self.find_channel()?;
Ok(PbRegionClient::new(channel).max_decoding_message_size(self.max_grpc_message_size()))
Ok(PbRegionClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()))
}

pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
Expand Down
19 changes: 12 additions & 7 deletions src/common/grpc/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon
const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_MAX_GRPC_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024;

lazy_static! {
static ref ID: AtomicU64 = AtomicU64::new(0);
Expand Down Expand Up @@ -248,9 +249,10 @@ pub struct ChannelConfig {
pub tcp_keepalive: Option<Duration>,
pub tcp_nodelay: bool,
pub client_tls: Option<ClientTlsOption>,
// Max gRPC message size
// TODO(dennis): make it configurable
pub max_message_size: usize,
// Max gRPC receiving(decoding) message size
pub max_recv_message_size: usize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
}

impl Default for ChannelConfig {
Expand All @@ -269,7 +271,8 @@ impl Default for ChannelConfig {
tcp_keepalive: None,
tcp_nodelay: true,
client_tls: None,
max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
}
}
}
Expand Down Expand Up @@ -534,7 +537,8 @@ mod tests {
tcp_keepalive: None,
tcp_nodelay: true,
client_tls: None,
max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
},
default_cfg
);
Expand Down Expand Up @@ -577,7 +581,8 @@ mod tests {
client_cert_path: "some_cert_path".to_string(),
client_key_path: "some_key_path".to_string(),
}),
max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
},
cfg
);
Expand Down
9 changes: 9 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_config::WalConfig;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use file_engine::config::EngineConfig as FileEngineConfig;
Expand Down Expand Up @@ -324,6 +327,10 @@ pub struct DatanodeOptions {
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
// Max gRPC receiving(decoding) message size
pub rpc_max_recv_message_size: usize,
// Max gRPC sending(encoding) message size
pub rpc_max_send_message_size: usize,
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub meta_client: Option<MetaClientOptions>,
Expand All @@ -344,6 +351,8 @@ impl Default for DatanodeOptions {
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
rpc_max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
rpc_max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
http: HttpOptions::default(),
meta_client: None,
wal: WalConfig::default(),
Expand Down
7 changes: 6 additions & 1 deletion src/datanode/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use futures::future;
use servers::grpc::GrpcServer;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
Expand All @@ -39,9 +39,14 @@ impl Services {
let flight_handler = Some(Arc::new(region_server.clone()) as _);
let region_server_handler = Some(Arc::new(region_server.clone()) as _);
let runtime = region_server.runtime();
let grpc_config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size,
max_send_message_size: opts.rpc_max_send_message_size,
};

Ok(Self {
grpc_server: GrpcServer::new(
Some(grpc_config),
None,
None,
flight_handler,
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::info;
use servers::configurator::ConfiguratorRef;
use servers::error::Error::InternalIo;
use servers::grpc::GrpcServer;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
Expand Down Expand Up @@ -69,7 +69,12 @@ impl Services {
.context(error::RuntimeResourceSnafu)?,
);

let grpc_config = GrpcServerConfig {
max_recv_message_size: opts.max_recv_message_size,
max_send_message_size: opts.max_send_message_size,
};
let grpc_server = GrpcServer::new(
Some(grpc_config),
Some(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())),
Some(instance.clone()),
None,
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/service_config/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GrpcOptions {
pub addr: String,
pub runtime_size: usize,
// Max gRPC receiving(decoding) message size
pub max_recv_message_size: usize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
}

impl Default for GrpcOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:4001".to_string(),
runtime_size: 8,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
}
}
}
39 changes: 26 additions & 13 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use arrow_flight::flight_service_server::FlightService;
use arrow_flight::flight_service_server::FlightServiceServer;
use async_trait::async_trait;
use auth::UserProviderRef;
use common_grpc::channel_manager::DEFAULT_MAX_GRPC_MESSAGE_SIZE;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
use common_runtime::Runtime;
use common_telemetry::logging::info;
use common_telemetry::{error, warn};
Expand Down Expand Up @@ -82,21 +84,24 @@ pub struct GrpcServer {
/// Grpc Server configuration
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
// Max gRPC message size
// TODO(dennis): make it configurable
pub max_message_size: usize,
// Max gRPC receiving(decoding) message size
pub max_recv_message_size: usize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
}

impl Default for GrpcServerConfig {
fn default() -> Self {
Self {
max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
}
}
}

impl GrpcServer {
pub fn new(
config: Option<GrpcServerConfig>,
query_handler: Option<ServerGrpcQueryHandlerRef>,
prometheus_handler: Option<PrometheusHandlerRef>,
flight_handler: Option<FlightCraftRef>,
Expand All @@ -110,7 +115,7 @@ impl GrpcServer {
let region_server_handler = region_server_handler
.map(|handler| RegionServerRequestHandler::new(handler, runtime.clone()));
Self {
config: GrpcServerConfig::default(),
config: config.unwrap_or_default(),
shutdown_tx: Mutex::new(None),
user_provider,
serve_state: Mutex::new(None),
Expand Down Expand Up @@ -201,7 +206,8 @@ impl Server for GrpcServer {
}

async fn start(&self, addr: SocketAddr) -> Result<SocketAddr> {
let max_message_size = self.config.max_message_size;
let max_recv_message_size = self.config.max_recv_message_size;
let max_send_message_size = self.config.max_send_message_size;
let (tx, rx) = oneshot::channel();
let (listener, addr) = {
let mut shutdown_tx = self.shutdown_tx.lock().await;
Expand All @@ -227,7 +233,8 @@ impl Server for GrpcServer {
if let Some(database_handler) = &self.database_handler {
builder = builder.add_service(
GreptimeDatabaseServer::new(DatabaseService::new(database_handler.clone()))
.max_decoding_message_size(max_message_size),
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size),
)
}
if let Some(prometheus_handler) = &self.prometheus_handler {
Expand All @@ -237,18 +244,24 @@ impl Server for GrpcServer {
if let Some(flight_handler) = &self.flight_handler {
builder = builder.add_service(
FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
.max_decoding_message_size(max_message_size),
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size),
)
} else {
// TODO(ruihang): this is a temporary workaround before region server is ready.
builder = builder.add_service(FlightServiceServer::new(FlightCraftWrapper(
self.database_handler.clone().unwrap(),
)))
builder = builder.add_service(
FlightServiceServer::new(FlightCraftWrapper(
self.database_handler.clone().unwrap(),
))
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size),
)
}
if let Some(region_server_handler) = &self.region_server_handler {
builder = builder.add_service(
RegionServer::new(region_server_handler.clone())
.max_decoding_message_size(max_message_size),
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size),
);
}

Expand Down
1 change: 1 addition & 0 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
let flight_handler = Some(Arc::new(datanode.region_server()) as _);
let region_server_handler = Some(Arc::new(datanode.region_server()) as _);
let grpc_server = GrpcServer::new(
None,
None,
None,
flight_handler,
Expand Down
15 changes: 13 additions & 2 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use secrecy::ExposeSecret;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::GrpcServer;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
Expand Down Expand Up @@ -423,13 +423,22 @@ pub async fn setup_grpc_server(
store_type: StorageType,
name: &str,
) -> (String, TestGuard, Arc<GrpcServer>) {
setup_grpc_server_with_user_provider(store_type, name, None).await
setup_grpc_server_with(store_type, name, None, None).await
}

pub async fn setup_grpc_server_with_user_provider(
store_type: StorageType,
name: &str,
user_provider: Option<UserProviderRef>,
) -> (String, TestGuard, Arc<GrpcServer>) {
setup_grpc_server_with(store_type, name, user_provider, None).await
}

pub async fn setup_grpc_server_with(
store_type: StorageType,
name: &str,
user_provider: Option<UserProviderRef>,
grpc_config: Option<GrpcServerConfig>,
) -> (String, TestGuard, Arc<GrpcServer>) {
let instance = setup_standalone_instance(name, store_type).await;

Expand All @@ -447,7 +456,9 @@ pub async fn setup_grpc_server_with_user_provider(
user_provider.clone(),
runtime.clone(),
));

let fe_grpc_server = Arc::new(GrpcServer::new(
grpc_config,
Some(ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone())),
Some(fe_instance_ref.clone()),
Some(flight_handler),
Expand Down
Loading

0 comments on commit 18e8c70

Please sign in to comment.