From b51a7aff99be34c2e0cb0d7f26cd4e208684dfd9 Mon Sep 17 00:00:00 2001 From: Artem Derevnjuk Date: Wed, 4 Oct 2023 10:51:21 +0400 Subject: [PATCH] fix(repeater): timeout ack --- .../Bus/DefaultRepeaterBusFactory.cs | 17 ++++++++--------- .../Bus/SocketIoRepeaterBus.cs | 13 +++++++------ .../Bus/SocketIoRepeaterBusOptions.cs | 12 ++++++++++++ .../Bus/SocketIoRepeaterBusTests.cs | 9 +++++---- 4 files changed, 32 insertions(+), 19 deletions(-) create mode 100644 src/SecTester.Repeater/Bus/SocketIoRepeaterBusOptions.cs diff --git a/src/SecTester.Repeater/Bus/DefaultRepeaterBusFactory.cs b/src/SecTester.Repeater/Bus/DefaultRepeaterBusFactory.cs index 0a8ead0..87fefd7 100644 --- a/src/SecTester.Repeater/Bus/DefaultRepeaterBusFactory.cs +++ b/src/SecTester.Repeater/Bus/DefaultRepeaterBusFactory.cs @@ -32,19 +32,18 @@ public IRepeaterBus Create(string repeaterId) } var url = new Uri(_config.Api); - var options = new SocketIOOptions + var options = new SocketIoRepeaterBusOptions(url); + var client = new SocketIOClient.SocketIO(url, new SocketIOOptions { - Path = "/api/ws/v1", - ReconnectionAttempts = 20, - ReconnectionDelayMax = 86400000, - ConnectionTimeout = TimeSpan.FromSeconds(10), + Path = options.Path, + ReconnectionAttempts = options.ReconnectionAttempts, + ReconnectionDelayMax = options.ReconnectionDelayMax, + ConnectionTimeout = options.ConnectionTimeout, Auth = new List> { new("token", _config.Credentials.Token), new("domain", repeaterId) }, - }; - - var client = new SocketIOClient.SocketIO(url, options) + }) { Serializer = new SocketIOMessagePackSerializer() }; @@ -53,6 +52,6 @@ public IRepeaterBus Create(string repeaterId) var scope = _scopeFactory.CreateAsyncScope(); var timerProvider = scope.ServiceProvider.GetRequiredService(); - return new SocketIoRepeaterBus(url, wrapper, timerProvider, _loggerFactory.CreateLogger()); + return new SocketIoRepeaterBus(options, wrapper, timerProvider, _loggerFactory.CreateLogger()); } } diff --git a/src/SecTester.Repeater/Bus/SocketIoRepeaterBus.cs b/src/SecTester.Repeater/Bus/SocketIoRepeaterBus.cs index 0d7550b..c0334fe 100644 --- a/src/SecTester.Repeater/Bus/SocketIoRepeaterBus.cs +++ b/src/SecTester.Repeater/Bus/SocketIoRepeaterBus.cs @@ -14,11 +14,11 @@ internal sealed class SocketIoRepeaterBus : IRepeaterBus private readonly ITimerProvider _heartbeat; private readonly ISocketIoClient _client; private readonly ILogger _logger; - private readonly Uri _url; + private readonly SocketIoRepeaterBusOptions _options; - internal SocketIoRepeaterBus(Uri url, ISocketIoClient client, ITimerProvider heartbeat, ILogger logger) + internal SocketIoRepeaterBus(SocketIoRepeaterBusOptions options, ISocketIoClient client, ITimerProvider heartbeat, ILogger logger) { - _url = url ?? throw new ArgumentNullException(nameof(url)); + _options = options ?? throw new ArgumentNullException(nameof(options)); _client = client ?? throw new ArgumentNullException(nameof(client)); _heartbeat = heartbeat ?? throw new ArgumentNullException(nameof(heartbeat)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); @@ -42,7 +42,7 @@ public async Task Connect() await SchedulePing().ConfigureAwait(false); - _logger.LogDebug("Repeater connected to {Url}", _url); + _logger.LogDebug("Repeater connected to {Url}", _options.Url); } } @@ -67,9 +67,10 @@ private void DelegateEvents() return; } + var ct = new CancellationTokenSource(_options.AckTimeout); var request = response.GetValue(); var result = await RequestReceived.Invoke(request).ConfigureAwait(false); - await response.CallbackAsync(result).ConfigureAwait(false); + await response.CallbackAsync(ct.Token, result).ConfigureAwait(false); }); } @@ -80,7 +81,7 @@ public async ValueTask DisposeAsync() _heartbeat.Elapsed -= Ping; _heartbeat.Stop(); await _client.Disconnect().ConfigureAwait(false); - _logger.LogDebug("Repeater disconnected from {Url}", _url); + _logger.LogDebug("Repeater disconnected from {Url}", _options.Url); } _client.Dispose(); diff --git a/src/SecTester.Repeater/Bus/SocketIoRepeaterBusOptions.cs b/src/SecTester.Repeater/Bus/SocketIoRepeaterBusOptions.cs new file mode 100644 index 0000000..fa5b07e --- /dev/null +++ b/src/SecTester.Repeater/Bus/SocketIoRepeaterBusOptions.cs @@ -0,0 +1,12 @@ +using System; + +namespace SecTester.Repeater.Bus; + +internal record SocketIoRepeaterBusOptions(Uri Url) +{ + public string Path { get; init; } = "/api/ws/v1"; + public TimeSpan ConnectionTimeout { get; init; } = TimeSpan.FromSeconds(10); + public TimeSpan AckTimeout { get; init; } = TimeSpan.FromSeconds(60); + public int ReconnectionDelayMax { get; init; } = 86_400_000; + public int ReconnectionAttempts { get; init; } = 20; +} diff --git a/test/SecTester.Repeater.Tests/Bus/SocketIoRepeaterBusTests.cs b/test/SecTester.Repeater.Tests/Bus/SocketIoRepeaterBusTests.cs index 8e2ee0f..ff80976 100644 --- a/test/SecTester.Repeater.Tests/Bus/SocketIoRepeaterBusTests.cs +++ b/test/SecTester.Repeater.Tests/Bus/SocketIoRepeaterBusTests.cs @@ -3,7 +3,8 @@ namespace SecTester.Repeater.Tests.Bus; public sealed class SocketIoRepeaterBusTests : IDisposable { private static readonly string RepeaterId = "g5MvgM74sweGcK1U6hvs76"; - private static readonly string Url = new("http://example.com"); + private static readonly Uri Url = new("http://example.com"); + private static readonly SocketIoRepeaterBusOptions Options = new(Url); private readonly ISocketIoClient _client = Substitute.For(); private readonly ITimerProvider _heartbeat = Substitute.For(); @@ -13,7 +14,7 @@ public sealed class SocketIoRepeaterBusTests : IDisposable public SocketIoRepeaterBusTests() { - _sut = new SocketIoRepeaterBus(new Uri(Url), _client, _heartbeat, _logger); + _sut = new SocketIoRepeaterBus(Options, _client, _heartbeat, _logger); } public void Dispose() @@ -35,7 +36,7 @@ public async Task RequestReceived_ExecutesHandler() StatusCode = 204 }; _client.Connect().Returns(Task.CompletedTask); - _socketIoResponse.GetValue().Returns(new IncomingRequest(new(Url))); + _socketIoResponse.GetValue().Returns(new IncomingRequest(Url)); _client.On("request", Arg.Invoke(_socketIoResponse)); _sut.RequestReceived += _ => Task.FromResult(result); @@ -43,7 +44,7 @@ public async Task RequestReceived_ExecutesHandler() await _sut.Connect(); // assert - await _socketIoResponse.Received().CallbackAsync(result); + await _socketIoResponse.Received().CallbackAsync(Arg.Any(), result); } [Fact]