Skip to content

Commit

Permalink
Reworked error handling in the nts ke server to be more resilient.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 authored and rnijveld committed Jun 28, 2024
1 parent 6751bef commit 6049687
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 29 deletions.
4 changes: 4 additions & 0 deletions docs/man/ntp.toml.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ untampered with.
: Timeout in milliseconds for how long a key exchange may take. If the timeout
is exceeded the connection will be dropped.

`concurrent-connections` = *number* (**512**)
: Maximum number of concurrent connections the key exchange server will handle.
Any connections above the threshold will be held in an OS level queue.

`ntp-port` = *port*
Port number the key exchange server should instruct clients to use. Should
be used when the port number of the NTP server is not the default.
Expand Down
5 changes: 5 additions & 0 deletions docs/precompiled/man/ntp.toml.5
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ certificate chain.
\f[V]key-exchange-timeout-ms\f[R] = \f[I]timeout\f[R] (\f[B]1000\f[R])
Timeout in milliseconds for how long a key exchange may take.
If the timeout is exceeded the connection will be dropped.
.TP
\f[V]concurrent-connections\f[R] = \f[I]number\f[R] (\f[B]512\f[R])
Maximum number of concurrent connections the key exchange server will
handle.
Any connections above the threshold will be held in an OS level queue.
.PP
\f[V]ntp-port\f[R] = \f[I]port\f[R] Port number the key exchange server
should instruct clients to use.
Expand Down
6 changes: 6 additions & 0 deletions ntpd/src/daemon/config/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ pub struct NtsKeConfig {
pub authorized_pool_server_certificates: Vec<PathBuf>,
#[serde(default = "default_nts_ke_timeout")]
pub key_exchange_timeout_ms: u64,
#[serde(default = "default_concurrent_connections")]
pub concurrent_connections: usize,
pub listen: SocketAddr,
pub ntp_port: Option<u16>,
pub ntp_server: Option<String>,
Expand All @@ -144,6 +146,10 @@ fn default_nts_ke_timeout() -> u64 {
1000
}

fn default_concurrent_connections() -> usize {
512
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
176 changes: 147 additions & 29 deletions ntpd/src/daemon/keyexchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
task::{Context, Poll},
};

use libc::{ECONNABORTED, EMFILE, ENFILE, ENOBUFS, ENOMEM};
use ntp_proto::{
KeyExchangeClient, KeyExchangeError, KeyExchangeResult, KeyExchangeServer, KeySet,
};
Expand All @@ -17,6 +18,7 @@ use tokio::{
net::TcpListener,
task::JoinHandle,
};
use tracing::{debug, error};

use super::config::NtsKeConfig;
use super::exitcode;
Expand Down Expand Up @@ -166,41 +168,86 @@ async fn key_exchange_server(
pool_certs: Vec<Certificate>,
private_key: PrivateKey,
) -> std::io::Result<()> {
let listener = TcpListener::bind(&ke_config.listen).await?;

let config = build_server_config(certificate_chain, private_key)?;
let pool_certs = Arc::<[_]>::from(pool_certs);
let timeout = std::time::Duration::from_millis(ke_config.key_exchange_timeout_ms);

loop {
let (stream, peer_addr) = listener.accept().await?;
let config = config.clone();
let keyset = keyset.borrow().clone();
let pool_certs = pool_certs.clone();
let ntp_port = ke_config.ntp_port;
let ntp_server = ke_config.ntp_server.clone();
let timeout_ms = ke_config.key_exchange_timeout_ms;

let fut = async move {
BoundKeyExchangeServer::run(
stream,
config,
keyset,
ntp_port,
ntp_server.clone(),
pool_certs,
)
.await
.map_err(|ke_error| std::io::Error::new(std::io::ErrorKind::Other, ke_error))
let listener = match TcpListener::bind(&ke_config.listen).await {
Ok(listener) => listener,
Err(e) => {
error!("Could not open network port for KE server: {}", e);
tokio::time::sleep(timeout).await;
continue;
}
};

tokio::spawn(async move {
let timeout = std::time::Duration::from_millis(timeout_ms);
match tokio::time::timeout(timeout, fut).await {
Err(_) => tracing::debug!(?peer_addr, "NTS KE timed out"),
Ok(Err(err)) => tracing::debug!(?err, ?peer_addr, "NTS KE failed"),
Ok(Ok(())) => tracing::debug!(?peer_addr, "NTS KE completed"),
}
});
// Ensure we do not make too many connections. We can reinitialize here because any error path recreating the socket
// waits at least ke_config.key_exchange_timeout_ms milliseconds, ensuring all pre-existing connections are or will very
// soon be gone.
let connectionpermits = Arc::new(tokio::sync::Semaphore::new(
ke_config.concurrent_connections,
));

loop {
let permit = match connectionpermits.clone().acquire_owned().await {
Ok(permit) => permit,
Err(e) => {
error!("Could not get ticket for new connection: {}", e);
tokio::time::sleep(timeout).await;
break;
}
};
let (stream, peer_addr) = match listener.accept().await {
Ok(a) => a,
Err(e) if matches!(e.raw_os_error(), Some(ECONNABORTED)) => {
debug!("Potential client-triggered accept error in NTS-KE: {}", e);
continue;
}
Err(e)
if matches!(
e.raw_os_error(),
Some(ENFILE) | Some(EMFILE) | Some(ENOMEM) | Some(ENOBUFS)
) =>
{
error!("Out of resources in NTS-KE, consider raising limits or lowering max parallel connections: {}", e);
tokio::time::sleep(timeout).await;
continue;
}
Err(e) => {
error!("Could not accept NTS-KE connection: {}", e);
tokio::time::sleep(timeout).await;
break;
}
};
let config = config.clone();
let keyset = keyset.borrow().clone();
let pool_certs = pool_certs.clone();
let ntp_port = ke_config.ntp_port;
let ntp_server = ke_config.ntp_server.clone();

let fut = async move {
BoundKeyExchangeServer::run(
stream,
config,
keyset,
ntp_port,
ntp_server.clone(),
pool_certs,
)
.await
.map_err(|ke_error| std::io::Error::new(std::io::ErrorKind::Other, ke_error))
};

tokio::spawn(async move {
match tokio::time::timeout(timeout, fut).await {
Err(_) => tracing::debug!(?peer_addr, "NTS KE timed out"),
Ok(Err(err)) => tracing::debug!(?err, ?peer_addr, "NTS KE failed"),
Ok(Ok(())) => tracing::debug!(?peer_addr, "NTS KE completed"),
}
drop(permit);
});
}
}
}

Expand Down Expand Up @@ -622,6 +669,7 @@ mod tests {
#[cfg(feature = "unstable_nts-pool")]
authorized_pool_server_certificates: pool_certs.iter().map(PathBuf::from).collect(),
key_exchange_timeout_ms: 1000,
concurrent_connections: 512,
listen: "0.0.0.0:5431".parse().unwrap(),
ntp_port: None,
ntp_server: None,
Expand All @@ -645,6 +693,74 @@ mod tests {
assert_eq!(result.port, 123);
}

#[tokio::test]
async fn key_exchange_connection_limiter() {
let provider = KeySetProvider::new(1);
let keyset = provider.get();
#[cfg(feature = "unstable_nts-pool")]
let pool_certs = ["testdata/certificates/nos-nl.pem"];

let (_sender, keyset) = tokio::sync::watch::channel(keyset);
let nts_ke_config = NtsKeConfig {
certificate_chain_path: PathBuf::from("test-keys/end.fullchain.pem"),
private_key_path: PathBuf::from("test-keys/end.key"),
#[cfg(feature = "unstable_nts-pool")]
authorized_pool_server_certificates: pool_certs.iter().map(PathBuf::from).collect(),
key_exchange_timeout_ms: 10000,
concurrent_connections: 1,
listen: "0.0.0.0:5435".parse().unwrap(),
ntp_port: None,
ntp_server: None,
};

let _join_handle = spawn(nts_ke_config, keyset);

// give the server some time to make the port available
tokio::time::sleep(std::time::Duration::from_millis(20)).await;

let mut blocker = tokio::net::TcpStream::connect("localhost:5435")
.await
.unwrap();

// Ensure connection, just send a random client hello
blocker.write_all(b"\x16\x03\x01\x00\xf5\x01\x00\x00\xf1\x03\x03\xfc\x86\xea\x41\x80\x21\xec\x3e\x14\x5f\xf9\x4c\xa0\xcd\x8a\x1a\x66\x65\x41\xe5\x95\xd6\x8e\xb4\x65\x3b\x62\x49\x8d\xe1\xe0\xd8\x20\xe9\xa8\x94\xdb\xbf\x99\xfd\xc9\x3d\xd7\xcf\x7a\xc6\x7c\x03\xee\xb3\xcf\x17\x0b\x57\x69\xb6\x51\x48\xb1\xc6\x3e\xcb\x2d\x54\x2c\x00\x14\x13\x02\x13\x01\x13\x03\xc0\x2c\xc0\x2b\xcc\xa9\xc0\x30\xc0\x2f\xcc\xa8\x00\xff\x01\x00\x00\x94\x00\x33\x00\x26\x00\x24\x00\x1d\x00\x20\x4e\xcb\x36\xd3\xff\xc7\x64\x3e\xd8\x25\xf2\x1a\x20\x42\xc7\xa0\x29\x89\x8d\x00\x82\x0c\x9f\xff\xdf\xa6\xa0\xdc\xcf\xa7\xb8\x2b\x00\x0d\x00\x14\x00\x12\x05\x03\x04\x03\x08\x07\x08\x06\x08\x05\x08\x04\x06\x01\x05\x01\x04\x01\x00\x2b\x00\x05\x04\x03\x04\x03\x03\x00\x23\x00\x00\x00\x05\x00\x05\x01\x00\x00\x00\x00\x00\x17\x00\x00\x00\x10\x00\x0a\x00\x08\x07\x6e\x74\x73\x6b\x65\x2f\x31\x00\x00\x00\x0e\x00\x0c\x00\x00\x09\x6c\x6f\x63\x61\x6c\x68\x6f\x73\x74\x00\x2d\x00\x02\x01\x01\x00\x0a\x00\x08\x00\x06\x00\x1d\x00\x17\x00\x18\x00\x0b\x00\x02\x01\x00").await.unwrap();
blocker.flush().await.unwrap();

// give the server time to accept the connection
tokio::time::sleep(std::time::Duration::from_millis(20)).await;

let ca = include_bytes!("../../test-keys/testca.pem");

assert!(tokio::time::timeout(
std::time::Duration::from_millis(100),
key_exchange_client(
"localhost".to_string(),
5435,
&certificates_from_bufread(BufReader::new(Cursor::new(ca))).unwrap(),
)
)
.await
.is_err());

blocker.shutdown().await.unwrap();
drop(blocker);

let result = tokio::time::timeout(
std::time::Duration::from_millis(100),
key_exchange_client(
"localhost".to_string(),
5435,
&certificates_from_bufread(BufReader::new(Cursor::new(ca))).unwrap(),
),
)
.await
.unwrap()
.unwrap();

assert_eq!(result.remote, "localhost");
assert_eq!(result.port, 123);
}

#[tokio::test]
async fn key_exchange_roundtrip_with_port_server() {
let provider = KeySetProvider::new(1);
Expand All @@ -659,6 +775,7 @@ mod tests {
#[cfg(feature = "unstable_nts-pool")]
authorized_pool_server_certificates: pool_certs.iter().map(PathBuf::from).collect(),
key_exchange_timeout_ms: 1000,
concurrent_connections: 512,
listen: "0.0.0.0:5432".parse().unwrap(),
ntp_port: Some(568),
ntp_server: Some("jantje".into()),
Expand Down Expand Up @@ -697,6 +814,7 @@ mod tests {
private_key_path: PathBuf::from("test-keys/end.key"),
authorized_pool_server_certificates: certs.iter().map(PathBuf::from).collect(),
key_exchange_timeout_ms: 1000,
concurrent_connections: 512,
listen: "0.0.0.0:5433".parse().unwrap(),
ntp_port: None,
ntp_server: None,
Expand Down

0 comments on commit 6049687

Please sign in to comment.