diff --git a/Cargo.lock b/Cargo.lock index b8273b5b6..2cc483b49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2043,6 +2043,7 @@ dependencies = [ "lru", "mime", "ntex", + "ntex-tls", "openid", "openssl", "percent-encoding", @@ -2226,6 +2227,7 @@ dependencies = [ "ntex-mqtt", "ntex-rt", "ntex-service", + "ntex-tls", "openssl", "prometheus", "reqwest", @@ -4021,10 +4023,10 @@ dependencies = [ [[package]] name = "ntex-tls" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00841754ccce812cb573f46090a2633698cce401f1a4e6cd196a07d3dacd934b" +version = "0.1.6" +source = "git+https://github.com/lulf/ntex.git?rev=4fa7f5cceb5bf29fc8764fe1b97f5f5fd3fdf2e4#4fa7f5cceb5bf29fc8764fe1b97f5f5fd3fdf2e4" dependencies = [ + "log", "ntex-bytes", "ntex-io", "ntex-service", diff --git a/Cargo.toml b/Cargo.toml index a7da93199..c006b5595 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,8 @@ drogue-client = { git = "https://github.com/drogue-iot/drogue-client", rev = "3e operator-framework = { git = "https://github.com/ctron/operator-framework", rev = "8366506a3ed44b638f899dcce4a82ac32fcaff9e" } # FIXME: awaiting release 0.7.0 +ntex-tls = { git = "https://github.com/lulf/ntex.git", rev = "4fa7f5cceb5bf29fc8764fe1b97f5f5fd3fdf2e4" } # awaiting release with new ntex-tls dependency + #pq-sys = { git = "https://github.com/sgrif/pq-sys", rev = "3e367d53019a2740054d5dc6946e07931f1fb70b" } # needed for windows only #actix-web = { git = "https://github.com/ctron/actix-web", rev = "f3f41a0cc70e43564f8243b3ff425195566b5f16"} # FIXME: awaiting release 4.2.0 diff --git a/endpoint-common/Cargo.toml b/endpoint-common/Cargo.toml index 67bac94c3..440bd841d 100644 --- a/endpoint-common/Cargo.toml +++ b/endpoint-common/Cargo.toml @@ -37,6 +37,7 @@ uuid = { version = "1", features = ["v4"] } x509-parser = "0.14" ntex = { version = "0.5", optional = true } +ntex-tls = { version = "0.1.6", optional = true } tokio-openssl = { version = "0.6", optional = true } tokio-rustls = { version = "0.23", optional = true } tokio-dtls-stream-sink = { version = "0.5", optional = true } @@ -56,4 +57,4 @@ package = "openssl" [features] default = ["rustls", "openssl"] rustls = ["tokio-rustls"] -openssl = ["tokio-openssl", "ntex/openssl", "open-ssl", "tokio-dtls-stream-sink"] +openssl = ["tokio-openssl", "ntex-tls", "ntex/openssl", "open-ssl", "tokio-dtls-stream-sink"] diff --git a/endpoint-common/src/psk.rs b/endpoint-common/src/psk.rs index 660406a18..b436749e0 100644 --- a/endpoint-common/src/psk.rs +++ b/endpoint-common/src/psk.rs @@ -104,11 +104,3 @@ pub struct VerifiedIdentity { pub application: registry::v1::Application, pub device: registry::v1::Device, } - -#[cfg(all(feature = "ntex", feature = "openssl"))] -impl PskIdentityRetriever for ntex::io::IoBoxed { - fn verified_identity(&self) -> Option { - // TODO: Not supported by ntex yet - None - } -} diff --git a/mqtt-common/src/server.rs b/mqtt-common/src/server.rs index 8207b71ee..e32aadc01 100644 --- a/mqtt-common/src/server.rs +++ b/mqtt-common/src/server.rs @@ -174,6 +174,7 @@ where pub trait TlsConfig { fn is_disabled(&self) -> bool; fn disable_client_certs(&self) -> bool; + fn disable_psk(&self) -> bool; #[cfg(feature = "rustls")] fn verifier_rustls(&self) -> std::sync::Arc { @@ -287,13 +288,15 @@ where }) } -pub fn build( +pub fn build( opts: MqttServerOptions, app: Svc, config: &dyn TlsConfig, + psk_verifier: Option, ) -> anyhow::Result where Svc: Service + Clone + Send + 'static, + F: Fn(Option<&[u8]>, &mut [u8]) -> Result + Send + Sync + 'static, S: Session + 'static, { log::info!("MQTT transport: {:?}", opts.transport); @@ -307,6 +310,8 @@ where config.disable_client_certs() ); + log::info!("PSK disabled: {}", config.disable_psk()); + if cfg!(feature = "rustls") { // with rustls #[cfg(feature = "rustls")] @@ -314,7 +319,7 @@ where } else if cfg!(feature = "openssl") { // with openssl #[cfg(feature = "openssl")] - return build_openssl(opts, app, crate::tls::openssl_config(config)?); + return build_openssl(opts, app, crate::tls::openssl_config(config, psk_verifier)?); } // no implementation available diff --git a/mqtt-common/src/tls.rs b/mqtt-common/src/tls.rs index a5eb0e722..65535da8d 100644 --- a/mqtt-common/src/tls.rs +++ b/mqtt-common/src/tls.rs @@ -69,8 +69,27 @@ pub use openssl::tls_config as openssl_config; mod openssl { use crate::server::TlsConfig; + // Mozilla intermediate v5 + PSK + const DEFAULT_CIPHERS: &[&str] = &[ + "PSK", + "ECDHE-ECDSA-AES128-GCM-SHA256", + "ECDHE-RSA-AES128-GCM-SHA256", + "ECDHE-ECDSA-AES256-GCM-SHA384", + "ECDHE-RSA-AES256-GCM-SHA384", + "ECDHE-ECDSA-CHACHA20-POLY1305", + "ECDHE-RSA-CHACHA20-POLY1305", + "DHE-RSA-AES128-GCM-SHA256", + "DHE-RSA-AES256-GCM-SHA384", + ]; + /// Build a server config for openssl. - pub fn tls_config(config: &dyn TlsConfig) -> anyhow::Result { + pub fn tls_config( + config: &dyn TlsConfig, + psk_verifier: Option, + ) -> anyhow::Result + where + F: Fn(Option<&[u8]>, &mut [u8]) -> Result + Send + Sync + 'static, + { let key = config .key_file() .ok_or_else(|| anyhow::anyhow!("TLS configuration error: Missing key file"))?; @@ -83,6 +102,7 @@ mod openssl { let mut builder = ssl::SslAcceptor::mozilla_intermediate_v5(method)?; builder.set_private_key_file(key, ssl::SslFiletype::PEM)?; builder.set_certificate_chain_file(cert)?; + builder.set_cipher_list(&DEFAULT_CIPHERS.join(","))?; if !config.disable_client_certs() { // we ask for client certificates, but don't enforce them @@ -97,6 +117,20 @@ mod openssl { }); } + if !config.disable_psk() { + if let Some(psk) = psk_verifier { + builder.set_psk_server_callback(move |_ssl, identity, secret_mut| { + match psk(identity, secret_mut) { + Ok(len) => Ok(len), + Err(e) => { + log::debug!("Error during TLS-PSK handshake: {:?}", e); + Ok(0) + } + } + }); + } + } + Ok(builder.build()) } } diff --git a/mqtt-endpoint/Cargo.toml b/mqtt-endpoint/Cargo.toml index c712182d0..6eea3c926 100644 --- a/mqtt-endpoint/Cargo.toml +++ b/mqtt-endpoint/Cargo.toml @@ -21,8 +21,9 @@ http = "0.2.1" humantime-serde = "1" lazy_static = "1.4.0" log = "0.4" -ntex = "0.5" +ntex = { version = "0.5", features = ["tokio"] } ntex-mqtt = "0.8" +ntex-tls = "0.1.6" ntex-rt = "0.4" ntex-service = "0.3" prometheus = { version = "^0.13", default-features = false } diff --git a/mqtt-endpoint/src/config.rs b/mqtt-endpoint/src/config.rs index d0f22c7cb..eb6530bcb 100644 --- a/mqtt-endpoint/src/config.rs +++ b/mqtt-endpoint/src/config.rs @@ -47,6 +47,9 @@ pub struct Config { #[serde(default)] pub disable_tls: bool, + #[serde(default)] + pub disable_tls_psk: bool, + #[serde(default)] pub disable_client_certificates: bool, @@ -88,6 +91,10 @@ impl TlsConfig for Config { self.disable_client_certificates } + fn disable_psk(&self) -> bool { + self.disable_tls_psk + } + #[cfg(feature = "rustls")] fn verifier_rustls(&self) -> std::sync::Arc { // This seems dangerous, as we simply accept all client certificates. However, diff --git a/mqtt-endpoint/src/lib.rs b/mqtt-endpoint/src/lib.rs index f1dce4351..6b6a64f78 100644 --- a/mqtt-endpoint/src/lib.rs +++ b/mqtt-endpoint/src/lib.rs @@ -3,10 +3,12 @@ mod config; mod service; pub use config::Config; +use drogue_cloud_service_api::auth::device::authn::{PreSharedKeyOutcome, PreSharedKeyResponse}; use crate::{auth::DeviceAuthenticator, service::App}; use drogue_cloud_endpoint_common::{ command::{Commands, KafkaCommandSource}, + psk::Identity, sender::DownstreamSender, sink::KafkaSink, }; @@ -56,9 +58,52 @@ pub async fn run(config: Config, startup: &mut dyn Startup) -> anyhow::Result<() commands: commands.clone(), states, + disable_psk: config.disable_tls_psk, }; - let srv = build(config.mqtt.clone(), app, &config)?.run(); + let mut psk_verifier = None; + if !config.disable_tls_psk { + let auth = app.authenticator.clone(); + /* + let (psk_req_tx, psk_req_rx) = ntex::channel::channel(); + let (psk_res_tx, psk_res_rx) = ntex::channel::channel();*/ + psk_verifier = Some(Box::new( + move |identity: Option<&[u8]>, secret_mut: &mut [u8]| { + let mut to_copy = 0; + if let Some(Ok(identity)) = identity.map(|s| core::str::from_utf8(s)) { + if let Ok(identity) = Identity::parse(identity) { + let auth = auth.clone(); + let app = identity.application().to_string(); + let device = identity.device().to_string(); + + // Block this thread waiting for a response. + let response = std::thread::spawn(move || { + // Run a temporary executor for this request + let runner = ntex::rt::System::new("ntex-blocking"); + runner.block_on(async move { auth.request_psk(app, device).await }) + }) + .join(); + + if let Ok(Ok(PreSharedKeyResponse { + outcome: + PreSharedKeyOutcome::Found { + key, + app: _, + device: _, + }, + })) = response + { + to_copy = std::cmp::min(key.key.len(), secret_mut.len()); + secret_mut[..to_copy].copy_from_slice(&key.key[..to_copy]); + } + } + } + Ok(to_copy) + }, + )) + } + + let srv = build(config.mqtt.clone(), app, &config, psk_verifier)?.run(); log::info!("Starting web server"); diff --git a/mqtt-endpoint/src/service/app.rs b/mqtt-endpoint/src/service/app.rs index c20db320a..d6cb5dd14 100644 --- a/mqtt-endpoint/src/service/app.rs +++ b/mqtt-endpoint/src/service/app.rs @@ -7,7 +7,7 @@ use drogue_client::{ use drogue_cloud_endpoint_common::{ command::Commands, error::EndpointError, - psk::{PskIdentityRetriever, VerifiedIdentity}, + psk::{Identity, VerifiedIdentity}, sender::DownstreamSender, x509::{ClientCertificateChain, ClientCertificateRetriever}, }; @@ -16,7 +16,9 @@ use drogue_cloud_mqtt_common::{ mqtt::{AckOptions, Connect, ConnectAck, Service, Sink}, }; use drogue_cloud_service_api::{ - auth::device::authn::Outcome as AuthOutcome, services::device_state::LastWillTestament, + auth::device::authn::Outcome as AuthOutcome, + auth::device::authn::{PreSharedKeyOutcome, PreSharedKeyResponse}, + services::device_state::LastWillTestament, }; use drogue_cloud_service_common::state::{CreateOptions, CreationOutcome, StateController}; use std::fmt::Debug; @@ -29,6 +31,7 @@ pub struct App { pub authenticator: DeviceAuthenticator, pub commands: Commands, pub states: StateController, + pub disable_psk: bool, } impl App { @@ -150,6 +153,28 @@ impl App { }, } } + + async fn lookup_identity(&self, identity: &Identity) -> Option { + if let Ok(PreSharedKeyResponse { + outcome: + PreSharedKeyOutcome::Found { + key: _, + app, + device, + }, + }) = self + .authenticator + .request_psk(identity.application(), identity.device()) + .await + { + Some(VerifiedIdentity { + application: app, + device, + }) + } else { + None + } + } } #[async_trait(?Send)] @@ -166,7 +191,27 @@ impl Service for App { } let certs = connect.io().client_certs(); - let verified_identity = connect.io().verified_identity(); + let verified_identity = if self.disable_psk { + None + } else { + use ntex_tls::PskIdentity; + + let psk_identity = connect.io().query::(); + let psk_identity = if let Some(psk_identity) = psk_identity.as_ref() { + core::str::from_utf8(&psk_identity.0[..]) + .ok() + .map(|i| Identity::parse(i).ok()) + .flatten() + } else { + None + }; + + if let Some(identity) = psk_identity { + self.lookup_identity(&identity).await + } else { + None + } + }; let (username, password) = connect.credentials(); match self diff --git a/mqtt-integration/src/lib.rs b/mqtt-integration/src/lib.rs index 918def9f7..9d1869d5c 100644 --- a/mqtt-integration/src/lib.rs +++ b/mqtt-integration/src/lib.rs @@ -77,6 +77,10 @@ impl TlsConfig for Config { self.disable_tls } + fn disable_psk(&self) -> bool { + true + } + fn disable_client_certs(&self) -> bool { self.disable_client_certificates } @@ -147,7 +151,13 @@ pub async fn run(config: Config, startup: &mut dyn Startup) -> anyhow::Result<() // create server - let srv = build(config.mqtt.clone(), app, &app_config)?.run(); + let srv = build( + config.mqtt.clone(), + app, + &app_config, + Some(Box::new(|_: Option<&[u8]>, _: &mut [u8]| Ok(0))), + )? + .run(); // run diff --git a/server/src/main.rs b/server/src/main.rs index 6385b3ff4..02452c134 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -672,6 +672,7 @@ async fn cmd_run(matches: &ArgMatches) -> anyhow::Result<()> { auth, disable_tls: !(key_file.is_some() && cert_bundle_file.is_some()), disable_client_certificates: false, + disable_tls_psk: false, cert_bundle_file, key_file, instance: "drogue".to_string(), @@ -924,7 +925,7 @@ async fn run(ctx: Context<'_>, server: ServerConfig, mut main: Main<'_>) -> anyh println!(); println!("Creating a device:"); - println!("\tdrg create device --application example-app device1 --spec '{{\"credentials\":{{\"credentials\":[{{\"pass\":\"hey-rodney\"}}]}}}}'"); + println!("\tdrg create device --application example-app device1 --spec '{{\"authentication\":{{\"credentials\":[{{\"pass\":\"hey-rodney\"}}]}}}}'"); println!(); println!("Streaming telemetry data for an application:");