Skip to content

Commit

Permalink
Try to test GatewayHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
moubctez committed Nov 5, 2024
1 parent 5c253f8 commit c01358d
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 63 deletions.
65 changes: 33 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,21 @@ webauthn-rs-proto = "0.5"
x25519-dalek = { version = "2.0", features = ["static_secrets"] }

[dev-dependencies]
bytes = "1.6"
bytes = "1.8"
claims = "0.7"
hyper-util = "0.1"
matches = "0.1"
regex = "1.10"
regex = "1.11"
reqwest = { version = "0.11", features = [
"json",
"cookies",
"stream",
"json",
"multipart",
"rustls-tls",
"stream",
], default-features = false }
serde_qs = "0.13"
webauthn-authenticator-rs = { version = "0.5", features = ["softpasskey"] }
tower = { version = "0.5", features = ["util"] }

[build-dependencies]
prost-build = "0.13"
Expand Down
23 changes: 18 additions & 5 deletions src/grpc/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ use crate::{
mail::Mail,
};

#[cfg(test)]
mod tests;

tonic::include_proto!("gateway");

fn gen_config(network: &WireguardNetwork<Id>, peers: Vec<Peer>) -> Configuration {
Expand Down Expand Up @@ -85,8 +88,7 @@ impl GatewayHandler {
events_tx: Sender<ChangeEvent>,
mail_tx: UnboundedSender<Mail>,
) -> Result<Self, tonic::transport::Error> {
let endpoint = Endpoint::from_shared(gateway.url.to_string())?;
let endpoint = endpoint
let endpoint = Endpoint::from_shared(gateway.url.to_string())?
.http2_keep_alive_interval(TEN_SECS)
.tcp_keepalive(Some(TEN_SECS))
.keep_alive_while_idle(true);
Expand Down Expand Up @@ -207,11 +209,23 @@ impl GatewayHandler {
};
}

/// Connect to Gateway and handle its messages through gRPC.
pub(super) async fn handle_connection(&mut self) -> ! {
let uri = self.endpoint.uri();
loop {
info!("Connecting to gateway {uri}");
let mut client = gateway_client::GatewayClient::new(self.endpoint.connect_lazy());
#[cfg(not(test))]
let channel = self.endpoint.connect_lazy();
#[cfg(test)]
let channel = self.endpoint.connect_with_connector_lazy(tower::service_fn(
|_: tonic::transport::Uri| async {
Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(
tokio::net::UnixStream::connect(tests::TONIC_SOCKET).await?,
))
},
));

debug!("Connecting to gateway {uri}");
let mut client = gateway_client::GatewayClient::new(channel);
let (tx, rx) = mpsc::unbounded_channel();
let response = match client.bidi(UnboundedReceiverStream::new(rx)).await {
Ok(response) => response,
Expand Down Expand Up @@ -317,7 +331,6 @@ impl GatewayHandler {
self.gateway.network_id,
);
// Get device by public key and fill in stats.device_id
// FIXME: keep an in-memory device map to avoid repeated DB requests
match Device::find_by_pubkey(&self.pool, &public_key).await {
Ok(Some(device)) => {
stats.device_id = device.id;
Expand Down
38 changes: 38 additions & 0 deletions src/grpc/gateway/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::net::{IpAddr, Ipv4Addr};

use ipnetwork::IpNetwork;
use tokio::sync::{broadcast, mpsc::unbounded_channel};

use super::*;

pub(super) static TONIC_SOCKET: &str = "tonic.sock";

#[sqlx::test]
async fn test_gateway(pool: PgPool) {
let network = WireguardNetwork::new(
"TestNet".to_string(),
IpNetwork::new(IpAddr::V4(Ipv4Addr::new(10, 1, 1, 1)), 24).unwrap(),
50051,
"0.0.0.0".to_string(),
None,
vec![IpNetwork::new(IpAddr::V4(Ipv4Addr::new(10, 1, 1, 0)), 24).unwrap()],
false,
0,
0,
)
.save(&pool)
.await
.unwrap();
let gateway = Gateway::new(network.id, "http://[::]:50051")
.save(&pool)
.await
.unwrap();
let (events_tx, _events_rx) = broadcast::channel::<ChangeEvent>(16);
let (mail_tx, _mail_rx) = unbounded_channel::<Mail>();

let mut gateway_handler = GatewayHandler::new(gateway, None, pool, events_tx, mail_tx).unwrap();
let handle = tokio::spawn(async move {
gateway_handler.handle_connection().await;
});
handle.abort();
}
9 changes: 4 additions & 5 deletions src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ pub async fn run_grpc_gateway_stream(
// Helper closure to launch `GatewayHandler`.
let mut launch_gateway_handler =
|gateway: Gateway<Id>| -> Result<AbortHandle, tonic::transport::Error> {
let mut gateway_client = GatewayHandler::new(
let mut gateway_handler = GatewayHandler::new(
gateway,
tls_config.clone(),
pool.clone(),
events_tx.clone(),
mail_tx.clone(),
)?;
let abort_handle = tasks.spawn(async move {
gateway_client.handle_connection().await;
gateway_handler.handle_connection().await;
});
Ok(abort_handle)
};
Expand Down Expand Up @@ -223,8 +223,7 @@ pub async fn run_grpc_bidi_stream(
let mut client_mfa_server = ClientMfaServer::new(pool.clone(), mail_tx, events_tx);
let polling_server = PollingServer::new(pool);

let endpoint = Endpoint::from_shared(config.proxy_url.as_deref().unwrap())?;
let endpoint = endpoint
let endpoint = Endpoint::from_shared(config.proxy_url.as_deref().unwrap())?
.http2_keep_alive_interval(TEN_SECS)
.tcp_keepalive(Some(TEN_SECS))
.keep_alive_while_idle(true);
Expand Down Expand Up @@ -254,7 +253,7 @@ pub async fn run_grpc_bidi_stream(
}
Ok(Some(received)) => {
info!("Received message from proxy.");
debug!("Received the following message from proxy: {received:?}");
debug!("Message from proxy: {received:?}");
let payload = match received.payload {
// rpc StartEnrollment (EnrollmentStartRequest) returns (EnrollmentStartResponse)
Some(core_request::Payload::EnrollmentStart(request)) => {
Expand Down
Loading

0 comments on commit c01358d

Please sign in to comment.