From 105abeba065ae3b39e1228145db8039685d1e0e4 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 5 May 2017 21:34:37 +0200 Subject: [PATCH] * [Server] Added support for complex client IDs. * [Server] Fixed an issue with not correctly removed old client sessions. * [Server] Several minor performance improvements. * [Server] An existing client session is no longer closed if a new client connection is invalid. * [Client] Added support for sending "CleanSession" flag. --- MQTTnet.Core/Client/MqttClient.cs | 1 + MQTTnet.Core/Client/MqttClientOptions.cs | 2 + MQTTnet.Core/MQTTnet.Core.csproj | 6 +- .../Server/GetOrCreateClientSessionResult.cs | 9 ++ ...nsManager.cs => MqttClientMessageQueue.cs} | 14 +- MQTTnet.Core/Server/MqttClientSession.cs | 52 +++---- .../Server/MqttClientSessionManager.cs | 93 ------------ .../Server/MqttClientSessionsManager.cs | 135 ++++++++++++++++++ MQTTnet.Core/Server/MqttServer.cs | 15 +- MQTTnet.nuspec | 8 +- Tests/MQTTnet.TestMqttClient/Program.cs | 4 +- 11 files changed, 204 insertions(+), 135 deletions(-) create mode 100644 MQTTnet.Core/Server/GetOrCreateClientSessionResult.cs rename MQTTnet.Core/Server/{MqttOutgoingPublicationsManager.cs => MqttClientMessageQueue.cs} (87%) delete mode 100644 MQTTnet.Core/Server/MqttClientSessionManager.cs create mode 100644 MQTTnet.Core/Server/MqttClientSessionsManager.cs diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 5c89a913a..412563e5e 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -53,6 +53,7 @@ public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = n ClientId = _options.ClientId, Username = _options.UserName, Password = _options.Password, + CleanSession = _options.CleanSession, KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, WillMessage = willApplicationMessage }; diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index 5410527c2..830fef5be 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -14,6 +14,8 @@ public class MqttClientOptions public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + public bool CleanSession { get; set; } = true; + public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 1459a5dae..1b8346d7c 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -9,15 +9,15 @@ MQTTnet Christian Kratky Christian Kratky - 2.1.1.0 + 2.1.2.0 MQTTnet.Core Copyright © Christian Kratky 2016-2017 https://github.com/chkr1011/MQTTnet https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png https://github.com/chkr1011/MQTTnet MQTT MQTTClient MQTTServer MQTTBroker Broker - 2.1.1.0 - 2.1.1.0 + 2.1.2.0 + 2.1.2.0 https://github.com/chkr1011/MQTTnet/blob/master/LICENSE diff --git a/MQTTnet.Core/Server/GetOrCreateClientSessionResult.cs b/MQTTnet.Core/Server/GetOrCreateClientSessionResult.cs new file mode 100644 index 000000000..ea62122ee --- /dev/null +++ b/MQTTnet.Core/Server/GetOrCreateClientSessionResult.cs @@ -0,0 +1,9 @@ +namespace MQTTnet.Core.Server +{ + public class GetOrCreateClientSessionResult + { + public bool IsExistingSession { get; set; } + + public MqttClientSession Session { get; set; } + } +} diff --git a/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs similarity index 87% rename from MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs rename to MQTTnet.Core/Server/MqttClientMessageQueue.cs index 8e7c67200..63b4881e5 100644 --- a/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs +++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs @@ -11,7 +11,7 @@ namespace MQTTnet.Core.Server { - public class MqttOutgoingPublicationsManager + public class MqttClientMessageQueue { private readonly List _pendingPublishPackets = new List(); private readonly AsyncGate _gate = new AsyncGate(); @@ -20,7 +20,7 @@ public class MqttOutgoingPublicationsManager private CancellationTokenSource _cancellationTokenSource; private IMqttCommunicationAdapter _adapter; - public MqttOutgoingPublicationsManager(MqttServerOptions options) + public MqttClientMessageQueue(MqttServerOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); } @@ -29,13 +29,13 @@ public void Start(IMqttCommunicationAdapter adapter) { if (_cancellationTokenSource != null) { - throw new InvalidOperationException($"{nameof(MqttOutgoingPublicationsManager)} already started."); + throw new InvalidOperationException($"{nameof(MqttClientMessageQueue)} already started."); } _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); _cancellationTokenSource = new CancellationTokenSource(); - Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)).Forget(); + Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)); } public void Stop() @@ -87,7 +87,7 @@ private async Task SendPendingPublishPacketsAsync(CancellationToken cancellation } catch (Exception e) { - MqttTrace.Error(nameof(MqttOutgoingPublicationsManager), e, "Error while sending pending publish packets."); + MqttTrace.Error(nameof(MqttClientMessageQueue), e, "Error while sending pending publish packets."); } finally { @@ -112,11 +112,11 @@ private async Task TrySendPendingPublishPacketAsync(MqttClientPublishPacketConte } catch (MqttCommunicationException exception) { - MqttTrace.Warning(nameof(MqttOutgoingPublicationsManager), exception, "Sending publish packet failed."); + MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); } catch (Exception exception) { - MqttTrace.Error(nameof(MqttOutgoingPublicationsManager), exception, "Sending publish packet failed."); + MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); } finally { diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 0af6e4635..19b4cfdd2 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -11,12 +11,12 @@ namespace MQTTnet.Core.Server { - public class MqttClientSession + public sealed class MqttClientSession : IDisposable { private readonly ConcurrentDictionary _pendingIncomingPublications = new ConcurrentDictionary(); private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager(); - private readonly MqttOutgoingPublicationsManager _outgoingPublicationsManager; + private readonly MqttClientMessageQueue _messageQueue; private readonly Action _publishPacketReceivedCallback; private readonly MqttServerOptions _options; @@ -24,15 +24,17 @@ public class MqttClientSession private IMqttCommunicationAdapter _adapter; private string _identifier; private MqttApplicationMessage _willApplicationMessage; - + public MqttClientSession(MqttServerOptions options, Action publishPacketReceivedCallback) { _options = options ?? throw new ArgumentNullException(nameof(options)); _publishPacketReceivedCallback = publishPacketReceivedCallback ?? throw new ArgumentNullException(nameof(publishPacketReceivedCallback)); - - _outgoingPublicationsManager = new MqttOutgoingPublicationsManager(options); + + _messageQueue = new MqttClientMessageQueue(options); } + public bool IsConnected => _adapter != null; + public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -45,7 +47,7 @@ public async Task RunAsync(string identifier, MqttApplicationMessage willApplica _adapter = adapter; _cancellationTokenSource = new CancellationTokenSource(); - _outgoingPublicationsManager.Start(adapter); + _messageQueue.Start(adapter); while (!_cancellationTokenSource.IsCancellationRequested) { var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero); @@ -65,8 +67,8 @@ public async Task RunAsync(string identifier, MqttApplicationMessage willApplica { _publishPacketReceivedCallback(this, _willApplicationMessage.ToPublishPacket()); } - - _outgoingPublicationsManager.Stop(); + + _messageQueue.Stop(); _cancellationTokenSource.Cancel(); _adapter = null; @@ -84,66 +86,68 @@ public void EnqueuePublishPacket(MqttClientSession senderClientSession, MqttPubl return; } - _outgoingPublicationsManager.Enqueue(senderClientSession, publishPacket); + _messageQueue.Enqueue(senderClientSession, publishPacket); MqttTrace.Verbose(nameof(MqttClientSession), $"Client '{_identifier}: Enqueued pending publish packet."); } - private async Task HandleIncomingPacketAsync(MqttBasePacket packet) + public void Dispose() + { + _cancellationTokenSource?.Cancel(); + _cancellationTokenSource?.Dispose(); + } + + private Task HandleIncomingPacketAsync(MqttBasePacket packet) { var subscribePacket = packet as MqttSubscribePacket; if (subscribePacket != null) { - await _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); - return; + return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); } var unsubscribePacket = packet as MqttUnsubscribePacket; if (unsubscribePacket != null) { - await _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); - return; + return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); } var publishPacket = packet as MqttPublishPacket; if (publishPacket != null) { - await HandleIncomingPublishPacketAsync(publishPacket); - return; + return HandleIncomingPublishPacketAsync(publishPacket); } var pubRelPacket = packet as MqttPubRelPacket; if (pubRelPacket != null) { - await HandleIncomingPubRelPacketAsync(pubRelPacket); - return; + return HandleIncomingPubRelPacketAsync(pubRelPacket); } var pubAckPacket = packet as MqttPubAckPacket; if (pubAckPacket != null) { - await HandleIncomingPubAckPacketAsync(pubAckPacket); - return; + return HandleIncomingPubAckPacketAsync(pubAckPacket); } if (packet is MqttPingReqPacket) { - await _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout); - return; + return _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout); } if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) { _cancellationTokenSource.Cancel(); - return; + return Task.FromResult((object)null); } MqttTrace.Warning(nameof(MqttClientSession), $"Client '{_identifier}': Received not supported packet ({packet}). Closing connection."); _cancellationTokenSource.Cancel(); + + return Task.FromResult((object)null); } private async Task HandleIncomingPubAckPacketAsync(MqttPubAckPacket pubAckPacket) { - await Task.FromResult(0); + await Task.FromResult((object)null); } private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) diff --git a/MQTTnet.Core/Server/MqttClientSessionManager.cs b/MQTTnet.Core/Server/MqttClientSessionManager.cs deleted file mode 100644 index 5f6f63e46..000000000 --- a/MQTTnet.Core/Server/MqttClientSessionManager.cs +++ /dev/null @@ -1,93 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Linq; -using System.Threading.Tasks; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.Exceptions; -using MQTTnet.Core.Packets; -using MQTTnet.Core.Protocol; - -namespace MQTTnet.Core.Server -{ - public class MqttClientSessionManager - { - private readonly ConcurrentDictionary _clientSessions = new ConcurrentDictionary(); - private readonly MqttServerOptions _options; - - public MqttClientSessionManager(MqttServerOptions options) - { - _options = options ?? throw new ArgumentNullException(nameof(options)); - } - - public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs) - { - try - { - var connectPacket = await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout) as MqttConnectPacket; - if (connectPacket == null) - { - throw new MqttProtocolViolationException("The first packet from a client must be a 'Connect' packet [MQTT-3.1.0-1]."); - } - - var connectReturnCode = MqttConnectReturnCode.ConnectionAccepted; - if (_options.ConnectionValidator != null) - { - connectReturnCode = _options.ConnectionValidator(connectPacket); - } - - MqttClientSession clientSession = null; - var isSessionPresent = _clientSessions.ContainsKey(connectPacket.ClientId); - if (isSessionPresent && connectPacket.CleanSession) - { - MqttClientSession _; - _clientSessions.TryRemove(connectPacket.ClientId, out _); - } - else if (!connectPacket.CleanSession) - { - _clientSessions.TryGetValue(connectPacket.ClientId, out clientSession); - } - - await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket - { - ConnectReturnCode = connectReturnCode, - IsSessionPresent = clientSession != null - }, _options.DefaultCommunicationTimeout); - - if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) - { - return; - } - - if (clientSession == null) - { - clientSession = new MqttClientSession(_options, DispatchPublishPacket); - _clientSessions.TryAdd(connectPacket.ClientId, clientSession); - } - - await clientSession.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter); - } - catch (Exception exception) - { - MqttTrace.Error(nameof(MqttServer), exception, exception.Message); - } - finally - { - await eventArgs.ClientAdapter.DisconnectAsync(); - } - } - - public void Clear() - { - _clientSessions.Clear(); - } - - private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) - { - foreach (var clientSession in _clientSessions.Values.ToList()) - { - clientSession.EnqueuePublishPacket(senderClientSession, publishPacket); - } - } - } -} diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs new file mode 100644 index 000000000..1dcbd4429 --- /dev/null +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -0,0 +1,135 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Server +{ + public class MqttClientSessionsManager + { + private readonly object _syncRoot = new object(); + private readonly Dictionary _clientSessions = new Dictionary(); + private readonly MqttServerOptions _options; + + public MqttClientSessionsManager(MqttServerOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs) + { + try + { + var connectPacket = await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout) as MqttConnectPacket; + if (connectPacket == null) + { + throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); + } + + var connectReturnCode = ValidateConnection(connectPacket); + if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) + { + await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket + { + ConnectReturnCode = connectReturnCode + }, _options.DefaultCommunicationTimeout); + + return; + } + + var clientSession = GetOrCreateClientSession(connectPacket); + + await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket + { + ConnectReturnCode = connectReturnCode, + IsSessionPresent = clientSession.IsExistingSession + }, _options.DefaultCommunicationTimeout); + + await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter); + } + catch (Exception exception) + { + MqttTrace.Error(nameof(MqttServer), exception, exception.Message); + } + finally + { + await eventArgs.ClientAdapter.DisconnectAsync(); + } + } + + public void Clear() + { + lock (_syncRoot) + { + _clientSessions.Clear(); + } + } + + public IList GetConnectedClients() + { + lock (_syncRoot) + { + return _clientSessions.Where(s => s.Value.IsConnected).Select(s => s.Key).ToList(); + } + } + + private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) + { + if (_options.ConnectionValidator != null) + { + return _options.ConnectionValidator(connectPacket); + } + + return MqttConnectReturnCode.ConnectionAccepted; + } + + private GetOrCreateClientSessionResult GetOrCreateClientSession(MqttConnectPacket connectPacket) + { + lock (_syncRoot) + { + MqttClientSession clientSession; + var isSessionPresent = _clientSessions.TryGetValue(connectPacket.ClientId, out clientSession); + if (isSessionPresent) + { + if (connectPacket.CleanSession) + { + _clientSessions.Remove(connectPacket.ClientId); + clientSession.Dispose(); + clientSession = null; + MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Disposed existing session of client '{connectPacket.ClientId}'."); + } + else + { + MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Reusing existing session of client '{connectPacket.ClientId}'."); + } + } + + var isExistingSession = true; + if (clientSession == null) + { + isExistingSession = false; + + clientSession = new MqttClientSession(_options, DispatchPublishPacket); + _clientSessions[connectPacket.ClientId] = clientSession; + + MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Created a new session for client '{connectPacket.ClientId}'."); + } + + return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; + } + } + + private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + { + foreach (var clientSession in _clientSessions.Values.ToList()) + { + clientSession.EnqueuePublishPacket(senderClientSession, publishPacket); + } + } + } +} \ No newline at end of file diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index 52bca301e..0b689e21c 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -1,15 +1,15 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.Internal; namespace MQTTnet.Core.Server { public class MqttServer { - private readonly MqttClientSessionManager _clientSessionManager; + private readonly MqttClientSessionsManager _clientSessionsManager; private readonly IMqttServerAdapter _adapter; private readonly MqttServerOptions _options; @@ -20,7 +20,12 @@ public MqttServer(MqttServerOptions options, IMqttServerAdapter adapter) _options = options ?? throw new ArgumentNullException(nameof(options)); _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); - _clientSessionManager = new MqttClientSessionManager(options); + _clientSessionsManager = new MqttClientSessionsManager(options); + } + + public IList GetConnectedClients() + { + return _clientSessionsManager.GetConnectedClients(); } public event EventHandler ClientConnected; @@ -54,7 +59,7 @@ public void Stop() _adapter.ClientConnected -= OnClientConnected; _adapter.Stop(); - _clientSessionManager.Clear(); + _clientSessionsManager.Clear(); MqttTrace.Information(nameof(MqttServer), "Stopped."); } @@ -64,7 +69,7 @@ private void OnClientConnected(object sender, MqttClientConnectedEventArgs event MqttTrace.Information(nameof(MqttServer), $"Client '{eventArgs.Identifier}': Connected."); ClientConnected?.Invoke(this, eventArgs); - Task.Run(async () => await _clientSessionManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token).Forget(); + Task.Run(() => _clientSessionsManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token); } } } diff --git a/MQTTnet.nuspec b/MQTTnet.nuspec index 4eaa76b16..f0de0fb27 100644 --- a/MQTTnet.nuspec +++ b/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.1.1.0 + 2.1.2.0 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,7 +10,11 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * Added MQTT server. + * [Server] Added support for complex client IDs. + * [Server] Fixed an issue with not correctly removed old client sessions. + * [Server] Several minor performance improvements. + * [Server] An existing client session is no longer closed if a new client connection is invalid. + * [Client] Added support for sending "CleanSession" flag. Copyright Christian Kratky 2016-2017 MQTT MQTTClient MQTTServer MQTTBroker Broker diff --git a/Tests/MQTTnet.TestMqttClient/Program.cs b/Tests/MQTTnet.TestMqttClient/Program.cs index ee036fa16..41d547a0e 100644 --- a/Tests/MQTTnet.TestMqttClient/Program.cs +++ b/Tests/MQTTnet.TestMqttClient/Program.cs @@ -33,7 +33,9 @@ private static async Task Run(string[] arguments) { var options = new MqttClientOptions { - Server = "localhost" + Server = "localhost", + ClientId = "XYZ", + CleanSession = true }; var client = new MqttClientFactory().CreateMqttClient(options);