diff --git a/Cargo.lock b/Cargo.lock index 3d572aa3e832..ea8de8ee8bae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1613,6 +1613,7 @@ dependencies = [ "common-catalog", "common-config", "common-error", + "common-grpc", "common-macro", "common-meta", "common-procedure", diff --git a/config/config.md b/config/config.md index 010afb3e6201..ce5b57250236 100644 --- a/config/config.md +++ b/config/config.md @@ -186,7 +186,6 @@ | `meta_client.metadata_cache_tti` | String | `5m` | -- | | `datanode` | -- | -- | Datanode options. | | `datanode.client` | -- | -- | Datanode client options. | -| `datanode.client.timeout` | String | `10s` | -- | | `datanode.client.connect_timeout` | String | `10s` | -- | | `datanode.client.tcp_nodelay` | Bool | `true` | -- | | `logging` | -- | -- | The logging options. | diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 5eeb7bc5c7e8..5f3c38c0ae3b 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -136,7 +136,6 @@ metadata_cache_tti = "5m" [datanode] ## Datanode client options. [datanode.client] -timeout = "10s" connect_timeout = "10s" tcp_nodelay = true diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index a11a9a01974a..10ee2c0f9c59 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -28,6 +28,7 @@ common-base.workspace = true common-catalog.workspace = true common-config.workspace = true common-error.workspace = true +common-grpc.workspace = true common-macro.workspace = true common-meta.workspace = true common-procedure.workspace = true diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 4297553304f8..f8fd5cbd511d 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -24,6 +24,7 @@ use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, Me use clap::Parser; use client::client_manager::DatanodeClients; use common_config::Configurable; +use common_grpc::channel_manager::ChannelConfig; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; @@ -318,11 +319,19 @@ impl StartCommand { Arc::new(executor), ); + // frontend to datanode need not timeout. + // Some queries are expected to take long time. + let channel_config = ChannelConfig { + timeout: None, + ..Default::default() + }; + let client = DatanodeClients::new(channel_config); + let mut instance = FrontendBuilder::new( cached_meta_backend.clone(), layered_cache_registry.clone(), catalog_manager, - Arc::new(DatanodeClients::default()), + Arc::new(client), meta_client, ) .with_plugin(plugins.clone()) diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index e47722c4e2a2..0b77fa326ea1 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -291,88 +291,68 @@ impl ChannelConfig { } /// A timeout to each request. - pub fn timeout(self, timeout: Duration) -> Self { - Self { - timeout: Some(timeout), - ..self - } + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + self } /// A timeout to connecting to the uri. /// /// Defaults to no timeout. - pub fn connect_timeout(self, timeout: Duration) -> Self { - Self { - connect_timeout: Some(timeout), - ..self - } + pub fn connect_timeout(mut self, timeout: Duration) -> Self { + self.connect_timeout = Some(timeout); + self } /// A concurrency limit to each request. - pub fn concurrency_limit(self, limit: usize) -> Self { - Self { - concurrency_limit: Some(limit), - ..self - } + pub fn concurrency_limit(mut self, limit: usize) -> Self { + self.concurrency_limit = Some(limit); + self } /// A rate limit to each request. - pub fn rate_limit(self, limit: u64, duration: Duration) -> Self { - Self { - rate_limit: Some((limit, duration)), - ..self - } + pub fn rate_limit(mut self, limit: u64, duration: Duration) -> Self { + self.rate_limit = Some((limit, duration)); + self } /// Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control. /// Default is 65,535 - pub fn initial_stream_window_size(self, size: u32) -> Self { - Self { - initial_stream_window_size: Some(size), - ..self - } + pub fn initial_stream_window_size(mut self, size: u32) -> Self { + self.initial_stream_window_size = Some(size); + self } /// Sets the max connection-level flow control for HTTP2 /// /// Default is 65,535 - pub fn initial_connection_window_size(self, size: u32) -> Self { - Self { - initial_connection_window_size: Some(size), - ..self - } + pub fn initial_connection_window_size(mut self, size: u32) -> Self { + self.initial_connection_window_size = Some(size); + self } /// Set http2 KEEP_ALIVE_INTERVAL. Uses hyper’s default otherwise. - pub fn http2_keep_alive_interval(self, duration: Duration) -> Self { - Self { - http2_keep_alive_interval: Some(duration), - ..self - } + pub fn http2_keep_alive_interval(mut self, duration: Duration) -> Self { + self.http2_keep_alive_interval = Some(duration); + self } /// Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise. - pub fn http2_keep_alive_timeout(self, duration: Duration) -> Self { - Self { - http2_keep_alive_timeout: Some(duration), - ..self - } + pub fn http2_keep_alive_timeout(mut self, duration: Duration) -> Self { + self.http2_keep_alive_timeout = Some(duration); + self } /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hyper’s default otherwise. - pub fn http2_keep_alive_while_idle(self, enabled: bool) -> Self { - Self { - http2_keep_alive_while_idle: Some(enabled), - ..self - } + pub fn http2_keep_alive_while_idle(mut self, enabled: bool) -> Self { + self.http2_keep_alive_while_idle = Some(enabled); + self } /// Sets whether to use an adaptive flow control. Uses hyper’s default otherwise. - pub fn http2_adaptive_window(self, enabled: bool) -> Self { - Self { - http2_adaptive_window: Some(enabled), - ..self - } + pub fn http2_adaptive_window(mut self, enabled: bool) -> Self { + self.http2_adaptive_window = Some(enabled); + self } /// Set whether TCP keepalive messages are enabled on accepted connections. @@ -381,31 +361,25 @@ impl ChannelConfig { /// will be the time to remain idle before sending TCP keepalive probes. /// /// Default is no keepalive (None) - pub fn tcp_keepalive(self, duration: Duration) -> Self { - Self { - tcp_keepalive: Some(duration), - ..self - } + pub fn tcp_keepalive(mut self, duration: Duration) -> Self { + self.tcp_keepalive = Some(duration); + self } /// Set the value of TCP_NODELAY option for accepted connections. /// /// Enabled by default. - pub fn tcp_nodelay(self, enabled: bool) -> Self { - Self { - tcp_nodelay: enabled, - ..self - } + pub fn tcp_nodelay(mut self, enabled: bool) -> Self { + self.tcp_nodelay = enabled; + self } /// Set the value of tls client auth. /// /// Disabled by default. - pub fn client_tls_config(self, client_tls_option: ClientTlsOption) -> Self { - Self { - client_tls: Some(client_tls_option), - ..self - } + pub fn client_tls_config(mut self, client_tls_option: ClientTlsOption) -> Self { + self.client_tls = Some(client_tls_option); + self } } diff --git a/src/frontend/src/service_config/datanode.rs b/src/frontend/src/service_config/datanode.rs index 1b01c4a94257..ccf2b2ebf4c7 100644 --- a/src/frontend/src/service_config/datanode.rs +++ b/src/frontend/src/service_config/datanode.rs @@ -24,8 +24,6 @@ pub struct DatanodeOptions { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DatanodeClientOptions { - #[serde(with = "humantime_serde")] - pub timeout: Duration, #[serde(with = "humantime_serde")] pub connect_timeout: Duration, pub tcp_nodelay: bool, @@ -34,7 +32,6 @@ pub struct DatanodeClientOptions { impl Default for DatanodeClientOptions { fn default() -> Self { Self { - timeout: Duration::from_secs(channel_manager::DEFAULT_GRPC_REQUEST_TIMEOUT_SECS), connect_timeout: Duration::from_secs( channel_manager::DEFAULT_GRPC_CONNECT_TIMEOUT_SECS, ),