Skip to content

Commit

Permalink
Increase max message size to 256MiB
Browse files Browse the repository at this point in the history
Signed-off-by: James Rhodes <[email protected]>
  • Loading branch information
jarhodes314 committed Aug 13, 2024
1 parent 10b4d65 commit 5cfb663
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
2 changes: 1 addition & 1 deletion crates/extensions/tedge_mqtt_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::topics::matches_ignore_dollar_prefix;
use crate::topics::TopicConverter;
pub use config::*;

const MAX_PACKET_SIZE: usize = 10 * 1024 * 1024;
const MAX_PACKET_SIZE: usize = 268435455; // maximum allowed MQTT payload size

pub struct MqttBridgeActorBuilder {}

Expand Down
49 changes: 44 additions & 5 deletions crates/extensions/tedge_mqtt_bridge/tests/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@ use tokio::time::timeout;
use tracing::info;
use tracing::warn;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(3);
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

fn new_broker_and_client(name: &str, port: u16) -> (AsyncClient, EventLoop) {
let mut broker = Broker::new(get_rumqttd_config(port));
std::thread::Builder::new()
.name(format!("{name} broker"))
.spawn(move || broker.start().unwrap())
.unwrap();
AsyncClient::new(
MqttOptions::new(format!("{name}-test-client"), "127.0.0.1", port),
10,
)
let mut client_opts = MqttOptions::new(format!("{name}-test-client"), "127.0.0.1", port);
client_opts.set_max_packet_size(268435455, 268435455);
AsyncClient::new(client_opts, 10)
}

async fn start_mqtt_bridge(local_port: u16, cloud_port: u16, rules: BridgeConfig) {
Expand Down Expand Up @@ -117,6 +116,46 @@ async fn bridge_many_messages() {
next_received_message(&mut local).await.unwrap();
}

#[tokio::test]
async fn bridge_forwards_large_messages() {
std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=debug,rumqttc=trace,info");
let _ = env_logger::try_init();
let local_broker_port = free_port().await;
let cloud_broker_port = free_port().await;
let (local, mut ev_local) = new_broker_and_client("local", local_broker_port);
let (cloud, mut ev_cloud) = new_broker_and_client("cloud", cloud_broker_port);

let mut rules = BridgeConfig::new();
rules.forward_from_local("s/us", "c8y/", "").unwrap();
rules.forward_from_remote("s/ds", "c8y/", "").unwrap();

start_mqtt_bridge(local_broker_port, cloud_broker_port, rules).await;

local.subscribe(HEALTH, QoS::AtLeastOnce).await.unwrap();

wait_until_health_status_is("up", &mut ev_local)
.await
.unwrap();

local.unsubscribe(HEALTH).await.unwrap();
cloud.subscribe("s/us", QoS::AtLeastOnce).await.unwrap();
await_subscription(&mut ev_cloud).await;

let _poll_local = EventPoller::run_in_bg(ev_local);

let payload = std::iter::repeat(b'a')
.take(25 * 1024 * 1024)
.collect::<Vec<u8>>();

local
.publish("c8y/s/us", QoS::AtLeastOnce, false, payload.clone())
.await
.unwrap();

let msg = next_received_message(&mut ev_cloud).await.unwrap();
assert_eq!(msg.payload, payload);
}

#[tokio::test]
async fn bridge_disconnect_while_sending() {
std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info");
Expand Down

0 comments on commit 5cfb663

Please sign in to comment.