From db29350c26dce7efe8adc591577efef63736807a Mon Sep 17 00:00:00 2001 From: marve Date: Thu, 2 May 2024 14:38:55 -0700 Subject: [PATCH] fix(server): at-least-once messages received during subscription must be delivered --- .../Server/Retained_Messages_Tests.cs | 34 +++++++++++++++++++ .../MqttClientSubscriptionsManager.cs | 26 ++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs b/Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs index b27741575..5adf3d86f 100644 --- a/Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; @@ -169,6 +170,39 @@ public async Task Receive_Retained_Message_After_Subscribe() } } + [TestMethod] + public async Task Receive_AtLeastOnce_Retained_Message_Published_During_Subscribe() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var messagePublished = new SemaphoreSlim(0,1); + var subscribeReceived = new SemaphoreSlim(0,1); + await testEnvironment.StartServer(); + testEnvironment.Server.InterceptingSubscriptionAsync += _ => + { + subscribeReceived.Release(); + return messagePublished.WaitAsync(); + }; + + var c1 = await testEnvironment.ConnectClient(); + + var c2 = await testEnvironment.ConnectClient(); + var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); + + Task subscribeComplete = c2.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("retained").WithAtLeastOnceQoS().Build()); + await subscribeReceived.WaitAsync(1000); + await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce).Build()); + await c1.DisconnectAsync(); + + messagePublished.Release(); + await subscribeComplete; + await Task.Delay(500); + + messageHandler.AssertReceivedCountEquals(1); + Assert.IsTrue(messageHandler.ReceivedEventArgs.First().ApplicationMessage.Retain); + } + } + [TestMethod] public async Task Receive_Retained_Messages_From_Higher_Qos_Level() { diff --git a/Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs index 8cacf5605..735076d16 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs @@ -10,6 +10,7 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using static MQTTnet.Server.MqttClientSubscriptionsManager; namespace MQTTnet.Server { @@ -177,6 +178,13 @@ public async Task Subscribe(MqttSubscribePacket subscribePacket var addedSubscriptions = new List(); var finalTopicFilters = new List(); + var atLeastOnceSubscriptionResults = new List(); + + IList retainedApplicationMessages = null; + if (subscribePacket.TopicFilters.Any(f => f.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce)) + { + retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false); + } // The topic filters are order by its QoS so that the higher QoS will win over a // lower one. @@ -208,6 +216,24 @@ public async Task Subscribe(MqttSubscribePacket subscribePacket finalTopicFilters.Add(topicFilter); FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result); + if (createSubscriptionResult.Subscription.GrantedQualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce) + { + FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result); + } + else + { + atLeastOnceSubscriptionResults.Add(createSubscriptionResult); + } + } + + if (atLeastOnceSubscriptionResults.Count != 0) + { + // In order to satisfy at least once, we must query for retained messages after creating the subscription. + retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false); + foreach (var createSubscriptionResult in atLeastOnceSubscriptionResults) + { + FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result); + } } // This call will add the new subscription to the internal storage.