diff --git a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs
index 9635534..b72316f 100644
--- a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs
+++ b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageReceiver.cs
@@ -25,7 +25,7 @@ public sealed class RfbMessageReceiver : BackgroundThread, IRfbMessageReceiver
/// Initializes a new instance of the .
///
/// The connection context.
- public RfbMessageReceiver(RfbConnectionContext context) : base("RFB Message Receiver")
+ public RfbMessageReceiver(RfbConnectionContext context)
{
_context = context;
_state = context.GetState();
@@ -51,7 +51,7 @@ public Task StopReceiveLoopAsync()
// This method will not catch exceptions so the BackgroundThread base class will receive them,
// raise a "Failure" and trigger a reconnect.
- protected override void ThreadWorker(CancellationToken cancellationToken)
+ protected override async Task ThreadWorker(CancellationToken cancellationToken)
{
// Get the transport stream so we don't have to call the getter every time
Debug.Assert(_context.Transport != null, "_context.Transport != null");
@@ -62,12 +62,12 @@ protected override void ThreadWorker(CancellationToken cancellationToken)
ImmutableDictionary incomingMessageLookup = _context.SupportedMessageTypes
.OfType().ToImmutableDictionary(mt => mt.Id);
- Span messageTypeBuffer = stackalloc byte[1];
+ var messageTypeBuffer = new byte[1];
while (!cancellationToken.IsCancellationRequested)
{
// Read message type
- if (transportStream.Read(messageTypeBuffer) == 0)
+ if (await transportStream.ReadAsync(messageTypeBuffer.AsMemory(), cancellationToken) == 0)
{
throw new UnexpectedEndOfStreamException("Stream reached its end while reading next message type.");
}
diff --git a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs
index f325660..30f493e 100644
--- a/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs
+++ b/src/MarcusW.VncClient/Protocol/Implementation/Services/Communication/RfbMessageSender.cs
@@ -1,8 +1,8 @@
using System;
-using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
using MarcusW.VncClient.Protocol.Implementation.MessageTypes.Outgoing;
using MarcusW.VncClient.Protocol.MessageTypes;
@@ -20,7 +20,7 @@ public class RfbMessageSender : BackgroundThread, IRfbMessageSender
private readonly RfbConnectionContext _context;
private readonly ILogger _logger;
- private readonly BlockingCollection _queue = new(new ConcurrentQueue());
+ private readonly Channel _queue = Channel.CreateUnbounded();
private readonly ProtocolState _state;
@@ -30,7 +30,7 @@ public class RfbMessageSender : BackgroundThread, IRfbMessageSender
/// Initializes a new instance of the .
///
/// The connection context.
- public RfbMessageSender(RfbConnectionContext context) : base("RFB Message Sender")
+ public RfbMessageSender(RfbConnectionContext context)
{
_context = context;
_state = context.GetState();
@@ -79,7 +79,7 @@ public void EnqueueMessage(IOutgoingMessage message,
var messageType = GetAndCheckMessageType();
// Add message to queue
- _queue.Add(new(message, messageType), cancellationToken);
+ _queue.Writer.TryWrite(new QueueItem(message, messageType));
}
///
@@ -104,7 +104,7 @@ public Task SendMessageAndWaitAsync(IOutgoingMessage
TaskCompletionSource