Skip to content

Commit

Permalink
feat(workers/base): deserialize for config
Browse files Browse the repository at this point in the history
  • Loading branch information
PhotonQuantum committed Jul 26, 2022
1 parent d9e9e87 commit 27373d8
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion workers/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mongodb = "2.3"
rand = "0.8"
rustls = "0.20"
rustls-pemfile = "1.0"
serde = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_with = "2.0"
sg-core = { package = "core", path = "../../core" }
tap = "1.0"
Expand All @@ -34,6 +34,7 @@ webpki = "0.22"

[dev-dependencies]
arbitrary = { version = "1.1", features = ["derive"] }
figment = { version = "0.10", features = ["env", "test"] }
once_cell = "1.13"
pki = "0.1"
tokio = { version = "1.20", features = ["macros"] }
118 changes: 107 additions & 11 deletions workers/base/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,129 @@
//! Worker config.

use std::net::SocketAddr;

use serde::Deserialize;
use serde_with::formats::CommaSeparator;
use serde_with::{serde_as, DisplayFromStr, StringWithSeparator};
use tokio_tungstenite::tungstenite::http::Uri;

use crate::{Certificates, ID};
use crate::gossip::transport::certificate::deserialize as deserialize_certificates;
use crate::Certificates;

/// Configuration for worker nodes.
pub struct NodeConfig<A> {
/// A peer URL to announce to the rest of the cluster.
///
/// This is optional. If not set, the worker will be started at idle state.
///
/// # Deserialize Implementation
/// Note that all array fields are deserialized as comma-separated strings.
#[serde_as]
#[derive(Debug, Clone, Deserialize)]
pub struct NodeConfig {
/// A list of peer URL to announce to the rest of the cluster.
/// If empty, the node will start idle.
#[serde_as(as = "StringWithSeparator::<CommaSeparator, Uri>")]
pub announce: Vec<Uri>,
/// Socket address to bind to for gossip protocol.
pub bind: A,
#[serde_as(as = "StringWithSeparator::<CommaSeparator, SocketAddr>")]
pub bind: Vec<SocketAddr>,
/// URI of this node to announce to the rest of the cluster.
#[serde_as(as = "DisplayFromStr")]
pub base_uri: Uri,
/// Kind of this node.
pub kind: String,
/// TLS certificates to use for the gossip protocol.
#[serde(flatten)]
#[serde(deserialize_with = "deserialize_certificates")]
pub certificates: Certificates,
/// Identity of this node.
pub ident: ID,
/// MongoDB configuration.
#[serde(flatten)]
pub db: DBConfig,
}

/// Database configuration.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
pub struct DBConfig {
/// MongoDB connection URI.
pub uri: String,
pub mongo_uri: String,
/// Database name.
pub db: String,
pub mongo_db: String,
/// Collection name.
pub collection: String,
pub mongo_collection: String,
}

#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use std::str;
use std::str::FromStr;

use figment::providers::Env;
use figment::{Figment, Jail};
use tokio_tungstenite::tungstenite::http::Uri;

use crate::gossip::tests::{ca, cert};
use crate::{DBConfig, NodeConfig};

#[test]
fn must_from_env() {
Jail::expect_with(|jail| {
let ca = ca();
let cert = cert(&ca, "charlie");
let ca_pem = ca.to_pkcs8().unwrap();
let cert_pem = cert.to_pkcs8().unwrap();

let _ca_file = jail
.create_file("ca.pem", str::from_utf8(&*ca_pem).unwrap())
.unwrap();
let _cert_file = jail
.create_file("cert.pem", str::from_utf8(&*cert_pem).unwrap())
.unwrap();

jail.set_env("CONF_ANNOUNCE", "http://alice:8080,http://bob:8080");
jail.set_env("CONF_BIND", "0.0.0.0:8080,[::]:8080");
jail.set_env("CONF_BASE_URI", "http://charlie:8080");
jail.set_env("CONF_KIND", "test");
jail.set_env("CONF_CA_FILE", "ca.pem");
jail.set_env("CONF_CERT_FILE", "cert.pem");
jail.set_env("CONF_MONGO_URI", "mongodb://localhost:27017");
jail.set_env("CONF_MONGO_DB", "stargazer-reborn");
jail.set_env("CONF_MONGO_COLLECTION", "tasks");

let config: NodeConfig = Figment::from(Env::prefixed("CONF_")).extract().unwrap();
let NodeConfig {
announce,
bind,
base_uri,
kind,
certificates,
db,
} = config;
assert_eq!(
announce,
vec![
Uri::from_str("http://alice:8080").unwrap(),
Uri::from_str("http://bob:8080").unwrap(),
]
);
assert_eq!(
bind,
vec![
SocketAddr::from_str("0.0.0.0:8080").unwrap(),
SocketAddr::from_str("[::]:8080").unwrap(),
]
);
assert_eq!(base_uri, Uri::from_str("http://charlie:8080").unwrap());
assert_eq!(kind, "test".to_string());
assert_eq!(
db,
DBConfig {
mongo_uri: "mongodb://localhost:27017".to_string(),
mongo_db: "stargazer-reborn".to_string(),
mongo_collection: "tasks".to_string(),
}
);
assert!(!certificates.root_certificates.is_empty());
assert!(!certificates.public_cert_chain.is_empty());
assert!(!certificates.private_key.0.is_empty());
Ok(())
});
}
}
2 changes: 1 addition & 1 deletion workers/base/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ pub mod ident;
pub mod resolver;
pub mod runtime;
#[cfg(test)]
mod tests;
pub mod tests;
pub mod transport;
5 changes: 3 additions & 2 deletions workers/base/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ pub fn cert(ca: &KeyStore, hostname: &str) -> KeyStore {

pub fn certs(ca: &KeyStore, hostname: &str) -> Certificates {
Certificates::from_pem(
&CA.to_pkcs8().unwrap(),
&cert(ca, hostname).to_pkcs8().unwrap(),
&mut &*CA.to_pkcs8().unwrap(),
&mut &*cert(ca, hostname).to_pkcs8().unwrap(),
)
.unwrap()
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
Expand Down
70 changes: 52 additions & 18 deletions workers/base/src/gossip/transport/certificate.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,78 @@
//! Certificate related types that supports the secured `WebSocket` transport.
use std::iter;
use std::fs::File;
use std::io;
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::Arc;

use eyre::{bail, eyre, Result, WrapErr};
use rustls::server::AllowAnyAuthenticatedClient;
use rustls::{Certificate, ClientConfig, PrivateKey, RootCertStore, ServerConfig};
use rustls_pemfile::Item;
use serde::de::Error;
use serde::{Deserialize, Deserializer};
use tokio_rustls::{TlsAcceptor, TlsConnector};
use tracing::{debug, warn};

/// Certificates used by a client or a server.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct Certificates {
/// Trusted root certificates.
root_certificates: RootCertStore,
pub(crate) root_certificates: RootCertStore,
/// Public certificate chain.
public_cert_chain: Vec<Certificate>,
pub(crate) public_cert_chain: Vec<Certificate>,
/// Private key.
private_key: PrivateKey,
pub(crate) private_key: PrivateKey,
}

impl Certificates {
/// Create a new `Certificates` instance with given pem files.
/// Create a new `Certificates` instance with given PEM files.
///
/// `root`: CA certificate in PEM format.
/// `cert`: public certificate and private key in PEM format.
#[must_use]
#[allow(clippy::missing_panics_doc, clippy::cognitive_complexity)]
pub fn from_pem(mut root: &[u8], mut cert: &[u8]) -> Self {
///
/// # Errors
/// Returns error if no certificate or key is found in given `cert` file.
#[allow(clippy::cognitive_complexity)]
pub fn from_pem(root: &mut impl io::BufRead, cert: &mut impl io::BufRead) -> Result<Self> {
let mut root_certs = vec![];
for section in
iter::from_fn(|| rustls_pemfile::read_one(&mut root).expect("CFG: Corrupt PEM file"))
while let Some(section) =
rustls_pemfile::read_one(root).wrap_err("Corrupt root PEM file.")?
{
if let Item::X509Certificate(cert) = section {
root_certs.push(cert);
} else {
warn!("Section not handled in given pem file.");
warn!("Section not handled in given PEM file.");
}
}

let mut public_cert_chain = vec![];
let mut private_key = None;
for section in
iter::from_fn(|| rustls_pemfile::read_one(&mut cert).expect("CFG: Corrupt PEM file"))
while let Some(section) =
rustls_pemfile::read_one(cert).wrap_err("Corrupt cert PEM file.")?
{
match section {
Item::X509Certificate(cert) => public_cert_chain.push(Certificate(cert)),
Item::PKCS8Key(key) => private_key = Some(PrivateKey(key)),
_ => warn!("Section not handled in given pem file."),
_ => warn!("Section not handled in given PEM file."),
}
}

let mut root_certificates = RootCertStore::empty();
let (succ, _) = root_certificates.add_parsable_certificates(&root_certs);
debug!("{} root certificates added", succ);

Self {
if public_cert_chain.is_empty() {
bail!("No public certificate found in given PEM file.");
}
let private_key =
private_key.ok_or_else(|| eyre!("No private key found in given PEM file."))?;

Ok(Self {
root_certificates,
public_cert_chain,
private_key: private_key.expect("CFG: missing private key"),
}
private_key,
})
}
/// Return a TLS acceptor configured with the given certificates.
///
Expand All @@ -85,3 +99,23 @@ impl Certificates {
TlsConnector::from(Arc::new(client_config))
}
}

/// Helper struct for deserializing a certificate from PEM files.
#[derive(Debug, Deserialize)]
struct CertificatesFromFile {
/// Path to the client server TLS CA PEM file.
ca_file: PathBuf,
/// Path to the client server TLS certificate & key PEM file.
cert_file: PathBuf,
}

/// Helper function for deserializing a certificate from PEM files.
pub fn deserialize<'de, D>(de: D) -> Result<Certificates, D::Error>
where
D: Deserializer<'de>,
{
let cert_from_file = CertificatesFromFile::deserialize(de)?;
let mut ca = BufReader::new(File::open(cert_from_file.ca_file).map_err(D::Error::custom)?);
let mut cert = BufReader::new(File::open(cert_from_file.cert_file).map_err(D::Error::custom)?);
Certificates::from_pem(&mut ca, &mut cert).map_err(D::Error::custom)
}
2 changes: 1 addition & 1 deletion workers/base/src/gossip/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_tungstenite::WebSocketStream;
pub use certificate::Certificates;
pub use websocket::ws_transport;

mod certificate;
pub mod certificate;
#[cfg(test)]
mod tests;
mod websocket;
Expand Down
22 changes: 12 additions & 10 deletions workers/base/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;

use eyre::Result;
use futures::{pin_mut, stream, Stream, TryStreamExt};
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::http::Uri;
use uuid::Uuid;

Expand All @@ -24,12 +24,9 @@ use crate::ring::{Migrated, Ring};
///
/// # Errors
/// Returns error if failed to bind to the given address, or initial connection to database failed.
pub async fn start_worker<A: ToSocketAddrs + Send>(
worker: impl Worker,
config: NodeConfig<A>,
) -> Result<()> {
pub async fn start_worker(worker: impl Worker, config: NodeConfig) -> Result<()> {
// Bind to the configured address and start transport layer.
let listener = TcpListener::bind(config.bind).await?;
let listener = TcpListener::bind(&*config.bind).await?;
let (stream, sink) = ws_transport(
listener,
config.certificates,
Expand All @@ -39,15 +36,20 @@ pub async fn start_worker<A: ToSocketAddrs + Send>(
.await;

// Start the Foca runtime.
let kind = config.ident.kind().to_string();
let foca = start_foca(config.ident, stream, sink, None);
let ident = ID::new(config.base_uri.clone(), config.kind.clone());
let foca = start_foca(ident, stream, sink, None);
for announce_peer in config.announce {
foca.announce(ID::new(announce_peer, kind.clone()));
foca.announce(ID::new(announce_peer, config.kind.clone()));
}

// Prepare change stream.
let foca_stream = foca_events(&foca).await;
let db_stream = db_events(&config.db.uri, &config.db.db, &config.db.collection).await?;
let db_stream = db_events(
&config.db.mongo_uri,
&config.db.mongo_db,
&config.db.mongo_collection,
)
.await?;
let event_stream = stream::select(foca_stream, db_stream);
pin_mut!(event_stream);

Expand Down

0 comments on commit 27373d8

Please sign in to comment.