Skip to content

Commit

Permalink
Add support for TLS-PSK to mqtt
Browse files Browse the repository at this point in the history
* Set cipher list to include PSK
* Lookup identity
* Use patched version of ntex-tls
  • Loading branch information
lulf committed Oct 18, 2022
1 parent db86295 commit 67db823
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 22 deletions.
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ drogue-client = { git = "https://github.com/drogue-iot/drogue-client", rev = "3e

operator-framework = { git = "https://github.com/ctron/operator-framework", rev = "8366506a3ed44b638f899dcce4a82ac32fcaff9e" } # FIXME: awaiting release 0.7.0

ntex-tls = { git = "https://github.com/lulf/ntex.git", rev = "4fa7f5cceb5bf29fc8764fe1b97f5f5fd3fdf2e4" } # awaiting release with new ntex-tls dependency

#pq-sys = { git = "https://github.com/sgrif/pq-sys", rev = "3e367d53019a2740054d5dc6946e07931f1fb70b" } # needed for windows only

#actix-web = { git = "https://github.com/ctron/actix-web", rev = "f3f41a0cc70e43564f8243b3ff425195566b5f16"} # FIXME: awaiting release 4.2.0
Expand Down
3 changes: 2 additions & 1 deletion endpoint-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ uuid = { version = "1", features = ["v4"] }
x509-parser = "0.14"

ntex = { version = "0.5", optional = true }
ntex-tls = { version = "0.1.6", optional = true }
tokio-openssl = { version = "0.6", optional = true }
tokio-rustls = { version = "0.23", optional = true }
tokio-dtls-stream-sink = { version = "0.5", optional = true }
Expand All @@ -56,4 +57,4 @@ package = "openssl"
[features]
default = ["rustls", "openssl"]
rustls = ["tokio-rustls"]
openssl = ["tokio-openssl", "ntex/openssl", "open-ssl", "tokio-dtls-stream-sink"]
openssl = ["tokio-openssl", "ntex-tls", "ntex/openssl", "open-ssl", "tokio-dtls-stream-sink"]
8 changes: 0 additions & 8 deletions endpoint-common/src/psk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,3 @@ pub struct VerifiedIdentity {
pub application: registry::v1::Application,
pub device: registry::v1::Device,
}

#[cfg(all(feature = "ntex", feature = "openssl"))]
impl PskIdentityRetriever for ntex::io::IoBoxed {
fn verified_identity(&self) -> Option<VerifiedIdentity> {
// TODO: Not supported by ntex yet
None
}
}
9 changes: 7 additions & 2 deletions mqtt-common/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ where
pub trait TlsConfig {
fn is_disabled(&self) -> bool;
fn disable_client_certs(&self) -> bool;
fn disable_psk(&self) -> bool;

#[cfg(feature = "rustls")]
fn verifier_rustls(&self) -> std::sync::Arc<dyn rust_tls::server::ClientCertVerifier> {
Expand Down Expand Up @@ -287,13 +288,15 @@ where
})
}

pub fn build<Svc, S>(
pub fn build<Svc, F, S>(
opts: MqttServerOptions,
app: Svc,
config: &dyn TlsConfig,
psk_verifier: Option<F>,
) -> anyhow::Result<ServerBuilder>
where
Svc: Service<S> + Clone + Send + 'static,
F: Fn(Option<&[u8]>, &mut [u8]) -> Result<usize, std::io::Error> + Send + Sync + 'static,
S: Session + 'static,
{
log::info!("MQTT transport: {:?}", opts.transport);
Expand All @@ -307,14 +310,16 @@ where
config.disable_client_certs()
);

log::info!("PSK disabled: {}", config.disable_psk());

if cfg!(feature = "rustls") {
// with rustls
#[cfg(feature = "rustls")]
return build_rustls(opts, app, crate::tls::rustls_config(config)?);
} else if cfg!(feature = "openssl") {
// with openssl
#[cfg(feature = "openssl")]
return build_openssl(opts, app, crate::tls::openssl_config(config)?);
return build_openssl(opts, app, crate::tls::openssl_config(config, psk_verifier)?);
}

// no implementation available
Expand Down
36 changes: 35 additions & 1 deletion mqtt-common/src/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,27 @@ pub use openssl::tls_config as openssl_config;
mod openssl {
use crate::server::TlsConfig;

// Mozilla intermediate v5 + PSK
const DEFAULT_CIPHERS: &[&str] = &[
"PSK",
"ECDHE-ECDSA-AES128-GCM-SHA256",
"ECDHE-RSA-AES128-GCM-SHA256",
"ECDHE-ECDSA-AES256-GCM-SHA384",
"ECDHE-RSA-AES256-GCM-SHA384",
"ECDHE-ECDSA-CHACHA20-POLY1305",
"ECDHE-RSA-CHACHA20-POLY1305",
"DHE-RSA-AES128-GCM-SHA256",
"DHE-RSA-AES256-GCM-SHA384",
];

/// Build a server config for openssl.
pub fn tls_config(config: &dyn TlsConfig) -> anyhow::Result<open_ssl::ssl::SslAcceptor> {
pub fn tls_config<F>(
config: &dyn TlsConfig,
psk_verifier: Option<F>,
) -> anyhow::Result<open_ssl::ssl::SslAcceptor>
where
F: Fn(Option<&[u8]>, &mut [u8]) -> Result<usize, std::io::Error> + Send + Sync + 'static,
{
let key = config
.key_file()
.ok_or_else(|| anyhow::anyhow!("TLS configuration error: Missing key file"))?;
Expand All @@ -83,6 +102,7 @@ mod openssl {
let mut builder = ssl::SslAcceptor::mozilla_intermediate_v5(method)?;
builder.set_private_key_file(key, ssl::SslFiletype::PEM)?;
builder.set_certificate_chain_file(cert)?;
builder.set_cipher_list(&DEFAULT_CIPHERS.join(","))?;

if !config.disable_client_certs() {
// we ask for client certificates, but don't enforce them
Expand All @@ -97,6 +117,20 @@ mod openssl {
});
}

if !config.disable_psk() {
if let Some(psk) = psk_verifier {
builder.set_psk_server_callback(move |_ssl, identity, secret_mut| {
match psk(identity, secret_mut) {
Ok(len) => Ok(len),
Err(e) => {
log::debug!("Error during TLS-PSK handshake: {:?}", e);
Ok(0)
}
}
});
}
}

Ok(builder.build())
}
}
3 changes: 2 additions & 1 deletion mqtt-endpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ http = "0.2.1"
humantime-serde = "1"
lazy_static = "1.4.0"
log = "0.4"
ntex = "0.5"
ntex = { version = "0.5", features = ["tokio"] }
ntex-mqtt = "0.8"
ntex-tls = "0.1.6"
ntex-rt = "0.4"
ntex-service = "0.3"
prometheus = { version = "^0.13", default-features = false }
Expand Down
7 changes: 7 additions & 0 deletions mqtt-endpoint/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub struct Config {
#[serde(default)]
pub disable_tls: bool,

#[serde(default)]
pub disable_tls_psk: bool,

#[serde(default)]
pub disable_client_certificates: bool,

Expand Down Expand Up @@ -88,6 +91,10 @@ impl TlsConfig for Config {
self.disable_client_certificates
}

fn disable_psk(&self) -> bool {
self.disable_tls_psk
}

#[cfg(feature = "rustls")]
fn verifier_rustls(&self) -> std::sync::Arc<dyn rust_tls::server::ClientCertVerifier> {
// This seems dangerous, as we simply accept all client certificates. However,
Expand Down
47 changes: 46 additions & 1 deletion mqtt-endpoint/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ mod config;
mod service;

pub use config::Config;
use drogue_cloud_service_api::auth::device::authn::{PreSharedKeyOutcome, PreSharedKeyResponse};

use crate::{auth::DeviceAuthenticator, service::App};
use drogue_cloud_endpoint_common::{
command::{Commands, KafkaCommandSource},
psk::Identity,
sender::DownstreamSender,
sink::KafkaSink,
};
Expand Down Expand Up @@ -56,9 +58,52 @@ pub async fn run(config: Config, startup: &mut dyn Startup) -> anyhow::Result<()
commands: commands.clone(),

states,
disable_psk: config.disable_tls_psk,
};

let srv = build(config.mqtt.clone(), app, &config)?.run();
let mut psk_verifier = None;
if !config.disable_tls_psk {
let auth = app.authenticator.clone();
/*
let (psk_req_tx, psk_req_rx) = ntex::channel::channel();
let (psk_res_tx, psk_res_rx) = ntex::channel::channel();*/
psk_verifier = Some(Box::new(
move |identity: Option<&[u8]>, secret_mut: &mut [u8]| {
let mut to_copy = 0;
if let Some(Ok(identity)) = identity.map(|s| core::str::from_utf8(s)) {
if let Ok(identity) = Identity::parse(identity) {
let auth = auth.clone();
let app = identity.application().to_string();
let device = identity.device().to_string();

// Block this thread waiting for a response.
let response = std::thread::spawn(move || {
// Run a temporary executor for this request
let runner = ntex::rt::System::new("ntex-blocking");
runner.block_on(async move { auth.request_psk(app, device).await })
})
.join();

if let Ok(Ok(PreSharedKeyResponse {
outcome:
PreSharedKeyOutcome::Found {
key,
app: _,
device: _,
},
})) = response
{
to_copy = std::cmp::min(key.key.len(), secret_mut.len());
secret_mut[..to_copy].copy_from_slice(&key.key[..to_copy]);
}
}
}
Ok(to_copy)
},
))
}

let srv = build(config.mqtt.clone(), app, &config, psk_verifier)?.run();

log::info!("Starting web server");

Expand Down
51 changes: 48 additions & 3 deletions mqtt-endpoint/src/service/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use drogue_client::{
use drogue_cloud_endpoint_common::{
command::Commands,
error::EndpointError,
psk::{PskIdentityRetriever, VerifiedIdentity},
psk::{Identity, VerifiedIdentity},
sender::DownstreamSender,
x509::{ClientCertificateChain, ClientCertificateRetriever},
};
Expand All @@ -16,7 +16,9 @@ use drogue_cloud_mqtt_common::{
mqtt::{AckOptions, Connect, ConnectAck, Service, Sink},
};
use drogue_cloud_service_api::{
auth::device::authn::Outcome as AuthOutcome, services::device_state::LastWillTestament,
auth::device::authn::Outcome as AuthOutcome,
auth::device::authn::{PreSharedKeyOutcome, PreSharedKeyResponse},
services::device_state::LastWillTestament,
};
use drogue_cloud_service_common::state::{CreateOptions, CreationOutcome, StateController};
use std::fmt::Debug;
Expand All @@ -29,6 +31,7 @@ pub struct App {
pub authenticator: DeviceAuthenticator,
pub commands: Commands,
pub states: StateController,
pub disable_psk: bool,
}

impl App {
Expand Down Expand Up @@ -150,6 +153,28 @@ impl App {
},
}
}

async fn lookup_identity(&self, identity: &Identity) -> Option<VerifiedIdentity> {
if let Ok(PreSharedKeyResponse {
outcome:
PreSharedKeyOutcome::Found {
key: _,
app,
device,
},
}) = self
.authenticator
.request_psk(identity.application(), identity.device())
.await
{
Some(VerifiedIdentity {
application: app,
device,
})
} else {
None
}
}
}

#[async_trait(?Send)]
Expand All @@ -166,7 +191,27 @@ impl Service<Session> for App {
}

let certs = connect.io().client_certs();
let verified_identity = connect.io().verified_identity();
let verified_identity = if self.disable_psk {
None
} else {
use ntex_tls::PskIdentity;

let psk_identity = connect.io().query::<PskIdentity>();
let psk_identity = if let Some(psk_identity) = psk_identity.as_ref() {
core::str::from_utf8(&psk_identity.0[..])
.ok()
.map(|i| Identity::parse(i).ok())
.flatten()
} else {
None
};

if let Some(identity) = psk_identity {
self.lookup_identity(&identity).await
} else {
None
}
};
let (username, password) = connect.credentials();

match self
Expand Down
12 changes: 11 additions & 1 deletion mqtt-integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ impl TlsConfig for Config {
self.disable_tls
}

fn disable_psk(&self) -> bool {
true
}

fn disable_client_certs(&self) -> bool {
self.disable_client_certificates
}
Expand Down Expand Up @@ -147,7 +151,13 @@ pub async fn run(config: Config, startup: &mut dyn Startup) -> anyhow::Result<()

// create server

let srv = build(config.mqtt.clone(), app, &app_config)?.run();
let srv = build(
config.mqtt.clone(),
app,
&app_config,
Some(Box::new(|_: Option<&[u8]>, _: &mut [u8]| Ok(0))),
)?
.run();

// run

Expand Down
3 changes: 2 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ async fn cmd_run(matches: &ArgMatches) -> anyhow::Result<()> {
auth,
disable_tls: !(key_file.is_some() && cert_bundle_file.is_some()),
disable_client_certificates: false,
disable_tls_psk: false,
cert_bundle_file,
key_file,
instance: "drogue".to_string(),
Expand Down Expand Up @@ -924,7 +925,7 @@ async fn run(ctx: Context<'_>, server: ServerConfig, mut main: Main<'_>) -> anyh
println!();

println!("Creating a device:");
println!("\tdrg create device --application example-app device1 --spec '{{\"credentials\":{{\"credentials\":[{{\"pass\":\"hey-rodney\"}}]}}}}'");
println!("\tdrg create device --application example-app device1 --spec '{{\"authentication\":{{\"credentials\":[{{\"pass\":\"hey-rodney\"}}]}}}}'");
println!();

println!("Streaming telemetry data for an application:");
Expand Down

0 comments on commit 67db823

Please sign in to comment.