From d760f16b9aaeb40811371dca8eac7bc7451356d3 Mon Sep 17 00:00:00 2001 From: Miquel Ruiz Date: Thu, 12 Sep 2024 23:06:00 -0400 Subject: [PATCH] feat(mqtt sink): expose retain config flag --- src/sinks/mqtt/config.rs | 9 +++++++++ src/sinks/mqtt/service.rs | 4 +++- src/sinks/mqtt/sink.rs | 3 +++ website/cue/reference/components/sinks/base/mqtt.cue | 5 +++++ 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/sinks/mqtt/config.rs b/src/sinks/mqtt/config.rs index cd88d6f5e58f7..e3ea54e972c73 100644 --- a/src/sinks/mqtt/config.rs +++ b/src/sinks/mqtt/config.rs @@ -52,6 +52,10 @@ pub struct MqttSinkConfig { /// MQTT publish topic (templates allowed) pub topic: Template, + /// Whether the messages should be retained by the server + #[serde(default = "default_retain")] + pub retain: bool, + #[configurable(derived)] pub encoding: EncodingConfig, @@ -112,6 +116,10 @@ const fn default_qos() -> MqttQoS { MqttQoS::AtLeastOnce } +const fn default_retain() -> bool { + false +} + impl Default for MqttSinkConfig { fn default() -> Self { Self { @@ -124,6 +132,7 @@ impl Default for MqttSinkConfig { clean_session: default_clean_session(), tls: None, topic: Template::try_from("vector").expect("Cannot parse as a template"), + retain: default_retain(), encoding: JsonSerializerConfig::default().into(), acknowledgements: AcknowledgementsConfig::default(), quality_of_service: MqttQoS::default(), diff --git a/src/sinks/mqtt/service.rs b/src/sinks/mqtt/service.rs index fcba9c808fdf6..376a30a31ad53 100644 --- a/src/sinks/mqtt/service.rs +++ b/src/sinks/mqtt/service.rs @@ -53,6 +53,7 @@ impl MetaDescriptive for MqttRequest { pub(super) struct MqttService { pub(super) client: AsyncClient, pub(super) quality_of_service: MqttQoS, + pub(super) retain: bool, } #[derive(Debug, Snafu)] @@ -72,13 +73,14 @@ impl Service for MqttService { fn call(&mut self, req: MqttRequest) -> Self::Future { let quality_of_service = self.quality_of_service; + let retain = self.retain; let client = self.client.clone(); Box::pin(async move { let byte_size = req.body.len(); let res = client - .publish(&req.topic, quality_of_service.into(), false, req.body) + .publish(&req.topic, quality_of_service.into(), retain, req.body) .await; match res { Ok(()) => Ok(MqttResponse { diff --git a/src/sinks/mqtt/sink.rs b/src/sinks/mqtt/sink.rs index 248a193c37def..d220cf8af2d40 100644 --- a/src/sinks/mqtt/sink.rs +++ b/src/sinks/mqtt/sink.rs @@ -52,6 +52,7 @@ pub struct MqttSink { encoder: Encoder<()>, connector: MqttConnector, quality_of_service: MqttQoS, + retain: bool, } pub(super) struct MqttEvent { @@ -70,6 +71,7 @@ impl MqttSink { encoder, connector, quality_of_service: config.quality_of_service, + retain: config.retain, }) } @@ -115,6 +117,7 @@ impl MqttSink { let service = ServiceBuilder::new().service(MqttService { client, quality_of_service: self.quality_of_service, + retain: self.retain, }); let request_builder = MqttRequestBuilder { diff --git a/website/cue/reference/components/sinks/base/mqtt.cue b/website/cue/reference/components/sinks/base/mqtt.cue index 63affaa83bfee..3eb0da7b67139 100644 --- a/website/cue/reference/components/sinks/base/mqtt.cue +++ b/website/cue/reference/components/sinks/base/mqtt.cue @@ -325,6 +325,11 @@ base: components: sinks: mqtt: configuration: { } } } + retain: { + description: "Whether the messages should be retained by the server" + required: false + type: bool: default: false + } tls: { description: "Configures the TLS options for incoming/outgoing connections." required: false