Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove timeout in the channel between frontend and datanode #3962

Merged
merged 4 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@
| `meta_client.metadata_cache_tti` | String | `5m` | -- |
| `datanode` | -- | -- | Datanode options. |
| `datanode.client` | -- | -- | Datanode client options. |
| `datanode.client.timeout` | String | `10s` | -- |
| `datanode.client.connect_timeout` | String | `10s` | -- |
| `datanode.client.tcp_nodelay` | Bool | `true` | -- |
| `logging` | -- | -- | The logging options. |
Expand Down
1 change: 0 additions & 1 deletion config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ metadata_cache_tti = "5m"
[datanode]
## Datanode client options.
[datanode.client]
timeout = "10s"
connect_timeout = "10s"
tcp_nodelay = true

Expand Down
1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-procedure.workspace = true
Expand Down
11 changes: 10 additions & 1 deletion src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, Me
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
Expand Down Expand Up @@ -318,11 +319,19 @@ impl StartCommand {
Arc::new(executor),
);

// frontend to datanode need not timeout.
// Some queries are expected to take long time.
let channel_config = ChannelConfig {
timeout: None,
..Default::default()
};
waynexia marked this conversation as resolved.
Show resolved Hide resolved
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
let client = DatanodeClients::new(channel_config);

let mut instance = FrontendBuilder::new(
cached_meta_backend.clone(),
layered_cache_registry.clone(),
catalog_manager,
Arc::new(DatanodeClients::default()),
Arc::new(client),
meta_client,
)
.with_plugin(plugins.clone())
Expand Down
104 changes: 39 additions & 65 deletions src/common/grpc/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,88 +291,68 @@ impl ChannelConfig {
}

/// A timeout to each request.
pub fn timeout(self, timeout: Duration) -> Self {
Self {
timeout: Some(timeout),
..self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}

/// A timeout to connecting to the uri.
///
/// Defaults to no timeout.
pub fn connect_timeout(self, timeout: Duration) -> Self {
Self {
connect_timeout: Some(timeout),
..self
}
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = Some(timeout);
self
}

/// A concurrency limit to each request.
pub fn concurrency_limit(self, limit: usize) -> Self {
Self {
concurrency_limit: Some(limit),
..self
}
pub fn concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit = Some(limit);
self
}

/// A rate limit to each request.
pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
Self {
rate_limit: Some((limit, duration)),
..self
}
pub fn rate_limit(mut self, limit: u64, duration: Duration) -> Self {
self.rate_limit = Some((limit, duration));
self
}

/// Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control.
/// Default is 65,535
pub fn initial_stream_window_size(self, size: u32) -> Self {
Self {
initial_stream_window_size: Some(size),
..self
}
pub fn initial_stream_window_size(mut self, size: u32) -> Self {
self.initial_stream_window_size = Some(size);
self
}

/// Sets the max connection-level flow control for HTTP2
///
/// Default is 65,535
pub fn initial_connection_window_size(self, size: u32) -> Self {
Self {
initial_connection_window_size: Some(size),
..self
}
pub fn initial_connection_window_size(mut self, size: u32) -> Self {
self.initial_connection_window_size = Some(size);
self
}

/// Set http2 KEEP_ALIVE_INTERVAL. Uses hyper’s default otherwise.
pub fn http2_keep_alive_interval(self, duration: Duration) -> Self {
Self {
http2_keep_alive_interval: Some(duration),
..self
}
pub fn http2_keep_alive_interval(mut self, duration: Duration) -> Self {
self.http2_keep_alive_interval = Some(duration);
self
}

/// Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise.
pub fn http2_keep_alive_timeout(self, duration: Duration) -> Self {
Self {
http2_keep_alive_timeout: Some(duration),
..self
}
pub fn http2_keep_alive_timeout(mut self, duration: Duration) -> Self {
self.http2_keep_alive_timeout = Some(duration);
self
}

/// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hyper’s default otherwise.
pub fn http2_keep_alive_while_idle(self, enabled: bool) -> Self {
Self {
http2_keep_alive_while_idle: Some(enabled),
..self
}
pub fn http2_keep_alive_while_idle(mut self, enabled: bool) -> Self {
self.http2_keep_alive_while_idle = Some(enabled);
self
}

/// Sets whether to use an adaptive flow control. Uses hyper’s default otherwise.
pub fn http2_adaptive_window(self, enabled: bool) -> Self {
Self {
http2_adaptive_window: Some(enabled),
..self
}
pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
self.http2_adaptive_window = Some(enabled);
self
}

/// Set whether TCP keepalive messages are enabled on accepted connections.
Expand All @@ -381,31 +361,25 @@ impl ChannelConfig {
/// will be the time to remain idle before sending TCP keepalive probes.
///
/// Default is no keepalive (None)
pub fn tcp_keepalive(self, duration: Duration) -> Self {
Self {
tcp_keepalive: Some(duration),
..self
}
pub fn tcp_keepalive(mut self, duration: Duration) -> Self {
self.tcp_keepalive = Some(duration);
self
}

/// Set the value of TCP_NODELAY option for accepted connections.
///
/// Enabled by default.
pub fn tcp_nodelay(self, enabled: bool) -> Self {
Self {
tcp_nodelay: enabled,
..self
}
pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
self.tcp_nodelay = enabled;
self
}

/// Set the value of tls client auth.
///
/// Disabled by default.
pub fn client_tls_config(self, client_tls_option: ClientTlsOption) -> Self {
Self {
client_tls: Some(client_tls_option),
..self
}
pub fn client_tls_config(mut self, client_tls_option: ClientTlsOption) -> Self {
self.client_tls = Some(client_tls_option);
self
}
}

Expand Down
3 changes: 0 additions & 3 deletions src/frontend/src/service_config/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ pub struct DatanodeOptions {

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DatanodeClientOptions {
#[serde(with = "humantime_serde")]
pub timeout: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
pub tcp_nodelay: bool,
Expand All @@ -34,7 +32,6 @@ pub struct DatanodeClientOptions {
impl Default for DatanodeClientOptions {
fn default() -> Self {
Self {
timeout: Duration::from_secs(channel_manager::DEFAULT_GRPC_REQUEST_TIMEOUT_SECS),
connect_timeout: Duration::from_secs(
channel_manager::DEFAULT_GRPC_CONNECT_TIMEOUT_SECS,
),
Expand Down