Skip to content

Commit

Permalink
fix(server): at-least-once messages received during subscription must…
Browse files Browse the repository at this point in the history
… be delivered
  • Loading branch information
marve committed May 3, 2024
1 parent 4f9e54b commit db29350
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
34 changes: 34 additions & 0 deletions Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
Expand Down Expand Up @@ -169,6 +170,39 @@ public async Task Receive_Retained_Message_After_Subscribe()
}
}

[TestMethod]
public async Task Receive_AtLeastOnce_Retained_Message_Published_During_Subscribe()
{
using (var testEnvironment = CreateTestEnvironment())
{
var messagePublished = new SemaphoreSlim(0,1);
var subscribeReceived = new SemaphoreSlim(0,1);
await testEnvironment.StartServer();
testEnvironment.Server.InterceptingSubscriptionAsync += _ =>
{
subscribeReceived.Release();
return messagePublished.WaitAsync();
};

var c1 = await testEnvironment.ConnectClient();

var c2 = await testEnvironment.ConnectClient();
var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2);

Task subscribeComplete = c2.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("retained").WithAtLeastOnceQoS().Build());
await subscribeReceived.WaitAsync(1000);
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce).Build());
await c1.DisconnectAsync();

messagePublished.Release();
await subscribeComplete;
await Task.Delay(500);

messageHandler.AssertReceivedCountEquals(1);
Assert.IsTrue(messageHandler.ReceivedEventArgs.First().ApplicationMessage.Retain);
}
}

[TestMethod]
public async Task Receive_Retained_Messages_From_Higher_Qos_Level()
{
Expand Down
26 changes: 26 additions & 0 deletions Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using static MQTTnet.Server.MqttClientSubscriptionsManager;

namespace MQTTnet.Server
{
Expand Down Expand Up @@ -177,6 +178,13 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket

var addedSubscriptions = new List<string>();
var finalTopicFilters = new List<MqttTopicFilter>();
var atLeastOnceSubscriptionResults = new List<CreateSubscriptionResult>();

IList<MqttApplicationMessage> retainedApplicationMessages = null;
if (subscribePacket.TopicFilters.Any(f => f.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce))
{
retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false);
}

// The topic filters are order by its QoS so that the higher QoS will win over a
// lower one.
Expand Down Expand Up @@ -208,6 +216,24 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket
finalTopicFilters.Add(topicFilter);

FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
if (createSubscriptionResult.Subscription.GrantedQualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce)
{
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
}
else
{
atLeastOnceSubscriptionResults.Add(createSubscriptionResult);
}
}

if (atLeastOnceSubscriptionResults.Count != 0)
{
// In order to satisfy at least once, we must query for retained messages after creating the subscription.
retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false);
foreach (var createSubscriptionResult in atLeastOnceSubscriptionResults)
{
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
}
}

// This call will add the new subscription to the internal storage.
Expand Down

0 comments on commit db29350

Please sign in to comment.