Skip to content

Commit

Permalink
fix(repeater): timeout ack
Browse files Browse the repository at this point in the history
  • Loading branch information
derevnjuk committed Oct 4, 2023
1 parent d7800a6 commit b51a7af
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 19 deletions.
17 changes: 8 additions & 9 deletions src/SecTester.Repeater/Bus/DefaultRepeaterBusFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,18 @@ public IRepeaterBus Create(string repeaterId)
}

var url = new Uri(_config.Api);
var options = new SocketIOOptions
var options = new SocketIoRepeaterBusOptions(url);
var client = new SocketIOClient.SocketIO(url, new SocketIOOptions
{
Path = "/api/ws/v1",
ReconnectionAttempts = 20,
ReconnectionDelayMax = 86400000,
ConnectionTimeout = TimeSpan.FromSeconds(10),
Path = options.Path,
ReconnectionAttempts = options.ReconnectionAttempts,
ReconnectionDelayMax = options.ReconnectionDelayMax,
ConnectionTimeout = options.ConnectionTimeout,
Auth = new List<KeyValuePair<string, string>>
{
new("token", _config.Credentials.Token), new("domain", repeaterId)
},
};

var client = new SocketIOClient.SocketIO(url, options)
})
{
Serializer = new SocketIOMessagePackSerializer()
};
Expand All @@ -53,6 +52,6 @@ public IRepeaterBus Create(string repeaterId)
var scope = _scopeFactory.CreateAsyncScope();
var timerProvider = scope.ServiceProvider.GetRequiredService<ITimerProvider>();

return new SocketIoRepeaterBus(url, wrapper, timerProvider, _loggerFactory.CreateLogger<IRepeaterBus>());
return new SocketIoRepeaterBus(options, wrapper, timerProvider, _loggerFactory.CreateLogger<IRepeaterBus>());
}
}
13 changes: 7 additions & 6 deletions src/SecTester.Repeater/Bus/SocketIoRepeaterBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ internal sealed class SocketIoRepeaterBus : IRepeaterBus
private readonly ITimerProvider _heartbeat;
private readonly ISocketIoClient _client;
private readonly ILogger<IRepeaterBus> _logger;
private readonly Uri _url;
private readonly SocketIoRepeaterBusOptions _options;

internal SocketIoRepeaterBus(Uri url, ISocketIoClient client, ITimerProvider heartbeat, ILogger<IRepeaterBus> logger)
internal SocketIoRepeaterBus(SocketIoRepeaterBusOptions options, ISocketIoClient client, ITimerProvider heartbeat, ILogger<IRepeaterBus> logger)
{
_url = url ?? throw new ArgumentNullException(nameof(url));
_options = options ?? throw new ArgumentNullException(nameof(options));
_client = client ?? throw new ArgumentNullException(nameof(client));
_heartbeat = heartbeat ?? throw new ArgumentNullException(nameof(heartbeat));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand All @@ -42,7 +42,7 @@ public async Task Connect()

await SchedulePing().ConfigureAwait(false);

_logger.LogDebug("Repeater connected to {Url}", _url);
_logger.LogDebug("Repeater connected to {Url}", _options.Url);
}
}

Expand All @@ -67,9 +67,10 @@ private void DelegateEvents()
return;
}

var ct = new CancellationTokenSource(_options.AckTimeout);
var request = response.GetValue<IncomingRequest>();
var result = await RequestReceived.Invoke(request).ConfigureAwait(false);
await response.CallbackAsync(result).ConfigureAwait(false);
await response.CallbackAsync(ct.Token, result).ConfigureAwait(false);
});
}

Expand All @@ -80,7 +81,7 @@ public async ValueTask DisposeAsync()
_heartbeat.Elapsed -= Ping;
_heartbeat.Stop();
await _client.Disconnect().ConfigureAwait(false);
_logger.LogDebug("Repeater disconnected from {Url}", _url);
_logger.LogDebug("Repeater disconnected from {Url}", _options.Url);
}

_client.Dispose();
Expand Down
12 changes: 12 additions & 0 deletions src/SecTester.Repeater/Bus/SocketIoRepeaterBusOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;

namespace SecTester.Repeater.Bus;

internal record SocketIoRepeaterBusOptions(Uri Url)
{
public string Path { get; init; } = "/api/ws/v1";
public TimeSpan ConnectionTimeout { get; init; } = TimeSpan.FromSeconds(10);
public TimeSpan AckTimeout { get; init; } = TimeSpan.FromSeconds(60);
public int ReconnectionDelayMax { get; init; } = 86_400_000;
public int ReconnectionAttempts { get; init; } = 20;
}
9 changes: 5 additions & 4 deletions test/SecTester.Repeater.Tests/Bus/SocketIoRepeaterBusTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ namespace SecTester.Repeater.Tests.Bus;
public sealed class SocketIoRepeaterBusTests : IDisposable
{
private static readonly string RepeaterId = "g5MvgM74sweGcK1U6hvs76";
private static readonly string Url = new("http://example.com");
private static readonly Uri Url = new("http://example.com");
private static readonly SocketIoRepeaterBusOptions Options = new(Url);

private readonly ISocketIoClient _client = Substitute.For<ISocketIoClient>();
private readonly ITimerProvider _heartbeat = Substitute.For<ITimerProvider>();
Expand All @@ -13,7 +14,7 @@ public sealed class SocketIoRepeaterBusTests : IDisposable

public SocketIoRepeaterBusTests()
{
_sut = new SocketIoRepeaterBus(new Uri(Url), _client, _heartbeat, _logger);
_sut = new SocketIoRepeaterBus(Options, _client, _heartbeat, _logger);
}

public void Dispose()
Expand All @@ -35,15 +36,15 @@ public async Task RequestReceived_ExecutesHandler()
StatusCode = 204
};
_client.Connect().Returns(Task.CompletedTask);
_socketIoResponse.GetValue<IncomingRequest>().Returns(new IncomingRequest(new(Url)));
_socketIoResponse.GetValue<IncomingRequest>().Returns(new IncomingRequest(Url));
_client.On("request", Arg.Invoke(_socketIoResponse));
_sut.RequestReceived += _ => Task.FromResult(result);

// act
await _sut.Connect();

// assert
await _socketIoResponse.Received().CallbackAsync(result);
await _socketIoResponse.Received().CallbackAsync(Arg.Any<CancellationToken>(), result);
}

[Fact]
Expand Down

0 comments on commit b51a7af

Please sign in to comment.