-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: increase max packet size for built-in bridge #3059
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,6 +57,8 @@ use crate::topics::matches_ignore_dollar_prefix; | |
use crate::topics::TopicConverter; | ||
pub use config::*; | ||
|
||
const MAX_PACKET_SIZE: usize = 10 * 1024 * 1024; | ||
|
||
pub struct MqttBridgeActorBuilder {} | ||
|
||
impl MqttBridgeActorBuilder { | ||
|
@@ -90,6 +92,7 @@ impl MqttBridgeActorBuilder { | |
if let Some(tls_config) = local_tls_config { | ||
local_config.set_transport(Transport::tls_with_config(tls_config.into())); | ||
} | ||
local_config.set_max_packet_size(MAX_PACKET_SIZE, MAX_PACKET_SIZE); | ||
local_config.set_manual_acks(true); | ||
local_config.set_last_will(LastWill::new( | ||
&health_topic.name, | ||
|
@@ -102,6 +105,7 @@ impl MqttBridgeActorBuilder { | |
let reconnect_policy = tedge_config.mqtt.bridge.reconnect_policy.clone(); | ||
|
||
cloud_config.set_manual_acks(true); | ||
cloud_config.set_max_packet_size(MAX_PACKET_SIZE, MAX_PACKET_SIZE); | ||
|
||
let (local_client, local_event_loop) = AsyncClient::new(local_config, 10); | ||
let (cloud_client, cloud_event_loop) = AsyncClient::new(cloud_config, 10); | ||
|
@@ -377,28 +381,16 @@ async fn half_bridge( | |
|
||
// Keep track of packet IDs so we can acknowledge messages | ||
Event::Outgoing(Outgoing::Publish(pkid)) => { | ||
if pkid == 0 { | ||
// Messages with pkid 0 (meaning QoS=0) should not be added to the hashmap | ||
// as multiple messages with the pkid=0 can be received | ||
match companion_bridge_half.recv().await { | ||
// A message was forwarded by the other bridge half, note the packet id | ||
Some(Some((topic, msg))) => { | ||
loop_breaker.forward_on_topic(topic, &msg); | ||
} | ||
|
||
// A healthcheck message was published, ignore this packet id | ||
Some(None) => {} | ||
|
||
// The other bridge half has disconnected, break the loop and shut down the bridge | ||
None => break, | ||
} | ||
} else if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) | ||
{ | ||
if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) { | ||
match companion_bridge_half.recv().await { | ||
// A message was forwarded by the other bridge half, note the packet id | ||
Some(Some((topic, msg))) => { | ||
loop_breaker.forward_on_topic(topic, &msg); | ||
e.insert(msg); | ||
if pkid != 0 { | ||
// Messages with pkid 0 (meaning QoS=0) should not be added to the hashmap | ||
// as multiple messages with the pkid=0 can be received | ||
e.insert(msg); | ||
} | ||
Comment on lines
+389
to
+393
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not easily. In an ideal world, we would have some deterministic tests where we control the broker we connect to directly, so we can send QoS 0 messages. This would also shore up the existing integration tests, which have to flood the bridge with thousands of messages before disconnecting to reproduce the desired behaviour. I think this might be worth investigating in a time-boxed manner, but I'll create a separate ticket for it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is tested by the system test "Support publishing QoS 0 messages to c8y topic #2960". Sure, this test is not deterministic (as messages with QoS 0 might not be delivered), but I think that is enough. |
||
} | ||
|
||
// A healthcheck message was published, ignore this packet id | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we have some inconsistency with packet sizes used across the project.
mqtt_channel::config::Config
: 1024 * 1024bridge::common_mosquitto_config::CommonMosquittoConfig
: 256 * 1024 * 1024 - 1tedge::cli::mqtt::MAX_PACKET_SIZE
: 10 * 1024 * 1024Any reason why you chose the 3rd option and not the second one, keeping it consistent with the existing bridge config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't realise that mosquitto used a completely different name for the configuration, so didn't notice it. I'll change this to be consistent with the existing bridge.