Skip to content

Commit

Permalink
Add test to reproduce error after disconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
jarhodes314 committed Jul 16, 2024
1 parent 2283810 commit d5e3c37
Showing 1 changed file with 72 additions and 3 deletions.
75 changes: 72 additions & 3 deletions crates/extensions/tedge_mqtt_bridge/tests/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::time::timeout;
use tracing::info;
use tracing::warn;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(3);

fn new_broker_and_client(name: &str, port: u16) -> (AsyncClient, EventLoop) {
let mut broker = Broker::new(get_rumqttd_config(port));
Expand Down Expand Up @@ -64,7 +64,7 @@ const HEALTH: &str = "te/device/main/#";
// TODO acknowledgement with lost connection bridge, check we acknowledge the correct message
#[tokio::test]
async fn bridge_many_messages() {
std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info");
std::env::set_var("RUST_LOG", "rumqttd=debug,tedge_mqtt_bridge=info");
let _ = env_logger::try_init();
let local_broker_port = free_port().await;
let cloud_broker_port = free_port().await;
Expand Down Expand Up @@ -116,6 +116,75 @@ async fn bridge_many_messages() {
next_received_message(&mut local).await.unwrap();
}

#[tokio::test]
async fn bridge_disconnect_while_sending() {
std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info");
let _ = env_logger::try_init();
let local_broker_port = free_port().await;
let cloud_broker_port = free_port().await;
let (local, mut ev_local) = new_broker_and_client("local", local_broker_port);
let (cloud, mut ev_cloud) = new_broker_and_client("cloud", cloud_broker_port);

// We can't easily restart rumqttd, so instead, we'll connect via a proxy
// that we can interrupt the connection of
let cloud_proxy = Proxy::start(cloud_broker_port).await;

let mut rules = BridgeConfig::new();
rules.forward_from_local("s/us", "c8y/", "").unwrap();
rules.forward_from_remote("s/ds", "c8y/", "").unwrap();

start_mqtt_bridge(local_broker_port, cloud_proxy.port, rules).await;

local.subscribe(HEALTH, QoS::AtLeastOnce).await.unwrap();

wait_until_health_status_is("up", &mut ev_local)
.await
.unwrap();

local.unsubscribe(HEALTH).await.unwrap();
local.subscribe("c8y/s/ds", QoS::AtLeastOnce).await.unwrap();
await_subscription(&mut ev_local).await;
cloud.subscribe("s/us", QoS::AtLeastOnce).await.unwrap();
await_subscription(&mut ev_cloud).await;

let poll_local = EventPoller::run_in_bg(ev_local);

// Verify messages are forwarded from cloud to local
for i in 1..10000 {
local
.publish(
"c8y/s/us",
QoS::AtMostOnce,
false,
format!("a,fake,smartrest,message{i}"),
)
.await
.unwrap();
}
cloud_proxy.interrupt_connections();
let _ev_cloud = EventPoller::run_in_bg(ev_cloud);
for _ in 1..10000 {
local
.publish(
"c8y/s/us",
QoS::AtMostOnce,
false,
"a,fake,smartrest,message",
)
.await
.unwrap();
}

// tokio::time::sleep(Duration::from_secs(5)).await;
let mut local = poll_local.stop_polling().await;
cloud
.publish("s/ds", QoS::AtLeastOnce, true, "test")
.await
.unwrap();

next_received_message(&mut local).await.unwrap();
}

#[tokio::test]
async fn bridge_reconnects_successfully_after_cloud_connection_interrupted() {
std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info");
Expand Down Expand Up @@ -398,7 +467,7 @@ impl Proxy {
};

write_socket.shutdown().await.unwrap();
write_conn.shutdown().await.unwrap();
let _ = write_conn.shutdown().await; //.await.unwrap();
});
}
}
Expand Down

0 comments on commit d5e3c37

Please sign in to comment.