Skip to content

Commit

Permalink
Merge pull request #3059 from jarhodes314/bug/bridge-max-packet-size
Browse files Browse the repository at this point in the history
fix: increase max packet size for built-in bridge
  • Loading branch information
jarhodes314 authored Aug 13, 2024
2 parents a52baba + 5cfb663 commit 57e5b42
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,9 @@ define_tedge_config! {

bridge: {
#[tedge_config(default(value = false))]
#[tedge_config(example = "false")]
#[tedge_config(note = "After changing this value, run `tedge reconnect <cloud>` to apply the changes")]
/// Enables the built-in bridge when running tedge-mapper
built_in: bool,

reconnect_policy: {
Expand Down
28 changes: 10 additions & 18 deletions crates/extensions/tedge_mqtt_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ use crate::topics::matches_ignore_dollar_prefix;
use crate::topics::TopicConverter;
pub use config::*;

const MAX_PACKET_SIZE: usize = 268435455; // maximum allowed MQTT payload size

pub struct MqttBridgeActorBuilder {}

impl MqttBridgeActorBuilder {
Expand Down Expand Up @@ -90,6 +92,7 @@ impl MqttBridgeActorBuilder {
if let Some(tls_config) = local_tls_config {
local_config.set_transport(Transport::tls_with_config(tls_config.into()));
}
local_config.set_max_packet_size(MAX_PACKET_SIZE, MAX_PACKET_SIZE);
local_config.set_manual_acks(true);
local_config.set_last_will(LastWill::new(
&health_topic.name,
Expand All @@ -102,6 +105,7 @@ impl MqttBridgeActorBuilder {
let reconnect_policy = tedge_config.mqtt.bridge.reconnect_policy.clone();

cloud_config.set_manual_acks(true);
cloud_config.set_max_packet_size(MAX_PACKET_SIZE, MAX_PACKET_SIZE);

let (local_client, local_event_loop) = AsyncClient::new(local_config, 10);
let (cloud_client, cloud_event_loop) = AsyncClient::new(cloud_config, 10);
Expand Down Expand Up @@ -377,28 +381,16 @@ async fn half_bridge(

// Keep track of packet IDs so we can acknowledge messages
Event::Outgoing(Outgoing::Publish(pkid)) => {
if pkid == 0 {
// Messages with pkid 0 (meaning QoS=0) should not be added to the hashmap
// as multiple messages with the pkid=0 can be received
match companion_bridge_half.recv().await {
// A message was forwarded by the other bridge half, note the packet id
Some(Some((topic, msg))) => {
loop_breaker.forward_on_topic(topic, &msg);
}

// A healthcheck message was published, ignore this packet id
Some(None) => {}

// The other bridge half has disconnected, break the loop and shut down the bridge
None => break,
}
} else if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid)
{
if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) {
match companion_bridge_half.recv().await {
// A message was forwarded by the other bridge half, note the packet id
Some(Some((topic, msg))) => {
loop_breaker.forward_on_topic(topic, &msg);
e.insert(msg);
if pkid != 0 {
// Messages with pkid 0 (meaning QoS=0) should not be added to the hashmap
// as multiple messages with the pkid=0 can be received
e.insert(msg);
}
}

// A healthcheck message was published, ignore this packet id
Expand Down
49 changes: 44 additions & 5 deletions crates/extensions/tedge_mqtt_bridge/tests/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@ use tokio::time::timeout;
use tracing::info;
use tracing::warn;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(3);
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

fn new_broker_and_client(name: &str, port: u16) -> (AsyncClient, EventLoop) {
let mut broker = Broker::new(get_rumqttd_config(port));
std::thread::Builder::new()
.name(format!("{name} broker"))
.spawn(move || broker.start().unwrap())
.unwrap();
AsyncClient::new(
MqttOptions::new(format!("{name}-test-client"), "127.0.0.1", port),
10,
)
let mut client_opts = MqttOptions::new(format!("{name}-test-client"), "127.0.0.1", port);
client_opts.set_max_packet_size(268435455, 268435455);
AsyncClient::new(client_opts, 10)
}

async fn start_mqtt_bridge(local_port: u16, cloud_port: u16, rules: BridgeConfig) {
Expand Down Expand Up @@ -117,6 +116,46 @@ async fn bridge_many_messages() {
next_received_message(&mut local).await.unwrap();
}

#[tokio::test]
async fn bridge_forwards_large_messages() {
std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=debug,rumqttc=trace,info");
let _ = env_logger::try_init();
let local_broker_port = free_port().await;
let cloud_broker_port = free_port().await;
let (local, mut ev_local) = new_broker_and_client("local", local_broker_port);
let (cloud, mut ev_cloud) = new_broker_and_client("cloud", cloud_broker_port);

let mut rules = BridgeConfig::new();
rules.forward_from_local("s/us", "c8y/", "").unwrap();
rules.forward_from_remote("s/ds", "c8y/", "").unwrap();

start_mqtt_bridge(local_broker_port, cloud_broker_port, rules).await;

local.subscribe(HEALTH, QoS::AtLeastOnce).await.unwrap();

wait_until_health_status_is("up", &mut ev_local)
.await
.unwrap();

local.unsubscribe(HEALTH).await.unwrap();
cloud.subscribe("s/us", QoS::AtLeastOnce).await.unwrap();
await_subscription(&mut ev_cloud).await;

let _poll_local = EventPoller::run_in_bg(ev_local);

let payload = std::iter::repeat(b'a')
.take(25 * 1024 * 1024)
.collect::<Vec<u8>>();

local
.publish("c8y/s/us", QoS::AtLeastOnce, false, payload.clone())
.await
.unwrap();

let msg = next_received_message(&mut ev_cloud).await.unwrap();
assert_eq!(msg.payload, payload);
}

#[tokio::test]
async fn bridge_disconnect_while_sending() {
std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info");
Expand Down

0 comments on commit 57e5b42

Please sign in to comment.