diff --git a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs index 279c7fd09cc..f36d3329dfe 100644 --- a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs @@ -769,6 +769,9 @@ define_tedge_config! { bridge: { #[tedge_config(default(value = false))] + #[tedge_config(example = "false")] + #[tedge_config(note = "After changing this value, run `tedge reconnect ` to apply the changes")] + /// Enables the built-in bridge when running tedge-mapper built_in: bool, reconnect_policy: { diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index a19fd854845..f1d32d6cfcd 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -57,6 +57,8 @@ use crate::topics::matches_ignore_dollar_prefix; use crate::topics::TopicConverter; pub use config::*; +const MAX_PACKET_SIZE: usize = 268435455; // maximum allowed MQTT payload size + pub struct MqttBridgeActorBuilder {} impl MqttBridgeActorBuilder { @@ -90,6 +92,7 @@ impl MqttBridgeActorBuilder { if let Some(tls_config) = local_tls_config { local_config.set_transport(Transport::tls_with_config(tls_config.into())); } + local_config.set_max_packet_size(MAX_PACKET_SIZE, MAX_PACKET_SIZE); local_config.set_manual_acks(true); local_config.set_last_will(LastWill::new( &health_topic.name, @@ -102,6 +105,7 @@ impl MqttBridgeActorBuilder { let reconnect_policy = tedge_config.mqtt.bridge.reconnect_policy.clone(); cloud_config.set_manual_acks(true); + cloud_config.set_max_packet_size(MAX_PACKET_SIZE, MAX_PACKET_SIZE); let (local_client, local_event_loop) = AsyncClient::new(local_config, 10); let (cloud_client, cloud_event_loop) = AsyncClient::new(cloud_config, 10); @@ -377,28 +381,16 @@ async fn half_bridge( // Keep track of packet IDs so we can acknowledge messages Event::Outgoing(Outgoing::Publish(pkid)) => { - if pkid == 0 { - // Messages with pkid 0 (meaning QoS=0) should not be added to the hashmap - // as multiple messages with the pkid=0 can be received - match companion_bridge_half.recv().await { - // A message was forwarded by the other bridge half, note the packet id - Some(Some((topic, msg))) => { - loop_breaker.forward_on_topic(topic, &msg); - } - - // A healthcheck message was published, ignore this packet id - Some(None) => {} - - // The other bridge half has disconnected, break the loop and shut down the bridge - None => break, - } - } else if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) - { + if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) { match companion_bridge_half.recv().await { // A message was forwarded by the other bridge half, note the packet id Some(Some((topic, msg))) => { loop_breaker.forward_on_topic(topic, &msg); - e.insert(msg); + if pkid != 0 { + // Messages with pkid 0 (meaning QoS=0) should not be added to the hashmap + // as multiple messages with the pkid=0 can be received + e.insert(msg); + } } // A healthcheck message was published, ignore this packet id diff --git a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs index 52fb85f0c37..340a5fbd624 100644 --- a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs +++ b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs @@ -29,7 +29,7 @@ use tokio::time::timeout; use tracing::info; use tracing::warn; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(3); +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); fn new_broker_and_client(name: &str, port: u16) -> (AsyncClient, EventLoop) { let mut broker = Broker::new(get_rumqttd_config(port)); @@ -37,10 +37,9 @@ fn new_broker_and_client(name: &str, port: u16) -> (AsyncClient, EventLoop) { .name(format!("{name} broker")) .spawn(move || broker.start().unwrap()) .unwrap(); - AsyncClient::new( - MqttOptions::new(format!("{name}-test-client"), "127.0.0.1", port), - 10, - ) + let mut client_opts = MqttOptions::new(format!("{name}-test-client"), "127.0.0.1", port); + client_opts.set_max_packet_size(268435455, 268435455); + AsyncClient::new(client_opts, 10) } async fn start_mqtt_bridge(local_port: u16, cloud_port: u16, rules: BridgeConfig) { @@ -117,6 +116,46 @@ async fn bridge_many_messages() { next_received_message(&mut local).await.unwrap(); } +#[tokio::test] +async fn bridge_forwards_large_messages() { + std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=debug,rumqttc=trace,info"); + let _ = env_logger::try_init(); + let local_broker_port = free_port().await; + let cloud_broker_port = free_port().await; + let (local, mut ev_local) = new_broker_and_client("local", local_broker_port); + let (cloud, mut ev_cloud) = new_broker_and_client("cloud", cloud_broker_port); + + let mut rules = BridgeConfig::new(); + rules.forward_from_local("s/us", "c8y/", "").unwrap(); + rules.forward_from_remote("s/ds", "c8y/", "").unwrap(); + + start_mqtt_bridge(local_broker_port, cloud_broker_port, rules).await; + + local.subscribe(HEALTH, QoS::AtLeastOnce).await.unwrap(); + + wait_until_health_status_is("up", &mut ev_local) + .await + .unwrap(); + + local.unsubscribe(HEALTH).await.unwrap(); + cloud.subscribe("s/us", QoS::AtLeastOnce).await.unwrap(); + await_subscription(&mut ev_cloud).await; + + let _poll_local = EventPoller::run_in_bg(ev_local); + + let payload = std::iter::repeat(b'a') + .take(25 * 1024 * 1024) + .collect::>(); + + local + .publish("c8y/s/us", QoS::AtLeastOnce, false, payload.clone()) + .await + .unwrap(); + + let msg = next_received_message(&mut ev_cloud).await.unwrap(); + assert_eq!(msg.payload, payload); +} + #[tokio::test] async fn bridge_disconnect_while_sending() { std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info");