diff --git a/README.md b/README.md index 04cf839a..712571a0 100644 --- a/README.md +++ b/README.md @@ -561,6 +561,33 @@ Next, send the CSR to the central CA's administrator for signing and enrolling t Both the Broker and the Proxy respect the log level in the `RUST_LOG` environment variable. E.g., `RUST_LOG=debug` enables debug outputs. Warning: the `trace` log level is *very* noisy. +## Restricting accesses +We have a black/whitelisting option that can be used to restrict traeffic to and from a given proxy. The lists contains a json array of app or proxy ids (see [system architecture](#system-architecture)). Setting a proxy id will permit or allow, dpending on the kind of list, all apps from the given proxy while the app id will only do that for the specific app. + +Restrinction logic will follow these rules: +- Only a whitelist is set -> Traeffic is permitted according to the whitelist +- Only a blacklist is set -> Traeffic is denied according to the blacklist +- Neither white nor blacklist -> All traeffic is permitted +- A white and blacklist is set -> The whitelist will be used as an **exception** to the blacklist + +> Note: All config options may also be set via environment variables matching the cli argument name in uppercase with `-` repplaced by `_`. +### Deny receiving messages from other proxies +This is done by setting `allowed-remotes` and/or `blocked-remotes` options accordingly as described in [Restricting access](#restricting-accesses). + +Example: +```bash +./proxy ... --allowed-remotes='["app1.proxy1", "proxy2"]' +``` + +### Deny sending messages to specific proxies +This is done by setting `allowed-receivers` and/or `blocked-receivers` options accordingly as described in [Restricting access](#restricting-accesses). +> Note: When a receiver of a message is being blocked due to these restrictions the proxy will return a status code of 422 Unprocessable Entity and a json list containing the offending entries. + +Example: +```bash +./proxy ... --allowed-receivers='["app1.proxy1", "proxy2"]' +``` + ## Technical Background Information ### End-to-End Encryption diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 704ed365..00c30b6c 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -61,6 +61,11 @@ services: BIND_ADDR: 0.0.0.0:8081 RUST_LOG: ${RUST_LOG} ALL_PROXY: http://mitmproxy:8080 + ALLOWED_REMOTES: '["app1.proxy1", "proxy1"]' + BLOCKED_REMOTES: '["app1.proxy1", "proxy1"]' + # does not exist only used for testing + BLOCKED_RECEIVERS: '["proxy3"]' + ALLOWED_RECEIVERS: '["app2.proxy3"]' secrets: - proxy1.pem - root.crt.pem diff --git a/proxy/src/serve_sockets.rs b/proxy/src/serve_sockets.rs index 41b25104..592a2413 100644 --- a/proxy/src/serve_sockets.rs +++ b/proxy/src/serve_sockets.rs @@ -78,19 +78,23 @@ async fn get_tasks( state: State, Extension(task_secret_map): Extension, req: Request -) -> Result>>, StatusCode> { - let mut res = forward_request(req, &state.config, &sender, &state.client).await.map_err(|e| e.0)?; +) -> Result>>, Response> { + let mut res = forward_request(req, &state.config, &sender, &state.client).await?; if res.status() != StatusCode::OK { - return Err(res.status()); + return Err(res.into_response()); } - let body = hyper::body::to_bytes(res.body_mut()).await.map_err(|_| StatusCode::BAD_GATEWAY)?; - let enc_json = serde_json::from_slice(&body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let plain_json = to_server_error(validate_and_decrypt(enc_json).await).map_err(|e| e.0)?; - let tasks: Vec> = serde_json::from_value(plain_json).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let body = hyper::body::to_bytes(res.body_mut()).await.map_err(|_| StatusCode::BAD_GATEWAY.into_response())?; + let enc_json = serde_json::from_slice(&body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())?; + let plain_json = to_server_error(validate_and_decrypt(enc_json).await).map_err(IntoResponse::into_response)?; + let tasks: Vec> = serde_json::from_value(plain_json).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())?; let mut out = Vec::with_capacity(tasks.len()); for task in tasks { if let MessageType::MsgSocketRequest(mut socket_task) = task { - let key = serde_json::from_value(Value::String(socket_task.secret.body.as_ref().ok_or(StatusCode::INTERNAL_SERVER_ERROR)?.to_string())).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let key = serde_json::from_value(Value::String(socket_task.secret.body + .as_ref() + .ok_or(StatusCode::INTERNAL_SERVER_ERROR.into_response())? + .to_string() + )).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())?; let Ok(ttl) = socket_task.expire.duration_since(SystemTime::now()) else { continue; }; @@ -98,7 +102,7 @@ async fn get_tasks( socket_task.secret.body = None; out.push(socket_task); } else { - return Err(StatusCode::INTERNAL_SERVER_ERROR); + return Err(StatusCode::INTERNAL_SERVER_ERROR.into_response()); } } diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index 91005894..58273e01 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -10,7 +10,7 @@ use axum::{ http::{request::Parts, HeaderValue}, response::{sse::Event, IntoResponse, Response, Sse}, routing::{any, get, put}, - Router, + Router, Json, }; use futures::{ stream::{StreamExt, TryStreamExt}, @@ -90,7 +90,7 @@ pub(crate) async fn forward_request( config: &config_proxy::Config, sender: &AppId, client: &SamplyHttpClient, -) -> Result, (StatusCode, &'static str)> { +) -> Result, Response> { // Create uri to contact broker let path = req.uri().path(); let path_query = req @@ -100,7 +100,7 @@ pub(crate) async fn forward_request( .unwrap_or(path); let target_uri = Uri::try_from(config.broker_uri.to_string() + path_query.trim_start_matches('/')) - .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid path queried."))?; + .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid path queried.").into_response())?; *req.uri_mut() = target_uri; req.headers_mut().append( @@ -108,7 +108,7 @@ pub(crate) async fn forward_request( HeaderValue::from_static(env!("SAMPLY_USER_AGENT")), ); let (encrypted_msg, parts) = encrypt_request(req, &sender).await?; - let req = sign_request(encrypted_msg, parts, &config, None).await?; + let req = sign_request(encrypted_msg, parts, &config, None).await.map_err(IntoResponse::into_response)?; trace!("Requesting: {:?}", req); let resp = client.request(req).await.map_err(|e| { if is_actually_hyper_timeout(&e) { @@ -117,7 +117,7 @@ pub(crate) async fn forward_request( } else { warn!("Request to broker failed: {}", e.to_string()); (StatusCode::BAD_GATEWAY, "Upstream error; see server logs.") - } + }.into_response() })?; Ok(resp) } @@ -128,7 +128,7 @@ pub(crate) async fn handler_task( AuthenticatedApp(sender): AuthenticatedApp, headers: HeaderMap, req: Request, -) -> Result { +) -> Response { let found = &headers .get(header::ACCEPT) .unwrap_or(&HeaderValue::from_static("")) @@ -139,18 +139,15 @@ pub(crate) async fn handler_task( .find(|part| *part == "text/event-stream") .is_some(); - let result = if *found { + if *found { handler_tasks_stream(client, config, sender, req) - .await? + .await .into_response() } else { handler_tasks_nostream(client, config, sender, req) .await - .map_err(|e| (e.0, e.1.to_string()))? .into_response() - }; - - return Ok(result); + } } async fn handler_tasks_nostream( @@ -158,7 +155,7 @@ async fn handler_tasks_nostream( config: config_proxy::Config, sender: AppId, req: Request, -) -> Result, (StatusCode, &'static str)> { +) -> Result, Response> { // Validate Query, forward to server, get response. let resp = forward_request(req, &config, &sender, &client).await?; @@ -168,7 +165,7 @@ async fn handler_tasks_nostream( let (mut parts, body) = resp.into_parts(); let mut bytes = body::to_bytes(body).await.map_err(|e| { error!("Error receiving reply from the broker: {}", e); - ERR_UPSTREAM + ERR_UPSTREAM.into_response() })?; // TODO: Always return application/jwt from server. @@ -210,12 +207,10 @@ async fn handler_tasks_stream( config: config_proxy::Config, sender: AppId, req: Request, -) -> Result>>, (StatusCode, String)> { +) -> Result>>, Response> { // Validate Query, forward to server, get response. - let mut resp = forward_request(req, &config, &sender, &client) - .await - .map_err(|err| (err.0, err.1.into()))?; + let mut resp = forward_request(req, &config, &sender, &client).await?; let code = resp.status(); if !code.is_success() { @@ -224,7 +219,7 @@ async fn handler_tasks_stream( .and_then(|v| String::from_utf8(v.into()).ok()) .unwrap_or("(unable to parse reply)".into()); warn!("Got unexpected response code from server: {code}. Returning error message as-is: \"{error_msg}\""); - return Err((code, error_msg)); + return Err((code, error_msg).into_response()); } let outgoing = async_stream::stream! { @@ -299,6 +294,10 @@ async fn handler_tasks_stream( }; let json = match validate_and_decrypt(json).await { Ok(json) => json, + Err(SamplyBeamError::DisallowedReceiver(rec)) => { + info!("Did not send result from {rec} as it was forbidden by the config"); + continue; + }, Err(err) => { warn!("Got an error decrypting Broker's reply: {err}"); continue; @@ -325,7 +324,7 @@ async fn handler_tasks_stream( Ok(sse) } -pub(crate) fn to_server_error(res: Result) -> Result { +pub(crate) fn to_server_error(res: Result) -> Result { res.map_err(|e| match e { SamplyBeamError::JsonParseError(e) => { warn!("{e}"); @@ -340,7 +339,7 @@ pub(crate) fn to_server_error(res: Result) -> Result Result { + debug!("Filtered receiving message from {app} as it was blocked due to proxy permissions"); + }, + other => results.push(other?) + } } Ok(Value::Array(results)) } else if json.is_object() { @@ -431,7 +436,13 @@ pub(crate) async fn validate_and_decrypt(json: Value) -> Result::verify(&signed.jwt) .await? .msg; - Ok(serde_json::to_value(decrypt_msg(msg)?).expect("Should serialize fine")) + let decrypted = decrypt_msg(msg)?; + + if CONFIG_PROXY.permission_manager.allowed_to_recieve(decrypted.get_from()) { + Ok(serde_json::to_value(decrypted).expect("Should serialize fine")) + } else { + Err(SamplyBeamError::DisallowedReceiver(decrypted.get_from().clone())) + } } Err(e) => Err(SamplyBeamError::JsonParseError(format!( "Failed to parse broker response as a signed encrypted message. Err is {e}" @@ -454,11 +465,11 @@ fn decrypt_msg(msg: M) -> Result async fn encrypt_request( req: Request, sender: &AppId, -) -> Result<(EncryptedMessage, Parts), (StatusCode, &'static str)> { +) -> Result<(EncryptedMessage, Parts), Response> { let (parts, body) = req.into_parts(); let body = body::to_bytes(body).await.map_err(|e| { warn!("Unable to read message body: {e}"); - ERR_BODY + ERR_BODY.into_response() })?; let msg = if body.is_empty() { @@ -467,9 +478,14 @@ async fn encrypt_request( from: sender.into(), }) } else { - match serde_json::from_slice(&body) { + match serde_json::from_slice::(&body) { Ok(val) => { debug!("Body is valid json"); + let filtered = val.get_to().iter().filter(|app| !CONFIG_PROXY.permission_manager.allowed_to_send(app)).cloned().collect::>(); + if !filtered.is_empty() { + return Err((StatusCode::UNPROCESSABLE_ENTITY, Json(filtered)).into_response()); + } + val } Err(e) => { @@ -478,17 +494,24 @@ async fn encrypt_request( e, std::str::from_utf8(&body).unwrap_or("(not valid UTF-8)") ); - return Err(ERR_BODY); + return Err(ERR_BODY.into_response()); } } }; // Sanity/security checks: From address sane? if msg.get_from() != sender { - return Err(ERR_FAKED_FROM); + return Err(ERR_FAKED_FROM.into_response()); } let body = encrypt_msg(msg).await.map_err(|e| { - warn!("Encryption failed with: {e}"); - ERR_INTERNALCRYPTO + match e { + SamplyBeamError::InvalidReceivers(proxies) => { + (StatusCode::FAILED_DEPENDENCY, Json(proxies)).into_response() + } + e => { + warn!("Encryption failed with: {e}"); + ERR_INTERNALCRYPTO.into_response() + } + } })?; Ok((body, parts)) } diff --git a/shared/src/beam_id.rs b/shared/src/beam_id.rs index 2b419cb8..0b7a0bfe 100644 --- a/shared/src/beam_id.rs +++ b/shared/src/beam_id.rs @@ -6,7 +6,7 @@ use serde::{de::Visitor, Deserialize, Serialize}; use crate::{config, errors::SamplyBeamError}; -static BROKER_ID: OnceCell = OnceCell::new(); +pub(crate) static BROKER_ID: OnceCell = OnceCell::new(); #[derive(PartialEq, Debug)] pub enum BeamIdType { @@ -379,13 +379,19 @@ impl<'de> Visitor<'de> for AppOrProxyIdVisitor { where E: serde::de::Error, { - let t = AppId::str_has_type(v) - .map_err(|e| serde::de::Error::custom(format!("Invalid Beam ID \"{v}\": {e}")))?; - match t { + v.parse().map_err(serde::de::Error::custom) + } +} + +impl FromStr for AppOrProxyId { + type Err = SamplyBeamError; + + fn from_str(v: &str) -> Result { + match AppId::str_has_type(v)? { BeamIdType::AppId => Ok(AppOrProxyId::AppId(AppId::new(v).unwrap())), BeamIdType::ProxyId => Ok(AppOrProxyId::ProxyId(ProxyId::new(v).unwrap())), - BeamIdType::BrokerId => Err(serde::de::Error::custom( - "Expected AppOrProxyId, got BrokerId.", + BeamIdType::BrokerId => Err(SamplyBeamError::InvalidBeamId( + "Expected AppOrProxyId, got BrokerId.".to_string() )), } } diff --git a/shared/src/config_proxy.rs b/shared/src/config_proxy.rs index 6fbfbbf3..d2a43344 100644 --- a/shared/src/config_proxy.rs +++ b/shared/src/config_proxy.rs @@ -17,7 +17,7 @@ use serde::Deserialize; use tracing::{debug, info, warn}; use crate::{ - beam_id::{self, AppId, BeamId, BrokerId, ProxyId}, + beam_id::{self, AppId, BeamId, BrokerId, ProxyId, AppOrProxyId, BROKER_ID}, errors::SamplyBeamError, }; @@ -29,6 +29,7 @@ pub struct Config { pub proxy_id: ProxyId, pub api_keys: HashMap, pub tls_ca_certificates: Vec, + pub permission_manager: PermissionManager, } pub type ApiKey = String; @@ -64,6 +65,22 @@ pub struct CliArgs { /// samply.pki: Path to CA Root certificate #[clap(long, env, value_parser, default_value = "/run/secrets/root.crt.pem")] rootcert_file: PathBuf, + + /// A whitelist of apps or proxies that messages may be sent to, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + #[clap(long, env, value_parser)] + pub allowed_receivers: Option, + + /// A blacklist of apps or proxies that messages may not be sent to, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + #[clap(long, env, value_parser)] + pub blocked_receivers: Option, + + /// A whitelist of apps or proxies that may send messages to this proxy, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + #[clap(long, env, value_parser)] + pub allowed_remotes: Option, + + /// A blacklist of apps or proxies that may not send messages to this proxy, e.g. ["app1.proxy1.broker", "proxy2.broker", ...] + #[clap(long, env, value_parser)] + pub blocked_remotes: Option, /// (included for technical reasons) #[clap(long, hide(true))] @@ -97,6 +114,40 @@ fn parse_apikeys(proxy_id: &ProxyId) -> Result, SamplyBea Ok(api_keys) } +#[derive(Debug, Clone)] +pub struct PermissionManager { + pub recv_allow_list: Option>, + pub recv_block_list: Option>, + pub send_allow_list: Option>, + pub send_block_list: Option>, +} + +impl PermissionManager { + pub fn allowed_to_recieve(&self, from: &AppOrProxyId) -> bool { + Self::check_permissions_with(&self.recv_allow_list,&self.recv_block_list, from) + } + + pub fn allowed_to_send(&self, to: &AppOrProxyId) -> bool { + Self::check_permissions_with(&self.send_allow_list, &self.send_block_list, to) + } + + fn check_permissions_with(allow: &Option>, deny: &Option>, beam_id: &AppOrProxyId) -> bool { + match (allow, deny) { + (None, None) => true, + (None, Some(block_list)) => !Self::contains(&block_list, beam_id), + (Some(allow_list), None) => Self::contains(&allow_list, beam_id), + (Some(allow_list), Some(block_list)) => !Self::contains(&block_list, beam_id) || Self::contains(&allow_list, beam_id) + } + } + + fn contains(ids: &Vec, needle: &AppOrProxyId) -> bool { + ids.iter().find(|id| match id { + AppOrProxyId::AppId(app) => needle == app, + AppOrProxyId::ProxyId(proxy) => proxy == &needle.get_proxy_id(), + }).is_some() + } +} + impl crate::config::Config for Config { fn load() -> Result { let cli_args = CliArgs::parse(); @@ -120,6 +171,7 @@ impl crate::config::Config for Config { e )) })?; + let config = Config { broker_host_header: uri_to_host_header(&cli_args.broker_url)?, broker_uri: cli_args.broker_url, @@ -127,12 +179,40 @@ impl crate::config::Config for Config { proxy_id, api_keys, tls_ca_certificates, + permission_manager: PermissionManager { + recv_allow_list: parse_to_list_of_ids(cli_args.allowed_remotes)?, + recv_block_list: parse_to_list_of_ids(cli_args.blocked_remotes)?, + send_allow_list: parse_to_list_of_ids(cli_args.allowed_receivers)?, + send_block_list: parse_to_list_of_ids(cli_args.blocked_receivers)? + } }; info!("Successfully read config and API keys from CLI and secrets file."); Ok(config) } } +fn parse_to_list_of_ids(input: Option) -> Result>, SamplyBeamError> { + let broker_id = BROKER_ID.get().expect("Should be set before parsing beam ids"); + if let Some(app_list_str) = input { + let Ok(strings): Result, _> = serde_json::from_str(&app_list_str) else { + return Err(SamplyBeamError::ConfigurationFailed(format!("Failed to parse {app_list_str} as a json array."))); + }; + Ok(Some(strings.into_iter() + .map(|mut id| { + id.push('.'); + id.push_str(broker_id); + id.parse() + }) + .collect::>() + .map_err(|e| SamplyBeamError::ConfigurationFailed( + format!("Failed to parse {app_list_str} to a list of beam ids: {e}") + ))? + )) + } else { + Ok(None) + } +} + fn uri_to_host_header(uri: &Uri) -> Result { let hostname: String = uri .host() @@ -170,4 +250,65 @@ mod tests { let parsed = parse_apikeys(&ProxyId::new(&format!("proxy.{BROKER_ID}")).unwrap()).unwrap(); assert_eq!(parsed.len(), apps.len() * 2); } + + #[test] + fn test_parse_app_list() { + const BROKER_ID: &str = "broker.samply.de"; + BrokerId::set_broker_id(BROKER_ID.to_string()); + assert_eq!( + parse_to_list_of_ids(Some(r#"["app1.proxy1", "proxy1"]"#.to_string())).unwrap(), + Some(vec![ + AppOrProxyId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap(), + AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap() + ]) + ); + assert_eq!(parse_to_list_of_ids(None).unwrap(), None); + } + + #[test] + fn test_contains() { + const BROKER_ID: &str = "broker.samply.de"; + BrokerId::set_broker_id(BROKER_ID.to_string()); + let app_ids = vec![AppOrProxyId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap()]; + let proxy_id = vec![AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap()]; + assert!(PermissionManager::contains(&app_ids, &AppOrProxyId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap())); + assert!(!PermissionManager::contains(&app_ids, &AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap())); + assert!(PermissionManager::contains(&proxy_id, &AppOrProxyId::new(&format!("app2.proxy1.{BROKER_ID}")).unwrap())); + assert!(PermissionManager::contains(&proxy_id, &AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap())); + assert!(!PermissionManager::contains(&proxy_id, &AppOrProxyId::new(&format!("proxy2.{BROKER_ID}")).unwrap())); + } + + #[test] + fn test_check_permissions_with() { + const BROKER_ID: &str = "broker.samply.de"; + BrokerId::set_broker_id(BROKER_ID.to_string()); + let proxy1 = AppOrProxyId::new(&format!("proxy1.{BROKER_ID}")).unwrap(); + let app_id1 = AppOrProxyId::new(&format!("app1.proxy1.{BROKER_ID}")).unwrap(); + let app_id2 = AppOrProxyId::new(&format!("app2.proxy1.{BROKER_ID}")).unwrap(); + + // Both allow and deny lists empty + assert!(PermissionManager::check_permissions_with(&None, &None, &app_id1)); + + // Deny list empty, allow list contains ID + let allow_list = vec![app_id1.clone()]; + assert!(PermissionManager::check_permissions_with(&Some(allow_list.clone()), &None, &app_id1)); + assert!(!PermissionManager::check_permissions_with(&Some(allow_list), &None, &app_id2)); + + // Deny list contains ID, allow list empty + let block_list = vec![app_id1.clone()]; + assert!(!PermissionManager::check_permissions_with(&None, &Some(block_list.clone()), &app_id1)); + assert!(PermissionManager::check_permissions_with(&None, &Some(block_list), &app_id2)); + + // Both lists contain ID + let allow_list = vec![app_id1.clone()]; + let block_list = vec![app_id1.clone(), app_id2.clone()]; + assert!(PermissionManager::check_permissions_with(&Some(allow_list.clone()), &Some(block_list.clone()), &app_id1)); + assert!(!PermissionManager::check_permissions_with(&Some(allow_list), &Some(block_list), &app_id2)); + + // Both lists with proxy + let allow_list = vec![app_id1.clone()]; + assert!(PermissionManager::check_permissions_with(&Some(allow_list.clone()), &Some(vec![proxy1.clone()]), &app_id1)); + assert!(!PermissionManager::check_permissions_with(&Some(allow_list), &Some(vec![proxy1]), &app_id2)); + } + } diff --git a/shared/src/crypto.rs b/shared/src/crypto.rs index 30e0c89c..027d2b6f 100644 --- a/shared/src/crypto.rs +++ b/shared/src/crypto.rs @@ -213,7 +213,7 @@ impl CertificateCache { pub async fn get_all_certs_by_cname(cname: &ProxyId) -> Vec { // TODO: What if multiple certs are found? let mut result = get_all_certs_from_cache_by_cname(cname).await; // Drop Read Locks - if result.is_empty() { + if result.iter().filter(|cert| matches!(cert, CertificateCacheEntry::Valid(_))).count() == 0 { // requires write lock. Self::update_certificates().await.unwrap_or_else(|e| { @@ -596,20 +596,22 @@ pub async fn get_cert_and_client_by_serial_as_pemstr( } pub async fn get_newest_certs_for_cnames_as_pemstr( - cnames: impl IntoIterator, -) -> Option> { - let mut result: Vec = Vec::new(); // No fancy map/iter, bc of async + cnames: Vec, +) -> Vec> { + let mut result = Vec::with_capacity(cnames.len()); // No fancy map/iter, bc of async for id in cnames { - let certs = get_all_certs_and_clients_by_cname_as_pemstr(id) + let certs = get_all_certs_and_clients_by_cname_as_pemstr(&id) .await .into_iter() .flatten() .collect(); if let Some(best_candidate) = get_best_other_certificate(&certs) { - result.push(best_candidate); + result.push(Ok(best_candidate)); + } else { + result.push(Err(id)) } } - (!result.is_empty()).then_some(result) + result } fn extract_x509(cert: &X509) -> Result { @@ -861,18 +863,19 @@ pub async fn get_proxy_public_keys( }) .collect(); let receivers_crypto_bundle = - crypto::get_newest_certs_for_cnames_as_pemstr(proxy_receivers.iter()).await; - let receivers_keys = match receivers_crypto_bundle { - Some(vec) => vec - .iter() - .map(|crypt_publ| { - rsa::RsaPublicKey::from_public_key_pem(&crypt_publ.pubkey) - .expect("Cannot collect recipients' public keys") - }) - .collect::>(), // TODO Expect - None => Vec::new(), - }; - Ok(receivers_keys) + crypto::get_newest_certs_for_cnames_as_pemstr(proxy_receivers).await; + let (receivers_keys, proxies_with_invalid_certs): (Vec<_>, Vec<_>) = receivers_crypto_bundle + .into_iter() + .map(|crypt_publ_res| { + crypt_publ_res.and_then(|crypto| + rsa::RsaPublicKey::from_public_key_pem(&crypto.pubkey).map_err(|_| crypto.beam_id)) + }) + .partition_result(); + if proxies_with_invalid_certs.is_empty() { + Ok(receivers_keys) + } else { + Err(SamplyBeamError::InvalidReceivers(proxies_with_invalid_certs)) + } } #[tokio::test] diff --git a/shared/src/errors.rs b/shared/src/errors.rs index d4ec0a23..8bcb550b 100644 --- a/shared/src/errors.rs +++ b/shared/src/errors.rs @@ -4,6 +4,8 @@ use http::StatusCode; use openssl::error::ErrorStack; use tokio::time::error::Elapsed; +use crate::beam_id::{ProxyId, AppOrProxyId}; + #[derive(thiserror::Error, Debug)] pub enum SamplyBeamError { #[error("Invalid bind address supplied: {0}")] @@ -50,6 +52,10 @@ pub enum SamplyBeamError { CertificateError(#[from] CertificateInvalidReason), #[error("Timeout executing HTTP request: {0}")] HttpTimeoutError(Elapsed), + #[error("Invalid receivers: {0:?}")] + InvalidReceivers(Vec), + #[error("Not allowed to receive messge from: {0}")] + DisallowedReceiver(AppOrProxyId), } impl From for SamplyBeamError { diff --git a/shared/src/lib.rs b/shared/src/lib.rs index b7ea4471..2d62f8ae 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -381,7 +381,7 @@ pub trait EncryptableMsg: Msg + Serialize + Sized { let nonce = XChaCha20Poly1305::generate_nonce(&mut rng); // Encrypt symmetric key with receivers' public keys - let (encrypted_keys, err): (Vec<_>, Vec<_>) = receivers_public_keys + let Ok(encrypted_keys) = receivers_public_keys .iter() .map(|key| { key.encrypt( @@ -390,12 +390,12 @@ pub trait EncryptableMsg: Msg + Serialize + Sized { symmetric_key.as_slice(), ) }) - .partition_result(); - if !err.is_empty() { + .collect() + else { return Err(SamplyBeamError::SignEncryptError( "Encryption error: Cannot encrypt symmetric key".into(), )); - } + }; // Encrypt fields content let cipher = XChaCha20Poly1305::new(&symmetric_key); diff --git a/tests/src/lib.rs b/tests/src/lib.rs index b3f9c714..d74c4d4a 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -1,21 +1,21 @@ -use std::time::{SystemTime, Duration}; - -use http::{Request, header, StatusCode}; -use hyper::{Client, Body}; +use hyper::{Body, Client, client::HttpConnector}; +use http::{Request, header, request, StatusCode}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use serde_json::json; use shared::{beam_id::{AppOrProxyId, BeamId, AppId}, MsgId, MsgTaskRequest, Plain, FailureStrategy}; #[cfg(all(feature = "sockets", test))] mod socket_test; -pub const APP1: Lazy = Lazy::new(|| { +#[cfg(test)] +mod permission_test; + +pub static APP1: Lazy = Lazy::new(|| { AppId::set_broker_id("broker".to_string()); AppOrProxyId::new(option_env!("APP1_P1").unwrap_or("app1.proxy1.broker")).unwrap() }); -pub const APP2: Lazy = Lazy::new(|| { +pub static APP2: Lazy = Lazy::new(|| { AppId::set_broker_id("broker".to_string()); AppOrProxyId::new(option_env!("APP2_P2").unwrap_or("app2.proxy2.broker")).unwrap() }); @@ -36,6 +36,37 @@ pub const APP_KEY: &str = match option_env!("APP_KEY") { }; +pub fn beam_request(r#as: &AppOrProxyId, path: &str) -> request::Builder { + let proxy = match r#as { + app if app == &*APP1 => PROXY1, + app if app == &*APP2 => PROXY2, + _ => panic!("Failed to find matching proxy for app") + }; + Request::builder() + .as_app(r#as, APP_KEY) + .uri(format!("{proxy}{path}")) +} + +pub static CLIENT: Lazy> = Lazy::new(|| Client::new()); + +// This could be in a beam lib as well maybe +trait BeamRequestBuilder { + fn as_app(self, app: &AppOrProxyId, key: &str) -> Self; + // We do a generic B here to not require hyper as a dependency + fn with_json>, T: Serialize>(self, json: &T) -> Result, http::Error>; +} + +impl BeamRequestBuilder for request::Builder { + fn as_app(self, app: &AppOrProxyId, key: &str) -> Self { + self.header(header::AUTHORIZATION, format!("ApiKey {app} {key}")) + } + + fn with_json>, T: Serialize>(self, json: &T) -> Result, http::Error> { + self.body(B::from(serde_json::to_vec(json).unwrap())) + } +} + +// Move to beam lib when I get to write it #[derive(Debug, Serialize, Deserialize)] pub struct SocketTask { pub to: Vec, diff --git a/tests/src/permission_test.rs b/tests/src/permission_test.rs new file mode 100644 index 00000000..51fc7348 --- /dev/null +++ b/tests/src/permission_test.rs @@ -0,0 +1,53 @@ +use std::time::{SystemTime, Duration}; + +use http::{Request, StatusCode, Method}; +use shared::{MsgId, Plain, beam_id::{AppOrProxyId, BeamId, BrokerId}}; + +use crate::{BeamRequestBuilder, APP1, APP_KEY, CLIENT, PROXY1}; + + +#[tokio::test] +async fn test_no_senders() { + BrokerId::set_broker_id("broker".to_string()); + let to = vec![AppOrProxyId::new("app1.proxy3.broker").unwrap()]; + let req = Request::builder() + .uri(format!("{PROXY1}/v1/tasks")) + .method(Method::POST) + .as_app(&APP1, APP_KEY) + .with_json(&shared::MsgTaskRequest { + id: MsgId::new(), + from: APP1.clone(), + to: to.clone(), + body: Plain::from(""), + expire: SystemTime::now() + Duration::from_secs(60), + failure_strategy: shared::FailureStrategy::Discard, + results: Default::default(), + metadata: serde_json::Value::Null + }) + .unwrap(); + let res = CLIENT.request(req).await.unwrap(); + assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY); + assert_eq!(serde_json::from_slice::>(&hyper::body::to_bytes(res.into_body()).await.unwrap()).unwrap(), to); +} + +#[tokio::test] +async fn test_allowed_sender_but_invalid_proxy() { + BrokerId::set_broker_id("broker".to_string()); + let req = Request::builder() + .uri(format!("{PROXY1}/v1/tasks")) + .method(Method::POST) + .as_app(&APP1, APP_KEY) + .with_json(&shared::MsgTaskRequest { + id: MsgId::new(), + from: APP1.clone(), + to: vec![AppOrProxyId::new("app2.proxy3.broker").unwrap()], + body: Plain::from(""), + expire: SystemTime::now() + Duration::from_secs(60), + failure_strategy: shared::FailureStrategy::Discard, + results: Default::default(), + metadata: serde_json::Value::Null + }) + .unwrap(); + let res = CLIENT.request(req).await.unwrap(); + assert_eq!(res.status(), StatusCode::FAILED_DEPENDENCY) +}