Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): at-least-once messages received during subscription must… #1988

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.CSharp.Syntax;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
Expand Down
34 changes: 34 additions & 0 deletions Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
Expand Down Expand Up @@ -169,6 +170,39 @@ public async Task Receive_Retained_Message_After_Subscribe()
}
}

[TestMethod]
public async Task Receive_AtLeastOnce_Retained_Message_Published_During_Subscribe()
{
using (var testEnvironment = CreateTestEnvironment())
{
var messagePublished = new SemaphoreSlim(0,1);
var subscribeReceived = new SemaphoreSlim(0,1);
await testEnvironment.StartServer();
testEnvironment.Server.InterceptingSubscriptionAsync += _ =>
{
subscribeReceived.Release();
return messagePublished.WaitAsync();
};

var c1 = await testEnvironment.ConnectClient();

var c2 = await testEnvironment.ConnectClient();
var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2);

Task subscribeComplete = c2.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("retained").WithAtLeastOnceQoS().Build());
await subscribeReceived.WaitAsync(1000);
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce).Build());
await c1.DisconnectAsync();

messagePublished.Release();
await subscribeComplete;
await Task.Delay(500);

messageHandler.AssertReceivedCountEquals(1);
Assert.IsTrue(messageHandler.ReceivedEventArgs.First().ApplicationMessage.Retain);
}
}

[TestMethod]
public async Task Receive_Retained_Messages_From_Higher_Qos_Level()
{
Expand Down
27 changes: 26 additions & 1 deletion Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using static MQTTnet.Server.MqttClientSubscriptionsManager;

namespace MQTTnet.Server
{
Expand Down Expand Up @@ -172,11 +173,17 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket
throw new ArgumentNullException(nameof(subscribePacket));
}

var retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false);
var result = new SubscribeResult(subscribePacket.TopicFilters.Count);

var addedSubscriptions = new List<string>();
var finalTopicFilters = new List<MqttTopicFilter>();
var atLeastOnceSubscriptionResults = new List<CreateSubscriptionResult>();

IList<MqttApplicationMessage> retainedApplicationMessages = null;
marve marked this conversation as resolved.
Show resolved Hide resolved
if (subscribePacket.TopicFilters.Any(f => f.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this only relevant for at least once QoS? Isn't this also beneficial for QoS 0 and 2?
Or are you fixing a specific issue in your project here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. The project wherein we discovered this bug uses solely AtLeastOnce. But I think you're right that it may fix potential issues with AtMostOnce and ExactlyOnce as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chkr1011 Is this PR sufficient as-is or are you requesting that I make changes?

{
retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false);
}

// The topic filters are order by its QoS so that the higher QoS will win over a
// lower one.
Expand Down Expand Up @@ -208,6 +215,24 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket
finalTopicFilters.Add(topicFilter);

FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
if (createSubscriptionResult.Subscription.GrantedQualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce)
{
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
}
else
{
atLeastOnceSubscriptionResults.Add(createSubscriptionResult);
}
}

if (atLeastOnceSubscriptionResults.Count != 0)
{
// In order to satisfy at least once, we must query for retained messages after creating the subscription.
retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false);
foreach (var createSubscriptionResult in atLeastOnceSubscriptionResults)
{
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
}
}

// This call will add the new subscription to the internal storage.
Expand Down
Loading