Skip to content

Commit

Permalink
feat: improve client
Browse files Browse the repository at this point in the history
  • Loading branch information
josephnhtam committed Sep 15, 2024
1 parent 623231d commit fbdaa20
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace LiveStreamingServerNet.Networking.Contracts
{
public interface INetworkStream : INetworkStreamWriter, INetworkStreamReader, IDisposable
public interface INetworkStream : INetworkStreamWriter, INetworkStreamReader, IAsyncDisposable
{
Stream InnerStream { get; }
}
Expand Down
59 changes: 39 additions & 20 deletions src/LiveStreamingServerNet.Networking/Internal/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ internal sealed class Client : IClient
private readonly ILogger _logger;

private readonly IClientBufferSender _bufferSender;

private readonly TaskCompletionSource _stoppedTcs = new();
private CancellationTokenSource? _cts;
private readonly CancellationTokenSource _cts = new();

public uint ClientId { get; }
public DateTime StartTime { get; }
public bool IsConnected => _tcpClient?.Connected ?? false;
public bool IsConnected => _tcpClient.Connected;
public EndPoint LocalEndPoint => _tcpClient.Client.LocalEndPoint!;
public EndPoint RemoteEndPoint => _tcpClient.Client.RemoteEndPoint!;

Expand Down Expand Up @@ -52,20 +51,18 @@ public async Task RunAsync(CancellationToken stoppingToken)
{
_logger.ClientConnected(ClientId);

_cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
var cancellationToken = _cts.Token;
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, stoppingToken);
var cancellationToken = linkedCts.Token;

IClientHandler? handler = null;
INetworkStream? networkStream = null;

try
{
handler = CreateClientHandler();

networkStream = await CreateNetworkStreamAsync(_serverEndPoint, cancellationToken);

_bufferSender.Start(networkStream, cancellationToken);

handler = CreateClientHandler();
await handler.InitializeAsync();

while (_tcpClient.Connected && !cancellationToken.IsCancellationRequested)
Expand All @@ -80,21 +77,15 @@ public async Task RunAsync(CancellationToken stoppingToken)
{
_logger.ClientLoopError(ClientId, ex);
}
finally
{
_cts.Cancel();
}

await _bufferSender.DisposeAsync();

await (handler?.DisposeAsync() ?? ValueTask.CompletedTask);
linkedCts.Cancel();

networkStream?.Dispose();

_tcpClient.Close();
await DisposeAsync(handler);
await DisposeAsync(_bufferSender);
await DisposeAsync(networkStream);
CloseTcpClient();

_stoppedTcs.TrySetResult();

_logger.ClientDisconnected(ClientId);
}

Expand Down Expand Up @@ -140,7 +131,7 @@ public ValueTask SendAsync(Action<IDataBuffer> writer)

public void Disconnect()
{
_cts?.Cancel();
_cts.Cancel();
}

public async Task DisconnectAsync(CancellationToken cancellation)
Expand All @@ -149,8 +140,36 @@ public async Task DisconnectAsync(CancellationToken cancellation)
await _stoppedTcs.Task.WithCancellation(cancellation);
}

private async Task DisposeAsync(IAsyncDisposable? disposable)
{
try
{
if (disposable != null)
{
await disposable.DisposeAsync();
}
}
catch (Exception ex)
{
_logger.DisposeError(ClientId, ex);
}
}

private void CloseTcpClient()
{
try
{
_tcpClient.Close();
}
catch (Exception ex)
{
_logger.CloseTcpClientError(ClientId, ex);
}
}

public ValueTask DisposeAsync()
{
_cts.Dispose();
_tcpClient.Dispose();
return ValueTask.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,11 @@ internal static partial class LoggerExtensions

[LoggerMessage(LogLevel.Error, "An error occurred while dispatching server stopped event")]
public static partial void DispatchingServerStoppedEventError(this ILogger logger, Exception ex);

[LoggerMessage(LogLevel.Error, "ClientId: {ClientId} | An error occurred while disposing the client")]
public static partial void DisposeError(this ILogger logger, uint clientId, Exception ex);

[LoggerMessage(LogLevel.Error, "ClientId: {ClientId} | An error occurred while closing the TCP client")]
public static partial void CloseTcpClientError(this ILogger logger, uint clientId, Exception ex);
}
}
10 changes: 5 additions & 5 deletions src/LiveStreamingServerNet.Networking/Internal/NetworkStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ public NetworkStream(Stream stream)
InnerStream = stream;
}

public void Dispose()
{
InnerStream.Dispose();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask ReadExactlyAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
{
Expand All @@ -28,5 +23,10 @@ public ValueTask WriteAsync(byte[] buffer, int offset, int count, CancellationTo
{
return InnerStream.WriteAsync(new Memory<byte>(buffer, offset, count), cancellationToken);
}

public ValueTask DisposeAsync()
{
return InnerStream.DisposeAsync();
}
}
}

0 comments on commit fbdaa20

Please sign in to comment.