diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 1da23c801f1b..162419a3b43e 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -43,6 +43,10 @@ impl Instance { pub fn datanode_mut(&mut self) -> &mut Datanode { &mut self.datanode } + + pub fn datanode(&self) -> &Datanode { + &self.datanode + } } #[async_trait] @@ -235,6 +239,7 @@ impl StartCommand { .with_default_grpc_server(&datanode.region_server()) .enable_http_service() .build() + .await .context(StartDatanodeSnafu)?; datanode.setup_services(services); diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 149103f10f19..124330ee477e 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -43,13 +43,17 @@ pub struct Instance { } impl Instance { - fn new(frontend: FeInstance) -> Self { + pub fn new(frontend: FeInstance) -> Self { Self { frontend } } pub fn mut_inner(&mut self) -> &mut FeInstance { &mut self.frontend } + + pub fn inner(&self) -> &FeInstance { + &self.frontend + } } #[async_trait] @@ -271,6 +275,7 @@ impl StartCommand { let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins) .build() + .await .context(StartFrontendSnafu)?; instance .build_servers(opts, servers) diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 349fd910bddf..17f02cbc148d 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -32,11 +32,11 @@ lazy_static::lazy_static! { } #[async_trait] -pub trait App { +pub trait App: Send { fn name(&self) -> &str; /// A hook for implementor to make something happened before actual startup. Defaults to no-op. - fn pre_start(&mut self) -> error::Result<()> { + async fn pre_start(&mut self) -> error::Result<()> { Ok(()) } @@ -46,24 +46,21 @@ pub trait App { } pub async fn start_app(mut app: Box) -> error::Result<()> { - let name = app.name().to_string(); - - app.pre_start()?; - - tokio::select! { - result = app.start() => { - if let Err(err) = result { - error!(err; "Failed to start app {name}!"); - } - } - _ = tokio::signal::ctrl_c() => { - if let Err(err) = app.stop().await { - error!(err; "Failed to stop app {name}!"); - } - info!("Goodbye!"); - } + info!("Starting app: {}", app.name()); + + app.pre_start().await?; + + app.start().await?; + + if let Err(e) = tokio::signal::ctrl_c().await { + error!("Failed to listen for ctrl-c signal: {}", e); + // It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in + // the underlying system. So we stop the app instead of running nonetheless to let people + // investigate the issue. } + app.stop().await?; + info!("Goodbye!"); Ok(()) } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index edd262e9c29a..c8b0385cfe44 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -441,6 +441,7 @@ impl StartCommand { let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins) .build() + .await .context(StartFrontendSnafu)?; frontend .build_servers(fe_opts, servers) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 2d36e53eed8b..5fcd7d7af7f0 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -14,7 +14,6 @@ //! Datanode implementation. -use std::collections::HashMap; use std::path::Path; use std::sync::Arc; @@ -32,7 +31,6 @@ use common_wal::config::kafka::DatanodeKafkaConfig; use common_wal::config::raft_engine::RaftEngineConfig; use common_wal::config::DatanodeWalConfig; use file_engine::engine::FileRegionEngine; -use futures::future; use futures_util::future::try_join_all; use futures_util::TryStreamExt; use log_store::kafka::log_store::KafkaLogStore; @@ -45,7 +43,7 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; use query::QueryEngineFactory; use servers::export_metrics::ExportMetricsTask; -use servers::server::{start_server, ServerHandlers}; +use servers::server::ServerHandlers; use servers::Mode; use snafu::{OptionExt, ResultExt}; use store_api::path_utils::{region_dir, WAL_DIR}; @@ -97,7 +95,11 @@ impl Datanode { t.start(None).context(StartServerSnafu)? } - self.start_services().await + self.services.start_all().await.context(StartServerSnafu) + } + + pub fn server_handlers(&self) -> &ServerHandlers { + &self.services } pub fn start_telemetry(&self) { @@ -127,24 +129,12 @@ impl Datanode { self.services = services; } - /// Start services of datanode. This method call will block until services are shutdown. - pub async fn start_services(&mut self) -> Result<()> { - let _ = future::try_join_all(self.services.values().map(start_server)) - .await - .context(StartServerSnafu)?; - Ok(()) - } - - async fn shutdown_services(&self) -> Result<()> { - let _ = future::try_join_all(self.services.values().map(|server| server.0.shutdown())) + pub async fn shutdown(&self) -> Result<()> { + self.services + .shutdown_all() .await .context(ShutdownServerSnafu)?; - Ok(()) - } - pub async fn shutdown(&self) -> Result<()> { - // We must shutdown services first - self.shutdown_services().await?; let _ = self.greptimedb_telemetry_task.stop().await; if let Some(heartbeat_task) = &self.heartbeat_task { heartbeat_task @@ -268,7 +258,7 @@ impl DatanodeBuilder { .context(StartServerSnafu)?; Ok(Datanode { - services: HashMap::new(), + services: ServerHandlers::default(), heartbeat_task, region_server, greptimedb_telemetry_task, diff --git a/src/datanode/src/service.rs b/src/datanode/src/service.rs index 5e203f53da61..1ec2bd4eaa71 100644 --- a/src/datanode/src/service.rs +++ b/src/datanode/src/service.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -27,9 +26,6 @@ use crate::config::DatanodeOptions; use crate::error::{ParseAddrSnafu, Result}; use crate::region_server::RegionServer; -const DATANODE_GRPC_SERVICE_NAME: &str = "DATANODE_GRPC_SERVICE"; -const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE"; - pub struct DatanodeServiceBuilder<'a> { opts: &'a DatanodeOptions, grpc_server: Option, @@ -65,15 +61,15 @@ impl<'a> DatanodeServiceBuilder<'a> { } } - pub fn build(mut self) -> Result { - let mut services = HashMap::new(); + pub async fn build(mut self) -> Result { + let handlers = ServerHandlers::default(); if let Some(grpc_server) = self.grpc_server.take() { let addr: SocketAddr = self.opts.rpc_addr.parse().context(ParseAddrSnafu { addr: &self.opts.rpc_addr, })?; let handler: ServerHandler = (Box::new(grpc_server), addr); - services.insert(DATANODE_GRPC_SERVICE_NAME.to_string(), handler); + handlers.insert(handler).await; } if self.enable_http_service { @@ -85,10 +81,10 @@ impl<'a> DatanodeServiceBuilder<'a> { addr: &self.opts.http.addr, })?; let handler: ServerHandler = (Box::new(http_server), addr); - services.insert(DATANODE_HTTP_SERVICE_NAME.to_string(), handler); + handlers.insert(handler).await; } - Ok(services) + Ok(handlers) } pub fn grpc_server_builder( diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index cf58b741652b..8f63adee0198 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -67,7 +67,7 @@ use servers::query_handler::{ InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, PromStoreProtocolHandler, ScriptHandler, }; -use servers::server::{start_server, ServerHandlers}; +use servers::server::ServerHandlers; use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::Dialect; @@ -115,7 +115,7 @@ pub struct Instance { statement_executor: Arc, query_engine: QueryEngineRef, plugins: Plugins, - servers: Arc, + servers: ServerHandlers, heartbeat_task: Option, inserter: InserterRef, deleter: DeleterRef, @@ -198,8 +198,7 @@ impl Instance { ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins)) .context(StartServerSnafu)?; - self.servers = Arc::new(servers); - + self.servers = servers; Ok(()) } @@ -212,10 +211,14 @@ impl Instance { } pub async fn shutdown(&self) -> Result<()> { - futures::future::try_join_all(self.servers.values().map(|server| server.0.shutdown())) + self.servers + .shutdown_all() .await .context(error::ShutdownServerSnafu) - .map(|_| ()) + } + + pub fn server_handlers(&self) -> &ServerHandlers { + &self.servers } pub fn statement_executor(&self) -> Arc { @@ -248,13 +251,7 @@ impl FrontendInstance for Instance { } } - futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move { - info!("Starting service: {name}"); - start_server(handler).await - })) - .await - .context(error::StartServerSnafu) - .map(|_| ()) + self.servers.start_all().await.context(StartServerSnafu) } } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index c890eeba71fa..8d3666cbd74f 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; use catalog::kvbackend::KvBackendCatalogManager; @@ -29,6 +28,7 @@ use operator::statement::StatementExecutor; use operator::table::TableMutationOperator; use partition::manager::PartitionRuleManager; use query::QueryEngineFactory; +use servers::server::ServerHandlers; use crate::error::Result; use crate::heartbeat::HeartbeatTask; @@ -148,7 +148,7 @@ impl FrontendBuilder { statement_executor, query_engine, plugins, - servers: Arc::new(HashMap::new()), + servers: ServerHandlers::default(), heartbeat_task: self.heartbeat_task, inserter, deleter, diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 245e92cb852f..9fcc7372c858 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -29,7 +29,7 @@ use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter; use servers::query_handler::sql::ServerSqlQueryHandlerAdapter; -use servers::server::{Server, ServerHandler, ServerHandlers}; +use servers::server::{Server, ServerHandlers}; use snafu::ResultExt; use crate::error::{self, Result, StartServerSnafu}; @@ -164,14 +164,14 @@ where Ok(http_server) } - pub fn build(mut self) -> Result { + pub async fn build(mut self) -> Result { let opts = self.opts.clone(); let instance = self.instance.clone(); let toml = opts.to_toml()?; let opts: FrontendOptions = opts.into(); - let mut result = Vec::::new(); + let handlers = ServerHandlers::default(); let user_provider = self.plugins.get::(); @@ -179,7 +179,7 @@ where // Always init GRPC server let grpc_addr = parse_addr(&opts.grpc.addr)?; let grpc_server = self.build_grpc_server(&opts)?; - result.push((Box::new(grpc_server), grpc_addr)); + handlers.insert((Box::new(grpc_server), grpc_addr)).await; } { @@ -187,7 +187,7 @@ where let http_options = &opts.http; let http_addr = parse_addr(&http_options.addr)?; let http_server = self.build_http_server(&opts, toml)?; - result.push((Box::new(http_server), http_addr)); + handlers.insert((Box::new(http_server), http_addr)).await; } if opts.mysql.enable { @@ -218,7 +218,7 @@ where opts.reject_no_database.unwrap_or(false), )), ); - result.push((mysql_server, mysql_addr)); + handlers.insert((mysql_server, mysql_addr)).await; } if opts.postgres.enable { @@ -241,7 +241,7 @@ where user_provider.clone(), )) as Box; - result.push((pg_server, pg_addr)); + handlers.insert((pg_server, pg_addr)).await; } if opts.opentsdb.enable { @@ -259,13 +259,10 @@ where let server = OpentsdbServer::create_server(instance.clone(), io_runtime); - result.push((server, addr)); + handlers.insert((server, addr)).await; } - Ok(result - .into_iter() - .map(|(server, addr)| (server.name().to_string(), (server, addr))) - .collect()) + Ok(handlers) } } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index c959973b2b95..7ed7df942263 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -26,6 +26,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_telemetry::info; use etcd_client::Client; +use futures::future; use servers::configurator::ConfiguratorRef; use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; @@ -33,7 +34,6 @@ use servers::metrics_handler::MetricsHandler; use servers::server::Server; use snafu::ResultExt; use tokio::net::TcpListener; -use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; use tonic::transport::server::{Router, TcpIncoming}; @@ -110,12 +110,14 @@ impl MetaSrvInstance { let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu { addr: &self.opts.http.addr, })?; - let http_srv = self.http_srv.start(addr); - select! { - v = meta_srv => v?, - v = http_srv => v.map(|_| ()).context(error::StartHttpSnafu)?, - } - + let http_srv = async { + self.http_srv + .start(addr) + .await + .map(|_| ()) + .context(error::StartHttpSnafu) + }; + future::try_join(meta_srv, http_srv).await?; Ok(()) } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index dc007a81fe01..7447fdc67b1a 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -78,7 +78,7 @@ pub struct MetaSrvBuilder { lock: Option, datanode_manager: Option, plugins: Option, - table_metadata_allocator: Option, + table_metadata_allocator: Option, } impl MetaSrvBuilder { @@ -150,7 +150,7 @@ impl MetaSrvBuilder { pub fn table_metadata_allocator( mut self, - table_metadata_allocator: TableMetadataAllocator, + table_metadata_allocator: TableMetadataAllocatorRef, ) -> Self { self.table_metadata_allocator = Some(table_metadata_allocator); self @@ -211,7 +211,7 @@ impl MetaSrvBuilder { options.wal.clone(), kv_backend.clone(), )); - let table_metadata_allocator = Arc::new(table_metadata_allocator.unwrap_or_else(|| { + let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { let sequence = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_TABLE_ID as u64) @@ -222,13 +222,13 @@ impl MetaSrvBuilder { selector_ctx.clone(), selector.clone(), )); - TableMetadataAllocator::with_peer_allocator( + Arc::new(TableMetadataAllocator::with_peer_allocator( sequence, wal_options_allocator.clone(), table_metadata_manager.table_name_manager().clone(), peer_allocator, - ) - })); + )) + }); let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 3a5a9af9f4d9..0a71513e2778 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -51,7 +51,7 @@ use tower_http::trace::TraceLayer; use self::authorize::AuthState; use crate::configurator::ConfiguratorRef; -use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu}; +use crate::error::{AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu}; use crate::http::arrow_result::ArrowResponse; use crate::http::csv_result::CsvResponse; use crate::http::error_result::ErrorResponse; @@ -797,9 +797,15 @@ impl Server for HttpServer { let listening = server.local_addr(); info!("HTTP server is bound to {}", listening); - let graceful = server.with_graceful_shutdown(rx.map(drop)); - graceful.await.context(StartHttpSnafu)?; - + common_runtime::spawn_bg(async move { + if let Err(e) = server + .with_graceful_shutdown(rx.map(drop)) + .await + .context(HyperSnafu) + { + error!(e; "Failed to shutdown http server"); + } + }); Ok(listening) } diff --git a/src/servers/src/server.rs b/src/servers/src/server.rs index 95b142803234..c2086f2658c1 100644 --- a/src/servers/src/server.rs +++ b/src/servers/src/server.rs @@ -19,9 +19,9 @@ use std::sync::Arc; use async_trait::async_trait; use common_runtime::Runtime; use common_telemetry::logging::{error, info}; -use futures::future::{AbortHandle, AbortRegistration, Abortable}; +use futures::future::{try_join_all, AbortHandle, AbortRegistration, Abortable}; use snafu::{ensure, ResultExt}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, RwLock}; use tokio::task::JoinHandle; use tokio_stream::wrappers::TcpListenerStream; @@ -29,14 +29,66 @@ use crate::error::{self, Result}; pub(crate) type AbortableStream = Abortable; -pub type ServerHandlers = HashMap; - pub type ServerHandler = (Box, SocketAddr); -pub async fn start_server(server_handler: &ServerHandler) -> Result> { - let (server, addr) = server_handler; - info!("Starting {} at {}", server.name(), addr); - server.start(*addr).await.map(Some) +/// [ServerHandlers] is used to manage the lifecycle of all the services like http or grpc in the GreptimeDB server. +#[derive(Clone, Default)] +pub struct ServerHandlers { + handlers: Arc>>, +} + +impl ServerHandlers { + pub fn new() -> Self { + Self { + handlers: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn insert(&self, handler: ServerHandler) { + let mut handlers = self.handlers.write().await; + handlers.insert(handler.0.name().to_string(), handler); + } + + /// Finds the __actual__ bound address of the service by its name. + /// + /// This is useful in testing. We can configure the service to bind to port 0 first, then start + /// the server to get the real bound port number. This way we avoid doing careful assignment of + /// the port number to the service in the test. + /// + /// Note that the address is guaranteed to be correct only after the `start_all` method is + /// successfully invoked. Otherwise you may find the address to be what you configured before. + pub async fn addr(&self, name: &str) -> Option { + let handlers = self.handlers.read().await; + handlers.get(name).map(|x| x.1) + } + + /// Starts all the managed services. It will block until all the services are started. + /// And it will set the actual bound address to the service. + pub async fn start_all(&self) -> Result<()> { + let mut handlers = self.handlers.write().await; + try_join_all(handlers.values_mut().map(|(server, addr)| async move { + let bind_addr = server.start(*addr).await?; + *addr = bind_addr; + info!("Service {} is started at {}", server.name(), bind_addr); + Ok::<(), error::Error>(()) + })) + .await?; + Ok(()) + } + + /// Shutdown all the managed services. It will block until all the services are shutdown. + pub async fn shutdown_all(&self) -> Result<()> { + // Even though the `shutdown` method in server does not require mut self, we still acquire + // write lock to pair with `start_all` method. + let handlers = self.handlers.write().await; + try_join_all(handlers.values().map(|(server, _)| async move { + server.shutdown().await?; + info!("Service {} is shutdown!", server.name()); + Ok::<(), error::Error>(()) + })) + .await?; + Ok(()) + } } #[async_trait]