Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from using Threads to using Tasks for receiving and sending data #26

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<PropertyGroup>
<Product>MarcusW VNC-Client</Product>
<VersionNumber>1.0.0-alpha4</VersionNumber>
<VersionNumber>1.0.0-alpha5</VersionNumber>
<Title>High-performance cross-platform VNC-Client library for C#</Title>
<Authors>Marcus Wichelmann</Authors>
<Company>Marcus Wichelmann</Company>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Buffers;
using System.Collections.Immutable;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -54,7 +57,7 @@ public Task StopReceiveLoopAsync()

// This method will not catch exceptions so the BackgroundThread base class will receive them,
// raise a "Failure" and trigger a reconnect.
protected override void ThreadWorker(CancellationToken cancellationToken)
protected override async Task ThreadWorker(CancellationToken cancellationToken)
{
// Get the transport stream so we don't have to call the getter every time
Debug.Assert(_context.Transport != null, "_context.Transport != null");
Expand All @@ -65,12 +68,12 @@ protected override void ThreadWorker(CancellationToken cancellationToken)
ImmutableDictionary<byte, IIncomingMessageType> incomingMessageLookup =
_context.SupportedMessageTypes.OfType<IIncomingMessageType>().ToImmutableDictionary(mt => mt.Id);

Span<byte> messageTypeBuffer = stackalloc byte[1];
var messageTypeBuffer = new byte[1];

while (!cancellationToken.IsCancellationRequested)
{
// Read message type
if (transportStream.Read(messageTypeBuffer) == 0)
if (await transportStream.ReadAsync(messageTypeBuffer, 0, messageTypeBuffer.Length).ConfigureAwait(false) == 0)
throw new UnexpectedEndOfStreamException("Stream reached its end while reading next message type.");
byte messageTypeId = messageTypeBuffer[0];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using MarcusW.VncClient.Protocol.Implementation.MessageTypes.Outgoing;
using MarcusW.VncClient.Protocol.MessageTypes;
Expand All @@ -21,7 +22,7 @@ public class RfbMessageSender : BackgroundThread, IRfbMessageSender
private readonly ProtocolState _state;
private readonly ILogger<RfbMessageSender> _logger;

private readonly BlockingCollection<QueueItem> _queue = new BlockingCollection<QueueItem>(new ConcurrentQueue<QueueItem>());
private readonly Channel<QueueItem> _queue = Channel.CreateUnbounded<QueueItem>();

private volatile bool _disposed;

Expand Down Expand Up @@ -80,7 +81,7 @@ public void EnqueueMessage<TMessageType>(IOutgoingMessage<TMessageType> message,
TMessageType messageType = GetAndCheckMessageType<TMessageType>();

// Add message to queue
_queue.Add(new QueueItem(message, messageType), cancellationToken);
_queue.Writer.TryWrite(new QueueItem(message, messageType));
}

/// <inheritdoc />
Expand Down Expand Up @@ -108,23 +109,24 @@ public Task SendMessageAndWaitAsync<TMessageType>(IOutgoingMessage<TMessageType>
var completionSource = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);

// Add message to queue
_queue.Add(new QueueItem(message, messageType, completionSource), cancellationToken);
_queue.Writer.TryWrite(new QueueItem(message, messageType, completionSource));

return completionSource.Task;
}

// This method will not catch exceptions so the BackgroundThread base class will receive them,
// raise a "Failure" and trigger a reconnect.
protected override void ThreadWorker(CancellationToken cancellationToken)
protected override async Task ThreadWorker(CancellationToken cancellationToken)
{
try
{
Debug.Assert(_context.Transport != null, "_context.Transport != null");
ITransport transport = _context.Transport;

// Iterate over all queued items (will block if the queue is empty)
foreach (QueueItem queueItem in _queue.GetConsumingEnumerable(cancellationToken))
while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
var queueItem = await _queue.Reader.ReadAsync().ConfigureAwait(false);
IOutgoingMessage<IOutgoingMessageType> message = queueItem.Message;
IOutgoingMessageType messageType = queueItem.MessageType;

Expand Down Expand Up @@ -167,7 +169,7 @@ protected override void Dispose(bool disposing)
if (disposing)
{
SetQueueCancelled();
_queue.Dispose();
_queue.Writer.TryComplete();
}

_disposed = true;
Expand All @@ -191,8 +193,7 @@ private TMessageType GetAndCheckMessageType<TMessageType>() where TMessageType :

private void SetQueueCancelled()
{
_queue.CompleteAdding();
foreach (QueueItem queueItem in _queue)
while (_queue.Reader.TryRead(out var queueItem))
queueItem.CompletionSource?.TrySetCanceled();
}

Expand Down
91 changes: 24 additions & 67 deletions src/MarcusW.VncClient/Utils/BackgroundThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,8 @@ namespace MarcusW.VncClient.Utils
/// </summary>
public abstract class BackgroundThread : IBackgroundThread
{
private readonly Thread _thread;

private bool _started;
private readonly object _startLock = new object();

private readonly CancellationTokenSource _stopCts = new CancellationTokenSource();
private readonly TaskCompletionSource<object?> _completedTcs = new TaskCompletionSource<object?>();
private Lazy<Task> _task;

private volatile bool _disposed;

Expand All @@ -27,15 +22,18 @@ public abstract class BackgroundThread : IBackgroundThread
/// Initializes a new instance of the <see cref="BackgroundThread"/>.
/// </summary>
/// <param name="name">The thread name.</param>
[Obsolete("The name field is no longer used")]
protected BackgroundThread(string name)
: this()
{
if (name == null)
throw new ArgumentNullException(nameof(name));
}

_thread = new Thread(ThreadStart) {
Name = name,
IsBackground = true
};
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundThread"/>.
/// </summary>
protected BackgroundThread()
{
_task = new Lazy<Task>(() => ThreadWorker(_stopCts.Token));
}

/// <summary>
Expand All @@ -49,13 +47,14 @@ protected void Start()
if (_disposed)
throw new ObjectDisposedException(nameof(BackgroundThread));

lock (_startLock)
try
{
if (_started)
throw new InvalidOperationException("Thread already started.");

_thread.Start(_stopCts.Token);
_started = true;
// Do your work...
_ = _task.Value;
}
catch (Exception exception) when (!(exception is OperationCanceledException || exception is ThreadAbortException))
{
Failed?.Invoke(this, new BackgroundThreadFailedEventArgs(exception));
}
}

Expand All @@ -65,50 +64,26 @@ protected void Start()
/// <remarks>
/// It is safe to call this method multiple times.
/// </remarks>
protected Task StopAndWaitAsync()
protected async Task StopAndWaitAsync()
{
if (_disposed)
throw new ObjectDisposedException(nameof(BackgroundThread));

lock (_startLock)
{
if (!_started)
throw new InvalidOperationException("Thread has not been started.");
}

// Tell the thread to stop
_stopCts.Cancel();

// Wait for completion
return _completedTcs.Task;
if (_task.IsValueCreated)
{
await _task.Value.ConfigureAwait(false);
}
}

/// <summary>
/// Executes the work that should happen in the background.
/// </summary>
/// <param name="cancellationToken">The cancellation token that tells the method implementation when to complete.</param>
protected abstract void ThreadWorker(CancellationToken cancellationToken);

private void ThreadStart(object? parameter)
{
Debug.Assert(parameter != null, nameof(parameter) + " != null");
var cancellationToken = (CancellationToken)parameter;

try
{
// Do your work...
ThreadWorker(cancellationToken);
}
catch (Exception exception) when (!(exception is OperationCanceledException || exception is ThreadAbortException))
{
Failed?.Invoke(this, new BackgroundThreadFailedEventArgs(exception));
}
finally
{
// Notify stop method that thread has completed
_completedTcs.TrySetResult(null);
}
}
protected abstract Task ThreadWorker(CancellationToken cancellationToken);

/// <inheritdoc />
public void Dispose() => Dispose(true);
Expand All @@ -120,25 +95,7 @@ protected virtual void Dispose(bool disposing)

if (disposing)
{
try
{
// Ensure the thread is stopped
_stopCts.Cancel();
if (_thread.IsAlive)
{
// Block and wait for completion or hard-kill the thread after 1 second
if (!_thread.Join(TimeSpan.FromSeconds(1)))
_thread.Abort();
}
}
catch
{
// Ignore
}

// Just to be sure...
_completedTcs.TrySetResult(null);

_stopCts.Cancel();
_stopCts.Dispose();
}

Expand Down
4 changes: 2 additions & 2 deletions tests/MarcusW.VncClient.Tests/Utils/BackgroundThreadTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public CancellableThread() : base("Cancellable Thread") { }

public new Task StopAndWaitAsync() => base.StopAndWaitAsync();

protected override void ThreadWorker(CancellationToken cancellationToken)
protected override async Task ThreadWorker(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
Thread.Sleep(10);
await Task.Delay(10);
}
}
}
Expand Down