From 06d2cb0b7fe02ea1a7e58cd64f0d9c4c722892d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20N=C3=B8rup?= Date: Tue, 9 Apr 2024 13:41:30 +0200 Subject: [PATCH 1/3] Migrated from using Threads to using Tasks for background tasks --- .../Communication/RfbMessageReceiver.cs | 45 +++++---- .../Communication/RfbMessageSender.cs | 17 ++-- .../Utils/BackgroundThread.cs | 91 +++++-------------- .../Utils/BackgroundThreadTests.cs | 4 +- 4 files changed, 62 insertions(+), 95 deletions(-) diff --git a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs index ebdd25e7..f2012329 100644 --- a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs +++ b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs @@ -1,8 +1,11 @@ using System; +using System.Buffers; using System.Collections.Immutable; using System.Diagnostics; using System.IO; using System.Linq; +using System.Runtime; +using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -54,7 +57,7 @@ public Task StopReceiveLoopAsync() // This method will not catch exceptions so the BackgroundThread base class will receive them, // raise a "Failure" and trigger a reconnect. - protected override void ThreadWorker(CancellationToken cancellationToken) + protected override async Task ThreadWorker(CancellationToken cancellationToken) { // Get the transport stream so we don't have to call the getter every time Debug.Assert(_context.Transport != null, "_context.Transport != null"); @@ -65,28 +68,34 @@ protected override void ThreadWorker(CancellationToken cancellationToken) ImmutableDictionary incomingMessageLookup = _context.SupportedMessageTypes.OfType().ToImmutableDictionary(mt => mt.Id); - Span messageTypeBuffer = stackalloc byte[1]; - - while (!cancellationToken.IsCancellationRequested) + var messageTypeBuffer = MemoryPool.Shared.Rent(); + try { - // Read message type - if (transportStream.Read(messageTypeBuffer) == 0) - throw new UnexpectedEndOfStreamException("Stream reached its end while reading next message type."); - byte messageTypeId = messageTypeBuffer[0]; + while (!cancellationToken.IsCancellationRequested) + { + // Read message type + if (await transportStream.ReadAsync(messageTypeBuffer.Memory).ConfigureAwait(false) == 0) + throw new UnexpectedEndOfStreamException("Stream reached its end while reading next message type."); + byte messageTypeId = messageTypeBuffer.Memory.Span[0]; - // Find message type - if (!incomingMessageLookup.TryGetValue(messageTypeId, out IIncomingMessageType messageType)) - throw new UnexpectedDataException($"Server sent a message of type {messageTypeId} that is not supported by this protocol implementation. " - + "Servers should always check for client support before using protocol extensions."); + // Find message type + if (!incomingMessageLookup.TryGetValue(messageTypeId, out IIncomingMessageType messageType)) + throw new UnexpectedDataException($"Server sent a message of type {messageTypeId} that is not supported by this protocol implementation. " + + "Servers should always check for client support before using protocol extensions."); - _logger.LogDebug("Received message: {name}({id})", messageType.Name, messageTypeId); + _logger.LogDebug("Received message: {name}({id})", messageType.Name, messageTypeId); - // Ensure the message type is marked as used - if (!messageType.IsStandardMessageType) - _state.EnsureMessageTypeIsMarkedAsUsed(messageType); + // Ensure the message type is marked as used + if (!messageType.IsStandardMessageType) + _state.EnsureMessageTypeIsMarkedAsUsed(messageType); - // Read the message - messageType.ReadMessage(transport, cancellationToken); + // Read the message + messageType.ReadMessage(transport, cancellationToken); + } + } + finally + { + messageTypeBuffer.Dispose(); } } } diff --git a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs index 6d892f56..0ea063e3 100644 --- a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs +++ b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using MarcusW.VncClient.Protocol.Implementation.MessageTypes.Outgoing; using MarcusW.VncClient.Protocol.MessageTypes; @@ -21,7 +22,7 @@ public class RfbMessageSender : BackgroundThread, IRfbMessageSender private readonly ProtocolState _state; private readonly ILogger _logger; - private readonly BlockingCollection _queue = new BlockingCollection(new ConcurrentQueue()); + private readonly Channel _queue = Channel.CreateUnbounded(); private volatile bool _disposed; @@ -80,7 +81,7 @@ public void EnqueueMessage(IOutgoingMessage message, TMessageType messageType = GetAndCheckMessageType(); // Add message to queue - _queue.Add(new QueueItem(message, messageType), cancellationToken); + _queue.Writer.TryWrite(new QueueItem(message, messageType)); } /// @@ -108,14 +109,14 @@ public Task SendMessageAndWaitAsync(IOutgoingMessage var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); // Add message to queue - _queue.Add(new QueueItem(message, messageType, completionSource), cancellationToken); + _queue.Writer.TryWrite(new QueueItem(message, messageType, completionSource)); return completionSource.Task; } // This method will not catch exceptions so the BackgroundThread base class will receive them, // raise a "Failure" and trigger a reconnect. - protected override void ThreadWorker(CancellationToken cancellationToken) + protected override async Task ThreadWorker(CancellationToken cancellationToken) { try { @@ -123,8 +124,9 @@ protected override void ThreadWorker(CancellationToken cancellationToken) ITransport transport = _context.Transport; // Iterate over all queued items (will block if the queue is empty) - foreach (QueueItem queueItem in _queue.GetConsumingEnumerable(cancellationToken)) + while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { + var queueItem = await _queue.Reader.ReadAsync().ConfigureAwait(false); IOutgoingMessage message = queueItem.Message; IOutgoingMessageType messageType = queueItem.MessageType; @@ -167,7 +169,7 @@ protected override void Dispose(bool disposing) if (disposing) { SetQueueCancelled(); - _queue.Dispose(); + _queue.Writer.TryComplete(); } _disposed = true; @@ -191,8 +193,7 @@ private TMessageType GetAndCheckMessageType() where TMessageType : private void SetQueueCancelled() { - _queue.CompleteAdding(); - foreach (QueueItem queueItem in _queue) + while (_queue.Reader.TryRead(out var queueItem)) queueItem.CompletionSource?.TrySetCanceled(); } diff --git a/src/MarcusW.VncClient/Utils/BackgroundThread.cs b/src/MarcusW.VncClient/Utils/BackgroundThread.cs index c1cd821b..c87ee7c5 100644 --- a/src/MarcusW.VncClient/Utils/BackgroundThread.cs +++ b/src/MarcusW.VncClient/Utils/BackgroundThread.cs @@ -10,13 +10,8 @@ namespace MarcusW.VncClient.Utils /// public abstract class BackgroundThread : IBackgroundThread { - private readonly Thread _thread; - - private bool _started; - private readonly object _startLock = new object(); - private readonly CancellationTokenSource _stopCts = new CancellationTokenSource(); - private readonly TaskCompletionSource _completedTcs = new TaskCompletionSource(); + private Lazy _task; private volatile bool _disposed; @@ -27,15 +22,18 @@ public abstract class BackgroundThread : IBackgroundThread /// Initializes a new instance of the . /// /// The thread name. + [Obsolete("The name field is no longer used")] protected BackgroundThread(string name) + : this() { - if (name == null) - throw new ArgumentNullException(nameof(name)); + } - _thread = new Thread(ThreadStart) { - Name = name, - IsBackground = true - }; + /// + /// Initializes a new instance of the . + /// + protected BackgroundThread() + { + _task = new Lazy(() => ThreadWorker(_stopCts.Token)); } /// @@ -49,13 +47,14 @@ protected void Start() if (_disposed) throw new ObjectDisposedException(nameof(BackgroundThread)); - lock (_startLock) + try { - if (_started) - throw new InvalidOperationException("Thread already started."); - - _thread.Start(_stopCts.Token); - _started = true; + // Do your work... + _ = _task.Value; + } + catch (Exception exception) when (!(exception is OperationCanceledException || exception is ThreadAbortException)) + { + Failed?.Invoke(this, new BackgroundThreadFailedEventArgs(exception)); } } @@ -65,50 +64,26 @@ protected void Start() /// /// It is safe to call this method multiple times. /// - protected Task StopAndWaitAsync() + protected async Task StopAndWaitAsync() { if (_disposed) throw new ObjectDisposedException(nameof(BackgroundThread)); - lock (_startLock) - { - if (!_started) - throw new InvalidOperationException("Thread has not been started."); - } - // Tell the thread to stop _stopCts.Cancel(); // Wait for completion - return _completedTcs.Task; + if (_task.IsValueCreated) + { + await _task.Value.ConfigureAwait(false); + } } /// /// Executes the work that should happen in the background. /// /// The cancellation token that tells the method implementation when to complete. - protected abstract void ThreadWorker(CancellationToken cancellationToken); - - private void ThreadStart(object? parameter) - { - Debug.Assert(parameter != null, nameof(parameter) + " != null"); - var cancellationToken = (CancellationToken)parameter; - - try - { - // Do your work... - ThreadWorker(cancellationToken); - } - catch (Exception exception) when (!(exception is OperationCanceledException || exception is ThreadAbortException)) - { - Failed?.Invoke(this, new BackgroundThreadFailedEventArgs(exception)); - } - finally - { - // Notify stop method that thread has completed - _completedTcs.TrySetResult(null); - } - } + protected abstract Task ThreadWorker(CancellationToken cancellationToken); /// public void Dispose() => Dispose(true); @@ -120,25 +95,7 @@ protected virtual void Dispose(bool disposing) if (disposing) { - try - { - // Ensure the thread is stopped - _stopCts.Cancel(); - if (_thread.IsAlive) - { - // Block and wait for completion or hard-kill the thread after 1 second - if (!_thread.Join(TimeSpan.FromSeconds(1))) - _thread.Abort(); - } - } - catch - { - // Ignore - } - - // Just to be sure... - _completedTcs.TrySetResult(null); - + _stopCts.Cancel(); _stopCts.Dispose(); } diff --git a/tests/MarcusW.VncClient.Tests/Utils/BackgroundThreadTests.cs b/tests/MarcusW.VncClient.Tests/Utils/BackgroundThreadTests.cs index 34505792..7f9ddcc8 100644 --- a/tests/MarcusW.VncClient.Tests/Utils/BackgroundThreadTests.cs +++ b/tests/MarcusW.VncClient.Tests/Utils/BackgroundThreadTests.cs @@ -64,10 +64,10 @@ public CancellableThread() : base("Cancellable Thread") { } public new Task StopAndWaitAsync() => base.StopAndWaitAsync(); - protected override void ThreadWorker(CancellationToken cancellationToken) + protected override async Task ThreadWorker(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) - Thread.Sleep(10); + await Task.Delay(10); } } } From 792769edda780e6496bc157c7914eb5abd1abbac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20N=C3=B8rup?= Date: Tue, 9 Apr 2024 15:55:07 +0200 Subject: [PATCH 2/3] Use shared byte array per Task --- .../Communication/RfbMessageReceiver.cs | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs index f2012329..25924da0 100644 --- a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs +++ b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs @@ -68,34 +68,28 @@ protected override async Task ThreadWorker(CancellationToken cancellationToken) ImmutableDictionary incomingMessageLookup = _context.SupportedMessageTypes.OfType().ToImmutableDictionary(mt => mt.Id); - var messageTypeBuffer = MemoryPool.Shared.Rent(); - try + var messageTypeBuffer = new byte[1]; + + while (!cancellationToken.IsCancellationRequested) { - while (!cancellationToken.IsCancellationRequested) - { - // Read message type - if (await transportStream.ReadAsync(messageTypeBuffer.Memory).ConfigureAwait(false) == 0) - throw new UnexpectedEndOfStreamException("Stream reached its end while reading next message type."); - byte messageTypeId = messageTypeBuffer.Memory.Span[0]; + // Read message type + if (await transportStream.ReadAsync(messageTypeBuffer, 0, messageTypeBuffer.Length).ConfigureAwait(false) == 0) + throw new UnexpectedEndOfStreamException("Stream reached its end while reading next message type."); + byte messageTypeId = messageTypeBuffer[0]; - // Find message type - if (!incomingMessageLookup.TryGetValue(messageTypeId, out IIncomingMessageType messageType)) - throw new UnexpectedDataException($"Server sent a message of type {messageTypeId} that is not supported by this protocol implementation. " - + "Servers should always check for client support before using protocol extensions."); + // Find message type + if (!incomingMessageLookup.TryGetValue(messageTypeId, out IIncomingMessageType messageType)) + throw new UnexpectedDataException($"Server sent a message of type {messageTypeId} that is not supported by this protocol implementation. " + + "Servers should always check for client support before using protocol extensions."); - _logger.LogDebug("Received message: {name}({id})", messageType.Name, messageTypeId); + _logger.LogDebug("Received message: {name}({id})", messageType.Name, messageTypeId); - // Ensure the message type is marked as used - if (!messageType.IsStandardMessageType) - _state.EnsureMessageTypeIsMarkedAsUsed(messageType); + // Ensure the message type is marked as used + if (!messageType.IsStandardMessageType) + _state.EnsureMessageTypeIsMarkedAsUsed(messageType); - // Read the message - messageType.ReadMessage(transport, cancellationToken); - } - } - finally - { - messageTypeBuffer.Dispose(); + // Read the message + messageType.ReadMessage(transport, cancellationToken); } } } From ac8d84da63e67f34678de870f020bf9ca4382c90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20N=C3=B8rup?= Date: Tue, 9 Apr 2024 16:00:26 +0200 Subject: [PATCH 3/3] Bump version number to alpha5 --- src/Directory.Build.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Directory.Build.props b/src/Directory.Build.props index f60d8732..fed041a6 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -12,7 +12,7 @@ MarcusW VNC-Client - 1.0.0-alpha4 + 1.0.0-alpha5 High-performance cross-platform VNC-Client library for C# Marcus Wichelmann Marcus Wichelmann