Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature allow and block lists for sending and receiving messages #141

Open
wants to merge 24 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,33 @@ Next, send the CSR to the central CA's administrator for signing and enrolling t

Both the Broker and the Proxy respect the log level in the `RUST_LOG` environment variable. E.g., `RUST_LOG=debug` enables debug outputs. Warning: the `trace` log level is *very* noisy.

## Restricting accesses
We have a black/whitelisting option that can be used to restrict traeffic to and from a given proxy. The lists contains a json array of app or proxy ids (see [system architecture](#system-architecture)). Setting a proxy id will permit or allow, dpending on the kind of list, all apps from the given proxy while the app id will only do that for the specific app.

Restrinction logic will follow these rules:
- Only a whitelist is set -> Traeffic is permitted according to the whitelist
- Only a blacklist is set -> Traeffic is denied according to the blacklist
- Neither white nor blacklist -> All traeffic is permitted
- A white and blacklist is set -> The whitelist will be used as an **exception** to the blacklist

> Note: All config options may also be set via environment variables matching the cli argument name in uppercase with `-` repplaced by `_`.
### Deny receiving messages from other proxies
This is done by setting `allowed-remotes` and/or `blocked-remotes` options accordingly as described in [Restricting access](#restricting-accesses).

Example:
```bash
./proxy ... --allowed-remotes='["app1.proxy1", "proxy2"]'
```

### Deny sending messages to specific proxies
This is done by setting `allowed-receivers` and/or `blocked-receivers` options accordingly as described in [Restricting access](#restricting-accesses).
> Note: When a receiver of a message is being blocked due to these restrictions the proxy will return a status code of 422 Unprocessable Entity and a json list containing the offending entries.

Example:
```bash
./proxy ... --allowed-receivers='["app1.proxy1", "proxy2"]'
```

## Technical Background Information

### End-to-End Encryption
Expand Down
5 changes: 5 additions & 0 deletions dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ services:
BIND_ADDR: 0.0.0.0:8081
RUST_LOG: ${RUST_LOG}
ALL_PROXY: http://mitmproxy:8080
ALLOWED_REMOTES: '["app1.proxy1", "proxy1"]'
BLOCKED_REMOTES: '["app1.proxy1", "proxy1"]'
# does not exist only used for testing
BLOCKED_RECEIVERS: '["proxy3"]'
ALLOWED_RECEIVERS: '["app2.proxy3"]'
secrets:
- proxy1.pem
- root.crt.pem
Expand Down
22 changes: 13 additions & 9 deletions proxy/src/serve_sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,31 @@ async fn get_tasks(
state: State<TasksState>,
Extension(task_secret_map): Extension<MsgSecretMap>,
req: Request<Body>
) -> Result<Json<Vec<MsgSocketRequest<Plain>>>, StatusCode> {
let mut res = forward_request(req, &state.config, &sender, &state.client).await.map_err(|e| e.0)?;
) -> Result<Json<Vec<MsgSocketRequest<Plain>>>, Response> {
let mut res = forward_request(req, &state.config, &sender, &state.client).await?;
if res.status() != StatusCode::OK {
return Err(res.status());
return Err(res.into_response());
}
let body = hyper::body::to_bytes(res.body_mut()).await.map_err(|_| StatusCode::BAD_GATEWAY)?;
let enc_json = serde_json::from_slice(&body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let plain_json = to_server_error(validate_and_decrypt(enc_json).await).map_err(|e| e.0)?;
let tasks: Vec<MessageType<Plain>> = serde_json::from_value(plain_json).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let body = hyper::body::to_bytes(res.body_mut()).await.map_err(|_| StatusCode::BAD_GATEWAY.into_response())?;
let enc_json = serde_json::from_slice(&body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())?;
let plain_json = to_server_error(validate_and_decrypt(enc_json).await).map_err(IntoResponse::into_response)?;
let tasks: Vec<MessageType<Plain>> = serde_json::from_value(plain_json).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())?;
let mut out = Vec::with_capacity(tasks.len());
for task in tasks {
if let MessageType::MsgSocketRequest(mut socket_task) = task {
let key = serde_json::from_value(Value::String(socket_task.secret.body.as_ref().ok_or(StatusCode::INTERNAL_SERVER_ERROR)?.to_string())).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let key = serde_json::from_value(Value::String(socket_task.secret.body
.as_ref()
.ok_or(StatusCode::INTERNAL_SERVER_ERROR.into_response())?
.to_string()
)).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())?;
let Ok(ttl) = socket_task.expire.duration_since(SystemTime::now()) else {
continue;
};
task_secret_map.insert_for(ttl, socket_task.id, key);
socket_task.secret.body = None;
out.push(socket_task);
} else {
return Err(StatusCode::INTERNAL_SERVER_ERROR);
return Err(StatusCode::INTERNAL_SERVER_ERROR.into_response());
}
}

Expand Down
83 changes: 53 additions & 30 deletions proxy/src/serve_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use axum::{
http::{request::Parts, HeaderValue},
response::{sse::Event, IntoResponse, Response, Sse},
routing::{any, get, put},
Router,
Router, Json,
};
use futures::{
stream::{StreamExt, TryStreamExt},
Expand Down Expand Up @@ -90,7 +90,7 @@ pub(crate) async fn forward_request(
config: &config_proxy::Config,
sender: &AppId,
client: &SamplyHttpClient,
) -> Result<hyper::Response<Body>, (StatusCode, &'static str)> {
) -> Result<hyper::Response<Body>, Response> {
// Create uri to contact broker
let path = req.uri().path();
let path_query = req
Expand All @@ -100,15 +100,15 @@ pub(crate) async fn forward_request(
.unwrap_or(path);
let target_uri =
Uri::try_from(config.broker_uri.to_string() + path_query.trim_start_matches('/'))
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid path queried."))?;
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid path queried.").into_response())?;
*req.uri_mut() = target_uri;

req.headers_mut().append(
header::VIA,
HeaderValue::from_static(env!("SAMPLY_USER_AGENT")),
);
let (encrypted_msg, parts) = encrypt_request(req, &sender).await?;
let req = sign_request(encrypted_msg, parts, &config, None).await?;
let req = sign_request(encrypted_msg, parts, &config, None).await.map_err(IntoResponse::into_response)?;
trace!("Requesting: {:?}", req);
let resp = client.request(req).await.map_err(|e| {
if is_actually_hyper_timeout(&e) {
Expand All @@ -117,7 +117,7 @@ pub(crate) async fn forward_request(
} else {
warn!("Request to broker failed: {}", e.to_string());
(StatusCode::BAD_GATEWAY, "Upstream error; see server logs.")
}
}.into_response()
})?;
Ok(resp)
}
Expand All @@ -128,7 +128,7 @@ pub(crate) async fn handler_task(
AuthenticatedApp(sender): AuthenticatedApp,
headers: HeaderMap,
req: Request<Body>,
) -> Result<Response, (StatusCode, String)> {
) -> Response {
let found = &headers
.get(header::ACCEPT)
.unwrap_or(&HeaderValue::from_static(""))
Expand All @@ -139,26 +139,23 @@ pub(crate) async fn handler_task(
.find(|part| *part == "text/event-stream")
.is_some();

let result = if *found {
if *found {
handler_tasks_stream(client, config, sender, req)
.await?
.await
.into_response()
} else {
handler_tasks_nostream(client, config, sender, req)
.await
.map_err(|e| (e.0, e.1.to_string()))?
.into_response()
};

return Ok(result);
}
}

async fn handler_tasks_nostream(
client: SamplyHttpClient,
config: config_proxy::Config,
sender: AppId,
req: Request<Body>,
) -> Result<Response<Body>, (StatusCode, &'static str)> {
) -> Result<Response<Body>, Response> {
// Validate Query, forward to server, get response.

let resp = forward_request(req, &config, &sender, &client).await?;
Expand All @@ -168,7 +165,7 @@ async fn handler_tasks_nostream(
let (mut parts, body) = resp.into_parts();
let mut bytes = body::to_bytes(body).await.map_err(|e| {
error!("Error receiving reply from the broker: {}", e);
ERR_UPSTREAM
ERR_UPSTREAM.into_response()
})?;

// TODO: Always return application/jwt from server.
Expand Down Expand Up @@ -210,12 +207,10 @@ async fn handler_tasks_stream(
config: config_proxy::Config,
sender: AppId,
req: Request<Body>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, Response> {
// Validate Query, forward to server, get response.

let mut resp = forward_request(req, &config, &sender, &client)
.await
.map_err(|err| (err.0, err.1.into()))?;
let mut resp = forward_request(req, &config, &sender, &client).await?;

let code = resp.status();
if !code.is_success() {
Expand All @@ -224,7 +219,7 @@ async fn handler_tasks_stream(
.and_then(|v| String::from_utf8(v.into()).ok())
.unwrap_or("(unable to parse reply)".into());
warn!("Got unexpected response code from server: {code}. Returning error message as-is: \"{error_msg}\"");
return Err((code, error_msg));
return Err((code, error_msg).into_response());
}

let outgoing = async_stream::stream! {
Expand Down Expand Up @@ -299,6 +294,10 @@ async fn handler_tasks_stream(
};
let json = match validate_and_decrypt(json).await {
Ok(json) => json,
Err(SamplyBeamError::DisallowedReceiver(rec)) => {
info!("Did not send result from {rec} as it was forbidden by the config");
continue;
},
Err(err) => {
warn!("Got an error decrypting Broker's reply: {err}");
continue;
Expand All @@ -325,7 +324,7 @@ async fn handler_tasks_stream(
Ok(sse)
}

pub(crate) fn to_server_error<T>(res: Result<T, SamplyBeamError>) -> Result<T, (StatusCode, &'static str)> {
pub(crate) fn to_server_error<T>(res: Result<T, SamplyBeamError>) -> Result<T, Response> {
res.map_err(|e| match e {
SamplyBeamError::JsonParseError(e) => {
warn!("{e}");
Expand All @@ -340,7 +339,7 @@ pub(crate) fn to_server_error<T>(res: Result<T, SamplyBeamError>) -> Result<T, (
warn!("Unhandled error {e}");
(StatusCode::INTERNAL_SERVER_ERROR, "Unknown error")
}
})
}.into_response())
}

// TODO: This could be a middleware
Expand Down Expand Up @@ -422,7 +421,13 @@ pub(crate) async fn validate_and_decrypt(json: Value) -> Result<Value, SamplyBea
if let Value::Array(arr) = json {
let mut results = Vec::with_capacity(arr.len());
for value in arr {
results.push(validate_and_decrypt(value).await?);
match validate_and_decrypt(value).await {
// If we get an array of results we just filter out the ones we are not allowed to receive
Err(SamplyBeamError::DisallowedReceiver(app)) => {
debug!("Filtered receiving message from {app} as it was blocked due to proxy permissions");
},
other => results.push(other?)
}
}
Ok(Value::Array(results))
} else if json.is_object() {
Expand All @@ -431,7 +436,13 @@ pub(crate) async fn validate_and_decrypt(json: Value) -> Result<Value, SamplyBea
let msg = MsgSigned::<EncryptedMessage>::verify(&signed.jwt)
.await?
.msg;
Ok(serde_json::to_value(decrypt_msg(msg)?).expect("Should serialize fine"))
let decrypted = decrypt_msg(msg)?;

if CONFIG_PROXY.permission_manager.allowed_to_recieve(decrypted.get_from()) {
Ok(serde_json::to_value(decrypted).expect("Should serialize fine"))
} else {
Err(SamplyBeamError::DisallowedReceiver(decrypted.get_from().clone()))
}
}
Err(e) => Err(SamplyBeamError::JsonParseError(format!(
"Failed to parse broker response as a signed encrypted message. Err is {e}"
Expand All @@ -454,11 +465,11 @@ fn decrypt_msg<M: DecryptableMsg>(msg: M) -> Result<M::Output, SamplyBeamError>
async fn encrypt_request(
req: Request<Body>,
sender: &AppId,
) -> Result<(EncryptedMessage, Parts), (StatusCode, &'static str)> {
) -> Result<(EncryptedMessage, Parts), Response> {
let (parts, body) = req.into_parts();
let body = body::to_bytes(body).await.map_err(|e| {
warn!("Unable to read message body: {e}");
ERR_BODY
ERR_BODY.into_response()
})?;

let msg = if body.is_empty() {
Expand All @@ -467,9 +478,14 @@ async fn encrypt_request(
from: sender.into(),
})
} else {
match serde_json::from_slice(&body) {
match serde_json::from_slice::<PlainMessage>(&body) {
Ok(val) => {
debug!("Body is valid json");
let filtered = val.get_to().iter().filter(|app| !CONFIG_PROXY.permission_manager.allowed_to_send(app)).cloned().collect::<Vec<_>>();
if !filtered.is_empty() {
return Err((StatusCode::UNPROCESSABLE_ENTITY, Json(filtered)).into_response());
}

val
}
Err(e) => {
Expand All @@ -478,17 +494,24 @@ async fn encrypt_request(
e,
std::str::from_utf8(&body).unwrap_or("(not valid UTF-8)")
);
return Err(ERR_BODY);
return Err(ERR_BODY.into_response());
}
}
};
// Sanity/security checks: From address sane?
if msg.get_from() != sender {
return Err(ERR_FAKED_FROM);
return Err(ERR_FAKED_FROM.into_response());
}
let body = encrypt_msg(msg).await.map_err(|e| {
warn!("Encryption failed with: {e}");
ERR_INTERNALCRYPTO
match e {
SamplyBeamError::InvalidReceivers(proxies) => {
(StatusCode::FAILED_DEPENDENCY, Json(proxies)).into_response()
}
e => {
warn!("Encryption failed with: {e}");
ERR_INTERNALCRYPTO.into_response()
}
}
})?;
Ok((body, parts))
}
Expand Down
18 changes: 12 additions & 6 deletions shared/src/beam_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{de::Visitor, Deserialize, Serialize};

use crate::{config, errors::SamplyBeamError};

static BROKER_ID: OnceCell<String> = OnceCell::new();
pub(crate) static BROKER_ID: OnceCell<String> = OnceCell::new();

#[derive(PartialEq, Debug)]
pub enum BeamIdType {
Expand Down Expand Up @@ -379,13 +379,19 @@ impl<'de> Visitor<'de> for AppOrProxyIdVisitor {
where
E: serde::de::Error,
{
let t = AppId::str_has_type(v)
.map_err(|e| serde::de::Error::custom(format!("Invalid Beam ID \"{v}\": {e}")))?;
match t {
v.parse().map_err(serde::de::Error::custom)
}
}

impl FromStr for AppOrProxyId {
type Err = SamplyBeamError;

fn from_str(v: &str) -> Result<AppOrProxyId, Self::Err> {
match AppId::str_has_type(v)? {
BeamIdType::AppId => Ok(AppOrProxyId::AppId(AppId::new(v).unwrap())),
BeamIdType::ProxyId => Ok(AppOrProxyId::ProxyId(ProxyId::new(v).unwrap())),
BeamIdType::BrokerId => Err(serde::de::Error::custom(
"Expected AppOrProxyId, got BrokerId.",
BeamIdType::BrokerId => Err(SamplyBeamError::InvalidBeamId(
"Expected AppOrProxyId, got BrokerId.".to_string()
)),
}
}
Expand Down
Loading