Skip to content

Commit

Permalink
feat(repeater): refrain from utilizing non standard ports
Browse files Browse the repository at this point in the history
closes #165
  • Loading branch information
derevnjuk committed Oct 3, 2023
1 parent 8573f3a commit 1f28ad8
Show file tree
Hide file tree
Showing 35 changed files with 882 additions and 512 deletions.
51 changes: 51 additions & 0 deletions src/SecTester.Repeater/Bus/DefaultRepeaterBusFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using SecTester.Core;
using SecTester.Core.Utils;
using SocketIO.Serializer.MessagePack;
using SocketIOClient;

namespace SecTester.Repeater.Bus;

public class DefaultRepeaterBusFactory : IRepeaterBusFactory
{
private readonly Configuration _config;
private readonly ITimerProvider _timerProvider;
private readonly ILoggerFactory _loggerFactory;

public DefaultRepeaterBusFactory(Configuration config, ITimerProvider timerProvider, ILoggerFactory loggerFactory)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_timerProvider = timerProvider ?? throw new ArgumentNullException(nameof(timerProvider));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
}

public IRepeaterBus Create(string repeaterId)
{
if (_config.Credentials is null)
{
throw new InvalidOperationException(
"Please provide credentials to establish a connection with the bus."
);
}

var url = new Uri(_config.Api);
var client = new SocketIOClient.SocketIO(url, new SocketIOOptions
{
Path = "/api/ws/v1",
ReconnectionAttempts = 20,
ReconnectionDelayMax = 86400000,
ConnectionTimeout = TimeSpan.FromSeconds(10),
Auth = new List<KeyValuePair<string, string>>
{
new("token", _config.Credentials!.Token), new("domain", repeaterId)
},
})
{
Serializer = new SocketIOMessagePackSerializer()
};

return new SocketIoRepeaterBus(url, client, _timerProvider, _loggerFactory.CreateLogger<IRepeaterBus>());
}
}
38 changes: 0 additions & 38 deletions src/SecTester.Repeater/Bus/DefaultRepeaterEventBusFactory.cs

This file was deleted.

15 changes: 15 additions & 0 deletions src/SecTester.Repeater/Bus/IRepeaterBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace SecTester.Repeater.Bus;

public interface IRepeaterBus : IAsyncDisposable
{
event Func<IncomingRequest, Task<OutgoingResponse>> RequestReceived;
event Action<Exception> ErrorOccurred;
event Action<Version> UpgradeAvailable;

Task Connect();
Task Deploy(string repeaterId, Runtime? runtime = null, CancellationToken? cancellationToken = null);
}
6 changes: 6 additions & 0 deletions src/SecTester.Repeater/Bus/IRepeaterBusFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SecTester.Repeater.Bus;

public interface IRepeaterBusFactory
{
IRepeaterBus Create(string repeaterId);
}
8 changes: 0 additions & 8 deletions src/SecTester.Repeater/Bus/IRepeaterEventBusFactory.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json.Serialization;
using System.Text.RegularExpressions;
using SecTester.Core.Bus;
using SecTester.Repeater.Runners;

namespace SecTester.Repeater.Bus;

[MessageType(name: "ExecuteScript")]
public record RequestExecutingEvent(Uri Url) : Event, IRequest
public record IncomingRequest(Uri Url) : Event, IRequest
{
public string? Body { get; init; }
[JsonPropertyName("correlation_id_regex")]
public Regex? CorrelationIdRegex { get; init; }
public HttpMethod Method { get; init; } = HttpMethod.Get;
public Protocol Protocol { get; init; } = Protocol.Http;
public Uri Url { get; init; } = Url ?? throw new ArgumentNullException(nameof(Url));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
using System.Collections.Generic;
using System.Text.Json.Serialization;
using SecTester.Repeater.Runners;

namespace SecTester.Repeater.Bus;

public record RequestExecutingResult : IResponse
public record OutgoingResponse : IResponse
{
[JsonPropertyName("status_code")] public int? StatusCode { get; init; }
public int? StatusCode { get; init; }

public string? Body { get; init; }
public string? Message { get; init; }

[JsonPropertyName("error_code")] public string? ErrorCode { get; init; }
public string? ErrorCode { get; init; }

public Protocol Protocol { get; init; } = Protocol.Http;

Expand Down
6 changes: 0 additions & 6 deletions src/SecTester.Repeater/Bus/RegisterRepeaterCommand.cs

This file was deleted.

3 changes: 0 additions & 3 deletions src/SecTester.Repeater/Bus/RegisterRepeaterPayload.cs

This file was deleted.

3 changes: 0 additions & 3 deletions src/SecTester.Repeater/Bus/RegisterRepeaterResult.cs

This file was deleted.

10 changes: 0 additions & 10 deletions src/SecTester.Repeater/Bus/RepeaterRegisteringError.cs

This file was deleted.

6 changes: 0 additions & 6 deletions src/SecTester.Repeater/Bus/RepeaterStatusEvent.cs

This file was deleted.

30 changes: 0 additions & 30 deletions src/SecTester.Repeater/Bus/RequestExecutingEventListener.cs

This file was deleted.

136 changes: 136 additions & 0 deletions src/SecTester.Repeater/Bus/SocketIoRepeaterBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Microsoft.Extensions.Logging;
using SecTester.Core.Utils;

namespace SecTester.Repeater.Bus;

public class SocketIoRepeaterBus : IRepeaterBus
{
private static readonly TimeSpan PingInterval = TimeSpan.FromSeconds(10);
private readonly ITimerProvider _heartbeat;
private SocketIOClient.SocketIO _client;
private readonly ILogger<IRepeaterBus> _logger;
private readonly Uri _url;

public SocketIoRepeaterBus(Uri url, SocketIOClient.SocketIO client, ITimerProvider heartbeat, ILogger<IRepeaterBus> logger)
{
_url = url ?? throw new ArgumentNullException(nameof(url));
_client = client ?? throw new ArgumentNullException(nameof(client));
_heartbeat = heartbeat ?? throw new ArgumentNullException(nameof(heartbeat));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

private record RepeaterVersion(string Version);
private record RepeaterError(string Message);

private record RepeaterDeployed(string RepeaterId);

public event Func<IncomingRequest, Task<OutgoingResponse>>? RequestReceived;
public event Action<Exception>? ErrorOccurred;
public event Action<Version>? UpgradeAvailable;

public async Task Connect()
{
if (_client is not { Connected: true })
{
DelegateEvents();

await _client.ConnectAsync().ConfigureAwait(false);

await SchedulePing().ConfigureAwait(false);

_logger.LogDebug("Repeater connected to {Url}", _url);
}
}

private void DelegateEvents()
{
_client.On("error", response =>
{
var err = response.GetValue<RepeaterError>();
ErrorOccurred?.Invoke(new(err.Message));

Check warning on line 54 in src/SecTester.Repeater/Bus/SocketIoRepeaterBus.cs

View workflow job for this annotation

GitHub Actions / windows-2019

Exception type System.Exception is not sufficiently specific
});

_client.On("update-available", response =>
{
var version = response.GetValue<RepeaterVersion>();
UpgradeAvailable?.Invoke(new(version.Version));
});

_client.On("request", async response =>
{
var request = response.GetValue<IncomingRequest>();

if (RequestReceived != null)
{
var result = await RequestReceived.Invoke(request).ConfigureAwait(false);
await response.CallbackAsync(result).ConfigureAwait(false);
}
});
}

public async ValueTask DisposeAsync()
{
if (_client is { Connected: true })
{
_heartbeat.Elapsed -= Ping;
_heartbeat.Stop();
await _client.DisconnectAsync().ConfigureAwait(false);
_logger.LogDebug("Repeater disconnected from {Url}", _url);
}

_client.Dispose();

RequestReceived = null;
ErrorOccurred = null;
UpgradeAvailable = null;

GC.SuppressFinalize(this);
}

public async Task Deploy(string repeaterId, Runtime? runtime = null, CancellationToken? cancellationToken = null)
{
try
{
var tcs = new TaskCompletionSource<RepeaterDeployed>();

_client.On("deployed", response => tcs.TrySetResult(response.GetValue<RepeaterDeployed>()));

await _client.EmitAsync("deploy", new
{
RepeaterId = repeaterId
}, runtime, cancellationToken).ConfigureAwait(false);

using var _ = cancellationToken?.Register(() => tcs.TrySetCanceled());

var result = await tcs.Task.ConfigureAwait(false);

_logger.LogDebug("Repeater ({RepeaterId}) deployed", result?.RepeaterId);
}
finally
{
_client.Off("deployed");
}
}

private async Task SchedulePing()
{
await Ping().ConfigureAwait(false);
_heartbeat.Interval = PingInterval.TotalMilliseconds;
_heartbeat.Elapsed += Ping;
_heartbeat.Start();
}

private async void Ping(object sender, ElapsedEventArgs args)
{
await Ping().ConfigureAwait(false);
}

private async Task Ping()
{
await _client.EmitAsync("ping").ConfigureAwait(false);
}
}
Loading

0 comments on commit 1f28ad8

Please sign in to comment.