Skip to content

Commit

Permalink
refactor: set the actual bound port in server handler (#3353)
Browse files Browse the repository at this point in the history
* refactor: set the actual bound port so we can use port 0 in testing

* Update src/servers/src/server.rs

Co-authored-by: Weny Xu <[email protected]>

* fmt

---------

Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
MichaelScofield and WenyXu authored Feb 23, 2024
1 parent 7341f23 commit 2035e7b
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 100 deletions.
5 changes: 5 additions & 0 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl Instance {
pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}

pub fn datanode(&self) -> &Datanode {
&self.datanode
}
}

#[async_trait]
Expand Down Expand Up @@ -235,6 +239,7 @@ impl StartCommand {
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.await
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);

Expand Down
7 changes: 6 additions & 1 deletion src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ pub struct Instance {
}

impl Instance {
fn new(frontend: FeInstance) -> Self {
pub fn new(frontend: FeInstance) -> Self {
Self { frontend }
}

pub fn mut_inner(&mut self) -> &mut FeInstance {
&mut self.frontend
}

pub fn inner(&self) -> &FeInstance {
&self.frontend
}
}

#[async_trait]
Expand Down Expand Up @@ -271,6 +275,7 @@ impl StartCommand {

let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins)
.build()
.await
.context(StartFrontendSnafu)?;
instance
.build_servers(opts, servers)
Expand Down
33 changes: 15 additions & 18 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ lazy_static::lazy_static! {
}

#[async_trait]
pub trait App {
pub trait App: Send {
fn name(&self) -> &str;

/// A hook for implementor to make something happened before actual startup. Defaults to no-op.
fn pre_start(&mut self) -> error::Result<()> {
async fn pre_start(&mut self) -> error::Result<()> {
Ok(())
}

Expand All @@ -46,24 +46,21 @@ pub trait App {
}

pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
let name = app.name().to_string();

app.pre_start()?;

tokio::select! {
result = app.start() => {
if let Err(err) = result {
error!(err; "Failed to start app {name}!");
}
}
_ = tokio::signal::ctrl_c() => {
if let Err(err) = app.stop().await {
error!(err; "Failed to stop app {name}!");
}
info!("Goodbye!");
}
info!("Starting app: {}", app.name());

app.pre_start().await?;

app.start().await?;

if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}

app.stop().await?;
info!("Goodbye!");
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ impl StartCommand {

let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()
.await
.context(StartFrontendSnafu)?;
frontend
.build_servers(fe_opts, servers)
Expand Down
30 changes: 10 additions & 20 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

//! Datanode implementation.

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;

Expand All @@ -32,7 +31,6 @@ use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures::future;
use futures_util::future::try_join_all;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
Expand All @@ -45,7 +43,7 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::{start_server, ServerHandlers};
use servers::server::ServerHandlers;
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
Expand Down Expand Up @@ -97,7 +95,11 @@ impl Datanode {
t.start(None).context(StartServerSnafu)?
}

self.start_services().await
self.services.start_all().await.context(StartServerSnafu)
}

pub fn server_handlers(&self) -> &ServerHandlers {
&self.services
}

pub fn start_telemetry(&self) {
Expand Down Expand Up @@ -127,24 +129,12 @@ impl Datanode {
self.services = services;
}

/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
let _ = future::try_join_all(self.services.values().map(start_server))
.await
.context(StartServerSnafu)?;
Ok(())
}

async fn shutdown_services(&self) -> Result<()> {
let _ = future::try_join_all(self.services.values().map(|server| server.0.shutdown()))
pub async fn shutdown(&self) -> Result<()> {
self.services
.shutdown_all()
.await
.context(ShutdownServerSnafu)?;
Ok(())
}

pub async fn shutdown(&self) -> Result<()> {
// We must shutdown services first
self.shutdown_services().await?;
let _ = self.greptimedb_telemetry_task.stop().await;
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task
Expand Down Expand Up @@ -268,7 +258,7 @@ impl DatanodeBuilder {
.context(StartServerSnafu)?;

Ok(Datanode {
services: HashMap::new(),
services: ServerHandlers::default(),
heartbeat_task,
region_server,
greptimedb_telemetry_task,
Expand Down
14 changes: 5 additions & 9 deletions src/datanode/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;

Expand All @@ -27,9 +26,6 @@ use crate::config::DatanodeOptions;
use crate::error::{ParseAddrSnafu, Result};
use crate::region_server::RegionServer;

const DATANODE_GRPC_SERVICE_NAME: &str = "DATANODE_GRPC_SERVICE";
const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE";

pub struct DatanodeServiceBuilder<'a> {
opts: &'a DatanodeOptions,
grpc_server: Option<GrpcServer>,
Expand Down Expand Up @@ -65,15 +61,15 @@ impl<'a> DatanodeServiceBuilder<'a> {
}
}

pub fn build(mut self) -> Result<ServerHandlers> {
let mut services = HashMap::new();
pub async fn build(mut self) -> Result<ServerHandlers> {
let handlers = ServerHandlers::default();

if let Some(grpc_server) = self.grpc_server.take() {
let addr: SocketAddr = self.opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &self.opts.rpc_addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
services.insert(DATANODE_GRPC_SERVICE_NAME.to_string(), handler);
handlers.insert(handler).await;
}

if self.enable_http_service {
Expand All @@ -85,10 +81,10 @@ impl<'a> DatanodeServiceBuilder<'a> {
addr: &self.opts.http.addr,
})?;
let handler: ServerHandler = (Box::new(http_server), addr);
services.insert(DATANODE_HTTP_SERVICE_NAME.to_string(), handler);
handlers.insert(handler).await;
}

Ok(services)
Ok(handlers)
}

pub fn grpc_server_builder(
Expand Down
23 changes: 10 additions & 13 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use servers::query_handler::{
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::{start_server, ServerHandlers};
use servers::server::ServerHandlers;
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::dialect::Dialect;
Expand Down Expand Up @@ -115,7 +115,7 @@ pub struct Instance {
statement_executor: Arc<StatementExecutor>,
query_engine: QueryEngineRef,
plugins: Plugins,
servers: Arc<ServerHandlers>,
servers: ServerHandlers,
heartbeat_task: Option<HeartbeatTask>,
inserter: InserterRef,
deleter: DeleterRef,
Expand Down Expand Up @@ -198,8 +198,7 @@ impl Instance {
ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins))
.context(StartServerSnafu)?;

self.servers = Arc::new(servers);

self.servers = servers;
Ok(())
}

Expand All @@ -212,10 +211,14 @@ impl Instance {
}

pub async fn shutdown(&self) -> Result<()> {
futures::future::try_join_all(self.servers.values().map(|server| server.0.shutdown()))
self.servers
.shutdown_all()
.await
.context(error::ShutdownServerSnafu)
.map(|_| ())
}

pub fn server_handlers(&self) -> &ServerHandlers {
&self.servers
}

pub fn statement_executor(&self) -> Arc<StatementExecutor> {
Expand Down Expand Up @@ -248,13 +251,7 @@ impl FrontendInstance for Instance {
}
}

futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move {
info!("Starting service: {name}");
start_server(handler).await
}))
.await
.context(error::StartServerSnafu)
.map(|_| ())
self.servers.start_all().await.context(StartServerSnafu)
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use catalog::kvbackend::KvBackendCatalogManager;
Expand All @@ -29,6 +28,7 @@ use operator::statement::StatementExecutor;
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use query::QueryEngineFactory;
use servers::server::ServerHandlers;

use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
Expand Down Expand Up @@ -148,7 +148,7 @@ impl FrontendBuilder {
statement_executor,
query_engine,
plugins,
servers: Arc::new(HashMap::new()),
servers: ServerHandlers::default(),
heartbeat_task: self.heartbeat_task,
inserter,
deleter,
Expand Down
21 changes: 9 additions & 12 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
use servers::server::{Server, ServerHandler, ServerHandlers};
use servers::server::{Server, ServerHandlers};
use snafu::ResultExt;

use crate::error::{self, Result, StartServerSnafu};
Expand Down Expand Up @@ -164,30 +164,30 @@ where
Ok(http_server)
}

pub fn build(mut self) -> Result<ServerHandlers> {
pub async fn build(mut self) -> Result<ServerHandlers> {
let opts = self.opts.clone();
let instance = self.instance.clone();

let toml = opts.to_toml()?;
let opts: FrontendOptions = opts.into();

let mut result = Vec::<ServerHandler>::new();
let handlers = ServerHandlers::default();

let user_provider = self.plugins.get::<UserProviderRef>();

{
// Always init GRPC server
let grpc_addr = parse_addr(&opts.grpc.addr)?;
let grpc_server = self.build_grpc_server(&opts)?;
result.push((Box::new(grpc_server), grpc_addr));
handlers.insert((Box::new(grpc_server), grpc_addr)).await;
}

{
// Always init HTTP server
let http_options = &opts.http;
let http_addr = parse_addr(&http_options.addr)?;
let http_server = self.build_http_server(&opts, toml)?;
result.push((Box::new(http_server), http_addr));
handlers.insert((Box::new(http_server), http_addr)).await;
}

if opts.mysql.enable {
Expand Down Expand Up @@ -218,7 +218,7 @@ where
opts.reject_no_database.unwrap_or(false),
)),
);
result.push((mysql_server, mysql_addr));
handlers.insert((mysql_server, mysql_addr)).await;
}

if opts.postgres.enable {
Expand All @@ -241,7 +241,7 @@ where
user_provider.clone(),
)) as Box<dyn Server>;

result.push((pg_server, pg_addr));
handlers.insert((pg_server, pg_addr)).await;
}

if opts.opentsdb.enable {
Expand All @@ -259,13 +259,10 @@ where

let server = OpentsdbServer::create_server(instance.clone(), io_runtime);

result.push((server, addr));
handlers.insert((server, addr)).await;
}

Ok(result
.into_iter()
.map(|(server, addr)| (server.name().to_string(), (server, addr)))
.collect())
Ok(handlers)
}
}

Expand Down
Loading

0 comments on commit 2035e7b

Please sign in to comment.