diff --git a/src/sinks/mqtt/config.rs b/src/sinks/mqtt/config.rs index cd88d6f5e58f7a..e3ea54e972c733 100644 --- a/src/sinks/mqtt/config.rs +++ b/src/sinks/mqtt/config.rs @@ -52,6 +52,10 @@ pub struct MqttSinkConfig { /// MQTT publish topic (templates allowed) pub topic: Template, + /// Whether the messages should be retained by the server + #[serde(default = "default_retain")] + pub retain: bool, + #[configurable(derived)] pub encoding: EncodingConfig, @@ -112,6 +116,10 @@ const fn default_qos() -> MqttQoS { MqttQoS::AtLeastOnce } +const fn default_retain() -> bool { + false +} + impl Default for MqttSinkConfig { fn default() -> Self { Self { @@ -124,6 +132,7 @@ impl Default for MqttSinkConfig { clean_session: default_clean_session(), tls: None, topic: Template::try_from("vector").expect("Cannot parse as a template"), + retain: default_retain(), encoding: JsonSerializerConfig::default().into(), acknowledgements: AcknowledgementsConfig::default(), quality_of_service: MqttQoS::default(), diff --git a/src/sinks/mqtt/service.rs b/src/sinks/mqtt/service.rs index fcba9c808fdf6d..376a30a31ad535 100644 --- a/src/sinks/mqtt/service.rs +++ b/src/sinks/mqtt/service.rs @@ -53,6 +53,7 @@ impl MetaDescriptive for MqttRequest { pub(super) struct MqttService { pub(super) client: AsyncClient, pub(super) quality_of_service: MqttQoS, + pub(super) retain: bool, } #[derive(Debug, Snafu)] @@ -72,13 +73,14 @@ impl Service for MqttService { fn call(&mut self, req: MqttRequest) -> Self::Future { let quality_of_service = self.quality_of_service; + let retain = self.retain; let client = self.client.clone(); Box::pin(async move { let byte_size = req.body.len(); let res = client - .publish(&req.topic, quality_of_service.into(), false, req.body) + .publish(&req.topic, quality_of_service.into(), retain, req.body) .await; match res { Ok(()) => Ok(MqttResponse { diff --git a/src/sinks/mqtt/sink.rs b/src/sinks/mqtt/sink.rs index 248a193c37defe..d220cf8af2d406 100644 --- a/src/sinks/mqtt/sink.rs +++ b/src/sinks/mqtt/sink.rs @@ -52,6 +52,7 @@ pub struct MqttSink { encoder: Encoder<()>, connector: MqttConnector, quality_of_service: MqttQoS, + retain: bool, } pub(super) struct MqttEvent { @@ -70,6 +71,7 @@ impl MqttSink { encoder, connector, quality_of_service: config.quality_of_service, + retain: config.retain, }) } @@ -115,6 +117,7 @@ impl MqttSink { let service = ServiceBuilder::new().service(MqttService { client, quality_of_service: self.quality_of_service, + retain: self.retain, }); let request_builder = MqttRequestBuilder { diff --git a/website/cue/reference/components/sinks/base/mqtt.cue b/website/cue/reference/components/sinks/base/mqtt.cue index 63affaa83bfeef..3eb0da7b67139d 100644 --- a/website/cue/reference/components/sinks/base/mqtt.cue +++ b/website/cue/reference/components/sinks/base/mqtt.cue @@ -325,6 +325,11 @@ base: components: sinks: mqtt: configuration: { } } } + retain: { + description: "Whether the messages should be retained by the server" + required: false + type: bool: default: false + } tls: { description: "Configures the TLS options for incoming/outgoing connections." required: false