diff --git a/docs/man/ntp.toml.5.md b/docs/man/ntp.toml.5.md index 0b17a65e5..36d51be46 100644 --- a/docs/man/ntp.toml.5.md +++ b/docs/man/ntp.toml.5.md @@ -234,6 +234,10 @@ untampered with. : Timeout in milliseconds for how long a key exchange may take. If the timeout is exceeded the connection will be dropped. +`concurrent-connections` = *number* (**512**) +: Maximum number of concurrent connections the key exchange server will handle. + Any connections above the threshold will be held in an OS level queue. + `ntp-port` = *port* Port number the key exchange server should instruct clients to use. Should be used when the port number of the NTP server is not the default. diff --git a/docs/precompiled/man/ntp.toml.5 b/docs/precompiled/man/ntp.toml.5 index db6968688..ac2769f4e 100644 --- a/docs/precompiled/man/ntp.toml.5 +++ b/docs/precompiled/man/ntp.toml.5 @@ -294,6 +294,11 @@ certificate chain. \f[V]key-exchange-timeout-ms\f[R] = \f[I]timeout\f[R] (\f[B]1000\f[R]) Timeout in milliseconds for how long a key exchange may take. If the timeout is exceeded the connection will be dropped. +.TP +\f[V]concurrent-connections\f[R] = \f[I]number\f[R] (\f[B]512\f[R]) +Maximum number of concurrent connections the key exchange server will +handle. +Any connections above the threshold will be held in an OS level queue. .PP \f[V]ntp-port\f[R] = \f[I]port\f[R] Port number the key exchange server should instruct clients to use. diff --git a/ntpd/src/daemon/config/server.rs b/ntpd/src/daemon/config/server.rs index 319cc695e..a1172f151 100644 --- a/ntpd/src/daemon/config/server.rs +++ b/ntpd/src/daemon/config/server.rs @@ -135,6 +135,8 @@ pub struct NtsKeConfig { pub authorized_pool_server_certificates: Vec, #[serde(default = "default_nts_ke_timeout")] pub key_exchange_timeout_ms: u64, + #[serde(default = "default_concurrent_connections")] + pub concurrent_connections: usize, pub listen: SocketAddr, pub ntp_port: Option, pub ntp_server: Option, @@ -144,6 +146,10 @@ fn default_nts_ke_timeout() -> u64 { 1000 } +fn default_concurrent_connections() -> usize { + 512 +} + #[cfg(test)] mod tests { use super::*; diff --git a/ntpd/src/daemon/keyexchange.rs b/ntpd/src/daemon/keyexchange.rs index e0e5cee1b..b1c00ea54 100644 --- a/ntpd/src/daemon/keyexchange.rs +++ b/ntpd/src/daemon/keyexchange.rs @@ -8,6 +8,7 @@ use std::{ task::{Context, Poll}, }; +use libc::{ECONNABORTED, EMFILE, ENFILE, ENOBUFS, ENOMEM}; use ntp_proto::{ KeyExchangeClient, KeyExchangeError, KeyExchangeResult, KeyExchangeServer, KeySet, }; @@ -17,6 +18,7 @@ use tokio::{ net::TcpListener, task::JoinHandle, }; +use tracing::{debug, error}; use super::config::NtsKeConfig; use super::exitcode; @@ -166,41 +168,86 @@ async fn key_exchange_server( pool_certs: Vec, private_key: PrivateKey, ) -> std::io::Result<()> { - let listener = TcpListener::bind(&ke_config.listen).await?; - let config = build_server_config(certificate_chain, private_key)?; let pool_certs = Arc::<[_]>::from(pool_certs); + let timeout = std::time::Duration::from_millis(ke_config.key_exchange_timeout_ms); loop { - let (stream, peer_addr) = listener.accept().await?; - let config = config.clone(); - let keyset = keyset.borrow().clone(); - let pool_certs = pool_certs.clone(); - let ntp_port = ke_config.ntp_port; - let ntp_server = ke_config.ntp_server.clone(); - let timeout_ms = ke_config.key_exchange_timeout_ms; - - let fut = async move { - BoundKeyExchangeServer::run( - stream, - config, - keyset, - ntp_port, - ntp_server.clone(), - pool_certs, - ) - .await - .map_err(|ke_error| std::io::Error::new(std::io::ErrorKind::Other, ke_error)) + let listener = match TcpListener::bind(&ke_config.listen).await { + Ok(listener) => listener, + Err(e) => { + error!("Could not open network port for KE server: {}", e); + tokio::time::sleep(timeout).await; + continue; + } }; - tokio::spawn(async move { - let timeout = std::time::Duration::from_millis(timeout_ms); - match tokio::time::timeout(timeout, fut).await { - Err(_) => tracing::debug!(?peer_addr, "NTS KE timed out"), - Ok(Err(err)) => tracing::debug!(?err, ?peer_addr, "NTS KE failed"), - Ok(Ok(())) => tracing::debug!(?peer_addr, "NTS KE completed"), - } - }); + // Ensure we do not make too many connections. We can reinitialize here because any error path recreating the socket + // waits at least ke_config.key_exchange_timeout_ms milliseconds, ensuring all pre-existing connections are or will very + // soon be gone. + let connectionpermits = Arc::new(tokio::sync::Semaphore::new( + ke_config.concurrent_connections, + )); + + loop { + let permit = match connectionpermits.clone().acquire_owned().await { + Ok(permit) => permit, + Err(e) => { + error!("Could not get ticket for new connection: {}", e); + tokio::time::sleep(timeout).await; + break; + } + }; + let (stream, peer_addr) = match listener.accept().await { + Ok(a) => a, + Err(e) if matches!(e.raw_os_error(), Some(ECONNABORTED)) => { + debug!("Potential client-triggered accept error in NTS-KE: {}", e); + continue; + } + Err(e) + if matches!( + e.raw_os_error(), + Some(ENFILE) | Some(EMFILE) | Some(ENOMEM) | Some(ENOBUFS) + ) => + { + error!("Out of resources in NTS-KE, consider raising limits or lowering max parallel connections: {}", e); + tokio::time::sleep(timeout).await; + continue; + } + Err(e) => { + error!("Could not accept NTS-KE connection: {}", e); + tokio::time::sleep(timeout).await; + break; + } + }; + let config = config.clone(); + let keyset = keyset.borrow().clone(); + let pool_certs = pool_certs.clone(); + let ntp_port = ke_config.ntp_port; + let ntp_server = ke_config.ntp_server.clone(); + + let fut = async move { + BoundKeyExchangeServer::run( + stream, + config, + keyset, + ntp_port, + ntp_server.clone(), + pool_certs, + ) + .await + .map_err(|ke_error| std::io::Error::new(std::io::ErrorKind::Other, ke_error)) + }; + + tokio::spawn(async move { + match tokio::time::timeout(timeout, fut).await { + Err(_) => tracing::debug!(?peer_addr, "NTS KE timed out"), + Ok(Err(err)) => tracing::debug!(?err, ?peer_addr, "NTS KE failed"), + Ok(Ok(())) => tracing::debug!(?peer_addr, "NTS KE completed"), + } + drop(permit); + }); + } } } @@ -622,6 +669,7 @@ mod tests { #[cfg(feature = "unstable_nts-pool")] authorized_pool_server_certificates: pool_certs.iter().map(PathBuf::from).collect(), key_exchange_timeout_ms: 1000, + concurrent_connections: 512, listen: "0.0.0.0:5431".parse().unwrap(), ntp_port: None, ntp_server: None, @@ -645,6 +693,74 @@ mod tests { assert_eq!(result.port, 123); } + #[tokio::test] + async fn key_exchange_connection_limiter() { + let provider = KeySetProvider::new(1); + let keyset = provider.get(); + #[cfg(feature = "unstable_nts-pool")] + let pool_certs = ["testdata/certificates/nos-nl.pem"]; + + let (_sender, keyset) = tokio::sync::watch::channel(keyset); + let nts_ke_config = NtsKeConfig { + certificate_chain_path: PathBuf::from("test-keys/end.fullchain.pem"), + private_key_path: PathBuf::from("test-keys/end.key"), + #[cfg(feature = "unstable_nts-pool")] + authorized_pool_server_certificates: pool_certs.iter().map(PathBuf::from).collect(), + key_exchange_timeout_ms: 10000, + concurrent_connections: 1, + listen: "0.0.0.0:5435".parse().unwrap(), + ntp_port: None, + ntp_server: None, + }; + + let _join_handle = spawn(nts_ke_config, keyset); + + // give the server some time to make the port available + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + let mut blocker = tokio::net::TcpStream::connect("localhost:5435") + .await + .unwrap(); + + // Ensure connection, just send a random client hello + blocker.write_all(b"\x16\x03\x01\x00\xf5\x01\x00\x00\xf1\x03\x03\xfc\x86\xea\x41\x80\x21\xec\x3e\x14\x5f\xf9\x4c\xa0\xcd\x8a\x1a\x66\x65\x41\xe5\x95\xd6\x8e\xb4\x65\x3b\x62\x49\x8d\xe1\xe0\xd8\x20\xe9\xa8\x94\xdb\xbf\x99\xfd\xc9\x3d\xd7\xcf\x7a\xc6\x7c\x03\xee\xb3\xcf\x17\x0b\x57\x69\xb6\x51\x48\xb1\xc6\x3e\xcb\x2d\x54\x2c\x00\x14\x13\x02\x13\x01\x13\x03\xc0\x2c\xc0\x2b\xcc\xa9\xc0\x30\xc0\x2f\xcc\xa8\x00\xff\x01\x00\x00\x94\x00\x33\x00\x26\x00\x24\x00\x1d\x00\x20\x4e\xcb\x36\xd3\xff\xc7\x64\x3e\xd8\x25\xf2\x1a\x20\x42\xc7\xa0\x29\x89\x8d\x00\x82\x0c\x9f\xff\xdf\xa6\xa0\xdc\xcf\xa7\xb8\x2b\x00\x0d\x00\x14\x00\x12\x05\x03\x04\x03\x08\x07\x08\x06\x08\x05\x08\x04\x06\x01\x05\x01\x04\x01\x00\x2b\x00\x05\x04\x03\x04\x03\x03\x00\x23\x00\x00\x00\x05\x00\x05\x01\x00\x00\x00\x00\x00\x17\x00\x00\x00\x10\x00\x0a\x00\x08\x07\x6e\x74\x73\x6b\x65\x2f\x31\x00\x00\x00\x0e\x00\x0c\x00\x00\x09\x6c\x6f\x63\x61\x6c\x68\x6f\x73\x74\x00\x2d\x00\x02\x01\x01\x00\x0a\x00\x08\x00\x06\x00\x1d\x00\x17\x00\x18\x00\x0b\x00\x02\x01\x00").await.unwrap(); + blocker.flush().await.unwrap(); + + // give the server time to accept the connection + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + let ca = include_bytes!("../../test-keys/testca.pem"); + + assert!(tokio::time::timeout( + std::time::Duration::from_millis(100), + key_exchange_client( + "localhost".to_string(), + 5435, + &certificates_from_bufread(BufReader::new(Cursor::new(ca))).unwrap(), + ) + ) + .await + .is_err()); + + blocker.shutdown().await.unwrap(); + drop(blocker); + + let result = tokio::time::timeout( + std::time::Duration::from_millis(100), + key_exchange_client( + "localhost".to_string(), + 5435, + &certificates_from_bufread(BufReader::new(Cursor::new(ca))).unwrap(), + ), + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(result.remote, "localhost"); + assert_eq!(result.port, 123); + } + #[tokio::test] async fn key_exchange_roundtrip_with_port_server() { let provider = KeySetProvider::new(1); @@ -659,6 +775,7 @@ mod tests { #[cfg(feature = "unstable_nts-pool")] authorized_pool_server_certificates: pool_certs.iter().map(PathBuf::from).collect(), key_exchange_timeout_ms: 1000, + concurrent_connections: 512, listen: "0.0.0.0:5432".parse().unwrap(), ntp_port: Some(568), ntp_server: Some("jantje".into()), @@ -697,6 +814,7 @@ mod tests { private_key_path: PathBuf::from("test-keys/end.key"), authorized_pool_server_certificates: certs.iter().map(PathBuf::from).collect(), key_exchange_timeout_ms: 1000, + concurrent_connections: 512, listen: "0.0.0.0:5433".parse().unwrap(), ntp_port: None, ntp_server: None,