Skip to content

Commit

Permalink
Improve logging of messages pub/sub over MQTT
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Jul 1, 2024
1 parent fc97023 commit 700d52e
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions crates/common/mqtt_channel/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::fmt::Write;

Expand All @@ -20,6 +21,21 @@ pub struct MqttMessage {
pub retain: bool,
}

impl Display for MqttMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_char('[')?;
f.write_str(&self.topic.name)?;
f.write_str(" qos=")?;
f.write_char(match self.qos {
QoS::AtMostOnce => '0',
QoS::AtLeastOnce => '1',
QoS::ExactlyOnce => '2',
})?;
f.write_str(if self.retain { " retained] " } else { "] " })?;
Display::fmt(&self.payload, f)
}
}

fn serialize_qos<S>(qos: &QoS, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
Expand Down Expand Up @@ -119,6 +135,15 @@ impl<'de> Deserialize<'de> for DebugPayload {
}
}

impl Display for DebugPayload {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.as_str() {
Ok(str) => f.write_str(str),
Err(_) => f.write_str(&format!("non UTF-8 payload of {} bytes", self.0.len())),
}
}
}

impl DebugPayload {
/// The payload string (unless this payload is not UTF8)
pub fn as_str(&self) -> Result<&str, MqttError> {
Expand Down
4 changes: 4 additions & 0 deletions crates/common/mqtt_channel/src/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ impl TopicFilter {
})
.collect()
}

pub fn patterns(&self) -> &Vec<String> {
&self.patterns
}
}

impl TryInto<Topic> for &str {
Expand Down
9 changes: 7 additions & 2 deletions crates/core/tedge_actors/src/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ impl<Input: Debug> LoggingReceiver<Input> {

/// Close the input so no new messages can be sent to this receiver
pub fn close_input(&mut self) {
self.receiver.input_receiver.close();
self.receiver.signal_receiver.close();
self.receiver.close_input();
}
}

Expand Down Expand Up @@ -364,6 +363,12 @@ impl<Input> CombinedReceiver<Input> {
signal_receiver,
}
}

/// Close the input so no new messages can be sent to this receiver
pub fn close_input(&mut self) {
self.input_receiver.close();
self.signal_receiver.close();
}
}

#[async_trait]
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/tedge_mqtt_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serde_json = { workspace = true }
tedge_actors = { workspace = true }
tedge_utils = { workspace = true }
tokio = { workspace = true, default_features = false, features = ["macros"] }
tracing = { workspace = true }

[dev-dependencies]
futures = { workspace = true }
Expand Down
24 changes: 14 additions & 10 deletions crates/extensions/tedge_mqtt_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use tedge_actors::futures::channel::mpsc;
use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::ChannelError;
use tedge_actors::CombinedReceiver;
use tedge_actors::DynSender;
use tedge_actors::LoggingReceiver;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
Expand All @@ -32,17 +31,17 @@ pub use mqtt_channel::TopicFilter;

pub struct MqttActorBuilder {
mqtt_config: mqtt_channel::Config,
input_receiver: LoggingReceiver<MqttMessage>,
input_receiver: CombinedReceiver<MqttMessage>,
publish_sender: mpsc::Sender<MqttMessage>,
pub subscriber_addresses: Vec<(TopicFilter, LoggingSender<MqttMessage>)>,
pub subscriber_addresses: Vec<(TopicFilter, DynSender<MqttMessage>)>,
signal_sender: mpsc::Sender<RuntimeRequest>,
}

impl MqttActorBuilder {
pub fn new(config: mqtt_channel::Config) -> Self {
let (publish_sender, publish_receiver) = mpsc::channel(10);
let (signal_sender, signal_receiver) = mpsc::channel(10);
let input_receiver = LoggingReceiver::new("MQTT".into(), publish_receiver, signal_receiver);
let input_receiver = CombinedReceiver::new(publish_receiver, signal_receiver);

MqttActorBuilder {
mqtt_config: config,
Expand All @@ -56,6 +55,9 @@ impl MqttActorBuilder {
pub(crate) fn build_actor(self) -> MqttActor {
let mut combined_topic_filter = TopicFilter::empty();
for (topic_filter, _) in self.subscriber_addresses.iter() {
for pattern in topic_filter.patterns() {
tracing::info!(target: "MQTT sub", "{pattern}");
}
combined_topic_filter.add_all(topic_filter.to_owned());
}
let mqtt_config = self.mqtt_config.with_subscriptions(combined_topic_filter);
Expand All @@ -72,7 +74,7 @@ impl AsMut<MqttConfig> for MqttActorBuilder {

impl MessageSource<MqttMessage, TopicFilter> for MqttActorBuilder {
fn connect_sink(&mut self, subscriptions: TopicFilter, peer: &impl MessageSink<MqttMessage>) {
let sender = LoggingSender::new("MQTT".into(), peer.get_sender());
let sender = peer.get_sender();
self.subscriber_addresses.push((subscriptions, sender));
}
}
Expand Down Expand Up @@ -102,11 +104,11 @@ impl Builder<MqttActor> for MqttActorBuilder {
}

pub struct FromPeers {
input_receiver: LoggingReceiver<MqttMessage>,
input_receiver: CombinedReceiver<MqttMessage>,
}

pub struct ToPeers {
peer_senders: Vec<(TopicFilter, LoggingSender<MqttMessage>)>,
peer_senders: Vec<(TopicFilter, DynSender<MqttMessage>)>,
}

impl FromPeers {
Expand All @@ -115,6 +117,7 @@ impl FromPeers {
outgoing_mqtt: &mut mpsc::UnboundedSender<MqttMessage>,
) -> Result<(), RuntimeError> {
while let Ok(Some(message)) = self.try_recv().await {
tracing::debug!(target: "MQTT pub", "{message}");
SinkExt::send(outgoing_mqtt, message)
.await
.map_err(Box::new)?;
Expand All @@ -139,6 +142,7 @@ impl ToPeers {
incoming_mqtt: &mut mpsc::UnboundedReceiver<MqttMessage>,
) -> Result<(), RuntimeError> {
while let Some(message) = incoming_mqtt.next().await {
tracing::debug!(target: "MQTT recv", "{message}");
self.send(message).await?;
}
Ok(())
Expand Down Expand Up @@ -178,8 +182,8 @@ pub struct MqttActor {
impl MqttActor {
fn new(
mqtt_config: mqtt_channel::Config,
input_receiver: LoggingReceiver<MqttMessage>,
peer_senders: Vec<(TopicFilter, LoggingSender<MqttMessage>)>,
input_receiver: CombinedReceiver<MqttMessage>,
peer_senders: Vec<(TopicFilter, DynSender<MqttMessage>)>,
) -> Self {
MqttActor {
mqtt_config,
Expand Down

0 comments on commit 700d52e

Please sign in to comment.