Skip to content

Commit

Permalink
[OneBot] try to support multi-connection
Browse files Browse the repository at this point in the history
  • Loading branch information
Linwenxuan authored and Linwenxuan committed Oct 27, 2023
1 parent 0a88ad9 commit b417adc
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 45 deletions.
69 changes: 62 additions & 7 deletions Lagrange.OneBot/Core/Network/LagrangeWebSvcCollection.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,84 @@
using Lagrange.OneBot.Core.Network.Service;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Lagrange.OneBot.Core.Network;

public class LagrangeWebSvcCollection : List<ILagrangeWebService>, IHostedService
public class LagrangeWebSvcCollection : Dictionary<string, ILagrangeWebService>, IHostedService
{
private const string Tag = nameof(LagrangeWebSvcCollection);

public event EventHandler<MsgRecvEventArgs> OnMessageReceived = delegate { };

public LagrangeWebSvcCollection(IEnumerable<ILagrangeWebService> services) : base(services)
public LagrangeWebSvcCollection(IConfiguration global, ILogger<LagrangeApp> logger)
{
foreach (var service in this) service.OnMessageReceived += OnMessageReceived.Invoke;
uint uin = global.GetValue<uint>("Account:Uin");

if (global.GetSection("Implementations").Exists())
{
logger.LogInformation($"[{Tag}]: Multi Connection has been configured");

foreach (var section in global.GetSection("Implementations").GetChildren())
{
ILagrangeWebService? service = section["Type"] switch
{
"ReverseWebSocket" => new ReverseWSService(section, logger, uin),
"ForwardWebSocket" => new ForwardWSService(section, logger, uin),
_ => null
};

if (service == null) logger.LogWarning($"[{Tag}]: unknown type of service of {section["Type"]} is configured, skipped");
else Add(new Guid().ToString(), service);
}
}
else if (global.GetSection("Implementation").Exists())
{
logger.LogInformation($"[{Tag}]: Single Connection has been configured");

string identifier = new Guid().ToString();
if (global.GetSection("Implementation:ReverseWebSocket").Exists())
{
this[identifier] = new ReverseWSService(global.GetSection("Implementation:ReverseWebSocket"), logger, uin);
}
else if (global.GetSection("Implementation:ForwardWebSocket").Exists())
{
this[identifier] = new ForwardWSService(global.GetSection("Implementation:ForwardWebSocket"), logger, uin);
}
}
else
{
logger.LogWarning($"[{Tag}]: No implementation has been configured");
}

foreach (var (identifier, service) in this)
{
service.OnMessageReceived += (sender, args) =>
{
OnMessageReceived.Invoke(sender, new MsgRecvEventArgs(identifier, args.Data));
};
}
}

public async Task StartAsync(CancellationToken cancellationToken)
{
foreach (var service in this) await service.StartAsync(cancellationToken);
foreach (var (_, service) in this) await service.StartAsync(cancellationToken);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
foreach (var service in this) await service.StopAsync(cancellationToken);
foreach (var (_, service) in this) await service.StopAsync(cancellationToken);
}

public async Task SendJsonAsync<T>(T json, CancellationToken cancellationToken = default)
public async Task SendJsonAsync<T>(T json, string? identifier = null, CancellationToken cancellationToken = default)
{
foreach (var service in this) await service.SendJsonAsync(json, cancellationToken);
if (identifier == null)
{
foreach (var (_, service) in this) await service.SendJsonAsync(json, cancellationToken);
}
else
{
if (TryGetValue(identifier, out var service)) await service.SendJsonAsync(json, cancellationToken);
}
}
}
13 changes: 6 additions & 7 deletions Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@ public sealed class ForwardWSService : LagrangeWSService

private readonly Timer _timer;

public ForwardWSService(IConfiguration config, ILogger<LagrangeApp> logger) : base(config, logger)
public ForwardWSService(IConfiguration config, ILogger<LagrangeApp> logger, uint uin) : base(config, logger, uin)
{
var ws = config.GetSection("Implementation").GetSection("ForwardWebSocket");
string url = $"ws://{ws["Host"]}:{ws["Port"]}";
string url = $"ws://{config["Host"]}:{config["Port"]}";

_server = new WebSocketServer(url)
{
RestartAfterListenError = true
};

_timer = new Timer(OnHeartbeat, null, 1, ws.GetValue<int>("HeartBeatInterval"));
_timer = new Timer(OnHeartbeat, null, 1, config.GetValue<int>("HeartBeatInterval"));
}

public override Task StartAsync(CancellationToken cancellationToken)
{
uint port = Config.GetValue<uint?>("Implementation:ForwardWebSocket:Port") ?? throw new Exception("Port is not defined");
uint port = Config.GetValue<uint?>("Port") ?? throw new Exception("Port is not defined");
if (IsPortInUse(port))
{
Logger.LogCritical($"[{Tag}] The port {port} is in use, {Tag} failed to start");
Expand All @@ -56,10 +55,10 @@ public override Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation($"[{Tag}]: Connected");

var lifecycle = new OneBotLifecycle(Config.GetValue<uint>("Account:Uin"), "connect");
var lifecycle = new OneBotLifecycle(Uin, "connect");
SendJsonAsync(lifecycle, cancellationToken).GetAwaiter().GetResult();

_timer.Change(0, Config.GetValue<int>("Implementation:ForwardWebSocket:HeartBeatInterval"));
_timer.Change(0, Config.GetValue<int>("HeartBeatInterval"));
};

conn.OnClose = () =>
Expand Down
9 changes: 4 additions & 5 deletions Lagrange.OneBot/Core/Network/Service/LagrangeWSService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@

namespace Lagrange.OneBot.Core.Network.Service;

public abstract class LagrangeWSService(IConfiguration config, ILogger<LagrangeApp> logger) : ILagrangeWebService
public abstract class LagrangeWSService(IConfiguration config, ILogger<LagrangeApp> logger, uint uin) : ILagrangeWebService
{
protected readonly ILogger Logger = logger;

protected readonly IConfiguration Config = config;

protected readonly uint Uin = uin;

protected void OnHeartbeat(object? sender)
{
var status = new OneBotStatus(true, true);
var heartBeat = new OneBotHeartBeat(
Config.GetValue<uint>("Account:Uin"),
Config.GetValue<int>("Implementation:ReverseWebSocket:HeartBeatInterval"),
status);
var heartBeat = new OneBotHeartBeat(Uin, Config.GetValue<int>("HeartBeatInterval"), status);

SendJsonAsync(heartBeat);
}
Expand Down
13 changes: 7 additions & 6 deletions Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ public sealed class ReverseWSService : LagrangeWSService

private readonly Timer _timer;

public ReverseWSService(IConfiguration config, ILogger<LagrangeApp> logger) : base(config, logger)
public ReverseWSService(IConfiguration config, ILogger<LagrangeApp> logger, uint uin) : base(config, logger, uin)
{
var ws = Config.GetSection("Implementation").GetSection("ReverseWebSocket");
string url = $"ws://{ws["Host"]}:{ws["Port"]}{ws["Suffix"]}";
string url = $"ws://{config["Host"]}:{config["Port"]}{config["Suffix"]}";

_socket = new WebsocketClient(new Uri(url), () =>
{
Expand All @@ -33,12 +32,12 @@ public ReverseWSService(IConfiguration config, ILogger<LagrangeApp> logger) : ba
{ "User-Agent", Constant.OneBotImpl }
});
socket.Options.KeepAliveInterval = TimeSpan.FromSeconds(5);
if (Config["AccessToken"] != null) socket.Options.SetRequestHeader("Authorization", $"Bearer {Config["AccessToken"]}");
if (string.IsNullOrEmpty(config["AccessToken"])) socket.Options.SetRequestHeader("Authorization", $"Bearer {config["AccessToken"]}");

return socket;
});

_timer = new Timer(OnHeartbeat, null, -1, ws.GetValue<int>("HeartBeatInterval"));
_timer = new Timer(OnHeartbeat, null, -1, config.GetValue<int>("HeartBeatInterval"));
_socket.MessageReceived.Subscribe(resp =>
{
Logger.LogTrace($"[{Tag}] Receive: {resp.Text}");
Expand All @@ -50,8 +49,10 @@ public override async Task StartAsync(CancellationToken cancellationToken)
{
await _socket.Start();

var lifecycle = new OneBotLifecycle(Config.GetValue<uint>("Account:Uin"), "connect");
var lifecycle = new OneBotLifecycle(Uin, "connect");
await SendJsonAsync(lifecycle, cancellationToken);

_timer.Change(0, Config.GetValue<int>("HeartBeatInterval"));
}

public override Task StopAsync(CancellationToken cancellationToken)
Expand Down
12 changes: 6 additions & 6 deletions Lagrange.OneBot/Core/Operation/OperationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ public OperationService(BotContext bot, LagrangeWebSvcCollection service)
if (attribute != null) _operations[attribute.Api] = (IOperation)type.CreateInstance(false);
}

service.OnMessageReceived += async (_, e) => await HandleOperation(e.Data);
service.OnMessageReceived += async (_, e) => await HandleOperation(e);
}

private async Task HandleOperation(string data)
private async Task HandleOperation(MsgRecvEventArgs eventArgs)
{
var action = JsonSerializer.Deserialize<OneBotAction>(data);
var action = JsonSerializer.Deserialize<OneBotAction>(eventArgs.Data);

try
{
Expand All @@ -44,11 +44,11 @@ private async Task HandleOperation(string data)
var result = await handler.HandleOperation(_bot, action.Params);
result.Echo = action.Echo;

await _service.SendJsonAsync(result);
await _service.SendJsonAsync(result, eventArgs.Identifier);
}
else
{
await _service.SendJsonAsync(new OneBotResult(null, 404, "failed"));
await _service.SendJsonAsync(new OneBotResult(null, 404, "failed"), eventArgs.Identifier);
}
}
else
Expand All @@ -58,7 +58,7 @@ private async Task HandleOperation(string data)
}
catch
{
await _service.SendJsonAsync(new OneBotResult(null, 200, "failed"));
await _service.SendJsonAsync(new OneBotResult(null, 200, "failed"), eventArgs.Identifier);
}
}
}
4 changes: 2 additions & 2 deletions Lagrange.OneBot/LagrangeApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Lagrange.Core.Common.Interface.Api;
using Lagrange.Core.Utility.Sign;
using Lagrange.OneBot.Core.Message;
using Lagrange.OneBot.Core.Network.Service;
using Lagrange.OneBot.Core.Network;
using Lagrange.OneBot.Core.Operation;
using Lagrange.OneBot.Utility;
using Microsoft.Extensions.Configuration;
Expand All @@ -26,7 +26,7 @@ public class LagrangeApp : IHost

public BotContext Instance => Services.GetRequiredService<BotContext>();

public ILagrangeWebService WebService => Services.GetRequiredService<ILagrangeWebService>();
public LagrangeWebSvcCollection WebService => Services.GetRequiredService<LagrangeWebSvcCollection>();

public MessageService MessageService { get; set; }

Expand Down
9 changes: 0 additions & 9 deletions Lagrange.OneBot/LagrangeAppBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,6 @@ public LagrangeAppBuilder ConfigureBots()

public LagrangeAppBuilder ConfigureOneBot()
{
if (Configuration.GetSection("Implementation:ReverseWebSocket").Exists())
{
Services.AddSingleton<ILagrangeWebService, ReverseWSService>();
}
else if (Configuration.GetSection("Implementation:ForwardWebSocket").Exists())
{
Services.AddSingleton<ILagrangeWebService, ForwardWSService>();
}

Services.AddSingleton<LagrangeWebSvcCollection>();

Services.AddSingleton<ContextBase, LiteDbContext>();
Expand Down
31 changes: 28 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ Please use Lagrange.Core responsibly and in accordance with the law.
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AccessToken": "",
"SignServerUrl": "",
"Account": {
"Uin": 0,
Expand All @@ -269,14 +268,16 @@ Please use Lagrange.Core responsibly and in accordance with the law.
"ForwardWebSocket": {
"Host": "127.0.0.1",
"Port": 8081,
"HeartBeatInterval": 5000
"HeartBeatInterval": 5000,
"AccessToken": ""
},
"ReverseWebSocket": {
"Host": "127.0.0.1",
"Port": 8080,
"Suffix": "/onebot/v11/ws",
"ReconnectInterval": 5000,
"HeartBeatInterval": 5000
"HeartBeatInterval": 5000,
"AccessToken": ""
},
"Http": {
"Host": "",
Expand All @@ -296,6 +297,30 @@ Please use Lagrange.Core responsibly and in accordance with the law.
- Create a file named 'appsettings.json' under Lagrange.OneBot executable directory
- As the Password is empty here, this indicates that QRCode login is used
- After the QRCode Login, write Uin back to perform EasyLogin
- If you want a multi connection, remove the 'Implementation' Part and add
```json
{ "Implementations": [
{
"Type": "ReverseWebsocket",
"Host": "127.0.0.1",
"Port": 8080,
"Suffix": "/onebot/v11/ws",
"ReconnectInterval": 5000,
"HeartBeatInterval": 5000,
"AccessToken": ""
},
{
"Type": "ReverseWebsocket",
"Host": "127.0.0.1",
"Port": 8081,
"HeartBeatInterval": 5000,
"AccessToken": ""
}
]
}

```


## Known Problem

Expand Down

0 comments on commit b417adc

Please sign in to comment.