From 11d2e97428746d3e4c24880555615c867fa46f70 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 31 Jul 2024 11:35:20 +0530 Subject: [PATCH] feat(rumqttd): unsubscribe method in local link (#894) --- rumqttd/CHANGELOG.md | 1 + rumqttd/src/link/local.rs | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/rumqttd/CHANGELOG.md b/rumqttd/CHANGELOG.md index c05ddb253..ed1a3248c 100644 --- a/rumqttd/CHANGELOG.md +++ b/rumqttd/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Assign random identifier to clients connecting with empty client id. +- `Unsubscribe` with `local::LinkTx`. ### Changed - Public re-export `Strategy` for shared subscriptions diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index 44b08b5ea..212630b46 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -1,5 +1,6 @@ use crate::protocol::{ Filter, LastWill, LastWillProperties, Packet, Publish, QoS, RetainForwardRule, Subscribe, + Unsubscribe, }; use crate::router::Ack; use crate::router::{ @@ -281,6 +282,28 @@ impl LinkTx { Ok(len) } + /// Sends a MQTT Unsubscribe to the eventloop + pub fn unsubscribe>(&mut self, filter: S) -> Result { + let unsubscribe = Unsubscribe { + pkid: 0, + filters: vec![filter.into()], + }; + + let len = self.push(Packet::Unsubscribe(unsubscribe, None))?; + Ok(len) + } + + /// Sends a MQTT Unsubscribe to the eventloop + pub fn try_unsubscribe>(&mut self, filter: S) -> Result { + let unsubscribe = Unsubscribe { + pkid: 0, + filters: vec![filter.into()], + }; + + let len = self.try_push(Packet::Unsubscribe(unsubscribe, None))?; + Ok(len) + } + /// Request to get device shadow pub fn shadow>(&mut self, filter: S) -> Result<(), LinkError> { let message = Event::Shadow(ShadowRequest {