diff --git a/src/Directory.Build.props b/src/Directory.Build.props index f60d8732..fed041a6 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -12,7 +12,7 @@ MarcusW VNC-Client - 1.0.0-alpha4 + 1.0.0-alpha5 High-performance cross-platform VNC-Client library for C# Marcus Wichelmann Marcus Wichelmann diff --git a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs index ebdd25e7..25924da0 100644 --- a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs +++ b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs @@ -1,8 +1,11 @@ using System; +using System.Buffers; using System.Collections.Immutable; using System.Diagnostics; using System.IO; using System.Linq; +using System.Runtime; +using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -54,7 +57,7 @@ public Task StopReceiveLoopAsync() // This method will not catch exceptions so the BackgroundThread base class will receive them, // raise a "Failure" and trigger a reconnect. - protected override void ThreadWorker(CancellationToken cancellationToken) + protected override async Task ThreadWorker(CancellationToken cancellationToken) { // Get the transport stream so we don't have to call the getter every time Debug.Assert(_context.Transport != null, "_context.Transport != null"); @@ -65,12 +68,12 @@ protected override void ThreadWorker(CancellationToken cancellationToken) ImmutableDictionary incomingMessageLookup = _context.SupportedMessageTypes.OfType().ToImmutableDictionary(mt => mt.Id); - Span messageTypeBuffer = stackalloc byte[1]; + var messageTypeBuffer = new byte[1]; while (!cancellationToken.IsCancellationRequested) { // Read message type - if (transportStream.Read(messageTypeBuffer) == 0) + if (await transportStream.ReadAsync(messageTypeBuffer, 0, messageTypeBuffer.Length).ConfigureAwait(false) == 0) throw new UnexpectedEndOfStreamException("Stream reached its end while reading next message type."); byte messageTypeId = messageTypeBuffer[0]; diff --git a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs index 6d892f56..0ea063e3 100644 --- a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs +++ b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using MarcusW.VncClient.Protocol.Implementation.MessageTypes.Outgoing; using MarcusW.VncClient.Protocol.MessageTypes; @@ -21,7 +22,7 @@ public class RfbMessageSender : BackgroundThread, IRfbMessageSender private readonly ProtocolState _state; private readonly ILogger _logger; - private readonly BlockingCollection _queue = new BlockingCollection(new ConcurrentQueue()); + private readonly Channel _queue = Channel.CreateUnbounded(); private volatile bool _disposed; @@ -80,7 +81,7 @@ public void EnqueueMessage(IOutgoingMessage message, TMessageType messageType = GetAndCheckMessageType(); // Add message to queue - _queue.Add(new QueueItem(message, messageType), cancellationToken); + _queue.Writer.TryWrite(new QueueItem(message, messageType)); } /// @@ -108,14 +109,14 @@ public Task SendMessageAndWaitAsync(IOutgoingMessage var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); // Add message to queue - _queue.Add(new QueueItem(message, messageType, completionSource), cancellationToken); + _queue.Writer.TryWrite(new QueueItem(message, messageType, completionSource)); return completionSource.Task; } // This method will not catch exceptions so the BackgroundThread base class will receive them, // raise a "Failure" and trigger a reconnect. - protected override void ThreadWorker(CancellationToken cancellationToken) + protected override async Task ThreadWorker(CancellationToken cancellationToken) { try { @@ -123,8 +124,9 @@ protected override void ThreadWorker(CancellationToken cancellationToken) ITransport transport = _context.Transport; // Iterate over all queued items (will block if the queue is empty) - foreach (QueueItem queueItem in _queue.GetConsumingEnumerable(cancellationToken)) + while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { + var queueItem = await _queue.Reader.ReadAsync().ConfigureAwait(false); IOutgoingMessage message = queueItem.Message; IOutgoingMessageType messageType = queueItem.MessageType; @@ -167,7 +169,7 @@ protected override void Dispose(bool disposing) if (disposing) { SetQueueCancelled(); - _queue.Dispose(); + _queue.Writer.TryComplete(); } _disposed = true; @@ -191,8 +193,7 @@ private TMessageType GetAndCheckMessageType() where TMessageType : private void SetQueueCancelled() { - _queue.CompleteAdding(); - foreach (QueueItem queueItem in _queue) + while (_queue.Reader.TryRead(out var queueItem)) queueItem.CompletionSource?.TrySetCanceled(); } diff --git a/src/MarcusW.VncClient/Utils/BackgroundThread.cs b/src/MarcusW.VncClient/Utils/BackgroundThread.cs index c1cd821b..c87ee7c5 100644 --- a/src/MarcusW.VncClient/Utils/BackgroundThread.cs +++ b/src/MarcusW.VncClient/Utils/BackgroundThread.cs @@ -10,13 +10,8 @@ namespace MarcusW.VncClient.Utils /// public abstract class BackgroundThread : IBackgroundThread { - private readonly Thread _thread; - - private bool _started; - private readonly object _startLock = new object(); - private readonly CancellationTokenSource _stopCts = new CancellationTokenSource(); - private readonly TaskCompletionSource _completedTcs = new TaskCompletionSource(); + private Lazy _task; private volatile bool _disposed; @@ -27,15 +22,18 @@ public abstract class BackgroundThread : IBackgroundThread /// Initializes a new instance of the . /// /// The thread name. + [Obsolete("The name field is no longer used")] protected BackgroundThread(string name) + : this() { - if (name == null) - throw new ArgumentNullException(nameof(name)); + } - _thread = new Thread(ThreadStart) { - Name = name, - IsBackground = true - }; + /// + /// Initializes a new instance of the . + /// + protected BackgroundThread() + { + _task = new Lazy(() => ThreadWorker(_stopCts.Token)); } /// @@ -49,13 +47,14 @@ protected void Start() if (_disposed) throw new ObjectDisposedException(nameof(BackgroundThread)); - lock (_startLock) + try { - if (_started) - throw new InvalidOperationException("Thread already started."); - - _thread.Start(_stopCts.Token); - _started = true; + // Do your work... + _ = _task.Value; + } + catch (Exception exception) when (!(exception is OperationCanceledException || exception is ThreadAbortException)) + { + Failed?.Invoke(this, new BackgroundThreadFailedEventArgs(exception)); } } @@ -65,50 +64,26 @@ protected void Start() /// /// It is safe to call this method multiple times. /// - protected Task StopAndWaitAsync() + protected async Task StopAndWaitAsync() { if (_disposed) throw new ObjectDisposedException(nameof(BackgroundThread)); - lock (_startLock) - { - if (!_started) - throw new InvalidOperationException("Thread has not been started."); - } - // Tell the thread to stop _stopCts.Cancel(); // Wait for completion - return _completedTcs.Task; + if (_task.IsValueCreated) + { + await _task.Value.ConfigureAwait(false); + } } /// /// Executes the work that should happen in the background. /// /// The cancellation token that tells the method implementation when to complete. - protected abstract void ThreadWorker(CancellationToken cancellationToken); - - private void ThreadStart(object? parameter) - { - Debug.Assert(parameter != null, nameof(parameter) + " != null"); - var cancellationToken = (CancellationToken)parameter; - - try - { - // Do your work... - ThreadWorker(cancellationToken); - } - catch (Exception exception) when (!(exception is OperationCanceledException || exception is ThreadAbortException)) - { - Failed?.Invoke(this, new BackgroundThreadFailedEventArgs(exception)); - } - finally - { - // Notify stop method that thread has completed - _completedTcs.TrySetResult(null); - } - } + protected abstract Task ThreadWorker(CancellationToken cancellationToken); /// public void Dispose() => Dispose(true); @@ -120,25 +95,7 @@ protected virtual void Dispose(bool disposing) if (disposing) { - try - { - // Ensure the thread is stopped - _stopCts.Cancel(); - if (_thread.IsAlive) - { - // Block and wait for completion or hard-kill the thread after 1 second - if (!_thread.Join(TimeSpan.FromSeconds(1))) - _thread.Abort(); - } - } - catch - { - // Ignore - } - - // Just to be sure... - _completedTcs.TrySetResult(null); - + _stopCts.Cancel(); _stopCts.Dispose(); } diff --git a/tests/MarcusW.VncClient.Tests/Utils/BackgroundThreadTests.cs b/tests/MarcusW.VncClient.Tests/Utils/BackgroundThreadTests.cs index 34505792..7f9ddcc8 100644 --- a/tests/MarcusW.VncClient.Tests/Utils/BackgroundThreadTests.cs +++ b/tests/MarcusW.VncClient.Tests/Utils/BackgroundThreadTests.cs @@ -64,10 +64,10 @@ public CancellableThread() : base("Cancellable Thread") { } public new Task StopAndWaitAsync() => base.StopAndWaitAsync(); - protected override void ThreadWorker(CancellationToken cancellationToken) + protected override async Task ThreadWorker(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) - Thread.Sleep(10); + await Task.Delay(10); } } }