Skip to content

Commit

Permalink
Merge pull request #61 from itn3000/unix-domain-socket
Browse files Browse the repository at this point in the history
Add UnixDomainSocket feature to interprocess PubSub
  • Loading branch information
neuecc authored Jan 13, 2022
2 parents 5ee8e62 + c6af515 commit 044b810
Show file tree
Hide file tree
Showing 32 changed files with 1,574 additions and 650 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,5 @@ src/MessagePipe.Unity/Zenject-Editor.csproj
src/MessagePipe.Unity/Zenject.csproj

src/MessagePipe.Unity/MessagePipe.Editor.csproj

BenchmarkDotNet.Artifacts/
9 changes: 9 additions & 0 deletions MessagePipe.sln
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessagePipe.Interprocess.Te
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "InterprocessServer", "sandbox\InterprocessServer\InterprocessServer.csproj", "{0E645BF9-3464-4856-A624-FEFCF0050220}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessagePipe.Interprocess.Benchmark", "tests\MessagePipe.Interprocess.Benchmark\MessagePipe.Interprocess.Benchmark.csproj", "{AF421C72-0DDA-4568-BD7E-B3CE90B3B31C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -135,6 +137,12 @@ Global
{0E645BF9-3464-4856-A624-FEFCF0050220}.Release|Any CPU.Build.0 = Release|Any CPU
{0E645BF9-3464-4856-A624-FEFCF0050220}.WinBenchmark|Any CPU.ActiveCfg = Debug|Any CPU
{0E645BF9-3464-4856-A624-FEFCF0050220}.WinBenchmark|Any CPU.Build.0 = Debug|Any CPU
{AF421C72-0DDA-4568-BD7E-B3CE90B3B31C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AF421C72-0DDA-4568-BD7E-B3CE90B3B31C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AF421C72-0DDA-4568-BD7E-B3CE90B3B31C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF421C72-0DDA-4568-BD7E-B3CE90B3B31C}.Release|Any CPU.Build.0 = Release|Any CPU
{AF421C72-0DDA-4568-BD7E-B3CE90B3B31C}.WinBenchmark|Any CPU.ActiveCfg = Debug|Any CPU
{AF421C72-0DDA-4568-BD7E-B3CE90B3B31C}.WinBenchmark|Any CPU.Build.0 = Debug|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -154,6 +162,7 @@ Global
{10E626FF-DE92-4784-A9EB-F5A6BE26D1D3} = {381F6F79-110B-4CE4-9A49-583046D8C164}
{3B2C3908-E9EB-43F3-9340-3CF5EFBA2F09} = {36546FD6-866F-4809-AFCE-87F7F4201361}
{0E645BF9-3464-4856-A624-FEFCF0050220} = {9813BFC3-7860-4697-A3AF-118BDF710BD0}
{AF421C72-0DDA-4568-BD7E-B3CE90B3B31C} = {36546FD6-866F-4809-AFCE-87F7F4201361}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {138B7AA8-E4C0-46A3-A48B-0D373CAC365D}
Expand Down
30 changes: 30 additions & 0 deletions src/MessagePipe.Interprocess/MessagePipeInterprocessOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,34 @@ public MessagePipeInterprocessTcpOptions(string host, int port)
this.HostAsServer = null;
}
}
#if NET5_0_OR_GREATER
public sealed class MessagePipeInterprocessUdpUdsOptions : MessagePipeInterprocessOptions
{
public string SocketPath { get; set; }

public MessagePipeInterprocessUdpUdsOptions(string socketPath)
: base()
{
this.SocketPath = socketPath;
}

}
public sealed class MessagePipeInterprocessTcpUdsOptions : MessagePipeInterprocessOptions
{
public string SocketPath { get; set; }
public int? SendBufferSize { get; set; }
public int? ReceiveBufferSize { get; set; }
public bool? HostAsServer { get; set; }
public MessagePipeInterprocessTcpUdsOptions(string socketPath): this(socketPath, null, null)
{
}
public MessagePipeInterprocessTcpUdsOptions(string socketPath, int? sendBufferSize, int? recvBufferSize)
{
this.SocketPath = socketPath;
HostAsServer = null;
this.SendBufferSize = sendBufferSize;
this.ReceiveBufferSize = recvBufferSize;
}
}
#endif
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,5 +163,52 @@ public static IServiceCollection RegisterTcpRemoteRequestHandler<TRequest, TResp
//}

#endif
#if NET5_0_OR_GREATER
public static ReturnType AddMessagePipeUdpInterprocessUds(this IServiceCollection services, string domainSocketPath)
{
return AddMessagePipeUdpInterprocessUds(services, domainSocketPath, _ => { });
}
public static ReturnType AddMessagePipeUdpInterprocessUds(this IServiceCollection services, string domainSocketPath, Action<MessagePipeInterprocessUdpUdsOptions> configure)
{
var options = new MessagePipeInterprocessUdpUdsOptions(domainSocketPath);
configure(options);

services.AddSingleton(options);
services.Add(typeof(UdpWorker), options.InstanceLifetime);

#if !UNITY_2018_3_OR_NEWER
services.Add(typeof(IDistributedPublisher<,>), typeof(UdpDistributedPublisher<,>), options.InstanceLifetime);
services.Add(typeof(IDistributedSubscriber<,>), typeof(UdpDistributedSubscriber<,>), options.InstanceLifetime);
return services;
#else
AddAsyncMessageBroker<IInterprocessKey, IInterprocessValue>(services, options);
return options;
#endif
}
public static ReturnType AddMessagePipeTcpInterprocessUds(this IServiceCollection services, string domainSocketPath)
{
return AddMessagePipeTcpInterprocessUds(services, domainSocketPath, _ => { });
}

public static ReturnType AddMessagePipeTcpInterprocessUds(this IServiceCollection services, string domainSocketPath, Action<MessagePipeInterprocessTcpUdsOptions> configure)
{
var options = new MessagePipeInterprocessTcpUdsOptions(domainSocketPath);
configure(options);

services.AddSingleton(options);
services.Add(typeof(TcpWorker), options.InstanceLifetime);

#if !UNITY_2018_3_OR_NEWER
services.Add(typeof(IDistributedPublisher<,>), typeof(TcpDistributedPublisher<,>), options.InstanceLifetime);
services.Add(typeof(IDistributedSubscriber<,>), typeof(TcpDistributedSubscriber<,>), options.InstanceLifetime);
services.Add(typeof(IRemoteRequestHandler<,>), typeof(TcpRemoteRequestHandler<,>), options.InstanceLifetime);
return services;
#else
AddAsyncMessageBroker<IInterprocessKey, IInterprocessValue>(services, options);
return options;
#endif

}
#endif // NET5_0_OR_GREATER
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ValueTask PublishAsync(TKey key, TMessage message, CancellationToken canc
public sealed class TcpDistributedSubscriber<TKey, TMessage> : IDistributedSubscriber<TKey, TMessage>
{
// Pubsished from UdpWorker.
readonly MessagePipeInterprocessTcpOptions options;
readonly MessagePipeInterprocessOptions options;
readonly IAsyncSubscriber<IInterprocessKey, IInterprocessValue> subscriberCore;
readonly FilterAttachedMessageHandlerFactory syncHandlerFactory;
readonly FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory;
Expand All @@ -43,7 +43,18 @@ public TcpDistributedSubscriber(TcpWorker worker, MessagePipeInterprocessTcpOpti

worker.StartReceiver();
}
#if NET5_0_OR_GREATER
[Preserve]
public TcpDistributedSubscriber(TcpWorker worker, MessagePipeInterprocessTcpUdsOptions options, IAsyncSubscriber<IInterprocessKey, IInterprocessValue> subscriberCore, FilterAttachedMessageHandlerFactory syncHandlerFactory, FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory)
{
this.options = options;
this.subscriberCore = subscriberCore;
this.syncHandlerFactory = syncHandlerFactory;
this.asyncHandlerFactory = asyncHandlerFactory;

worker.StartReceiver();
}
#endif
public ValueTask<IAsyncDisposable> SubscribeAsync(TKey key, IMessageHandler<TMessage> handler, CancellationToken cancellationToken = default)
{
return SubscribeAsync(key, handler, Array.Empty<MessageHandlerFilter<TMessage>>(), cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ValueTask PublishAsync(TKey key, TMessage message, CancellationToken canc
public sealed class UdpDistributedSubscriber<TKey, TMessage> : IDistributedSubscriber<TKey, TMessage>
{
// Pubsished from UdpWorker.
readonly MessagePipeInterprocessUdpOptions options;
readonly MessagePipeInterprocessOptions options;
readonly IAsyncSubscriber<IInterprocessKey, IInterprocessValue> subscriberCore;
readonly FilterAttachedMessageHandlerFactory syncHandlerFactory;
readonly FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory;
Expand All @@ -43,7 +43,18 @@ public UdpDistributedSubscriber(UdpWorker worker, MessagePipeInterprocessUdpOpti

worker.StartReceiver();
}
#if NET5_0_OR_GREATER
[Preserve]
public UdpDistributedSubscriber(UdpWorker worker, MessagePipeInterprocessUdpUdsOptions options, IAsyncSubscriber<IInterprocessKey, IInterprocessValue> subscriberCore, FilterAttachedMessageHandlerFactory syncHandlerFactory, FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory)
{
this.options = options;
this.subscriberCore = subscriberCore;
this.syncHandlerFactory = syncHandlerFactory;
this.asyncHandlerFactory = asyncHandlerFactory;

worker.StartReceiver();
}
#endif
public ValueTask<IAsyncDisposable> SubscribeAsync(TKey key, IMessageHandler<TMessage> handler, CancellationToken cancellationToken = default)
{
return SubscribeAsync(key, handler, Array.Empty<MessageHandlerFilter<TMessage>>(), cancellationToken);
Expand Down
52 changes: 46 additions & 6 deletions src/MessagePipe.Interprocess/Workers/SocketTcpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,47 @@ internal sealed class SocketTcpServer : IDisposable

readonly Socket socket;

SocketTcpServer(AddressFamily addressFamily)
SocketTcpServer(AddressFamily addressFamily, ProtocolType protocolType, int? sendBufferSize, int? recvBufferSize)
{
socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
socket = new Socket(addressFamily, SocketType.Stream, protocolType);
if(sendBufferSize.HasValue)
{
socket.SendBufferSize = sendBufferSize.Value;
}
if(recvBufferSize.HasValue)
{
socket.ReceiveBufferSize = recvBufferSize.Value;
}
}

public static SocketTcpServer Listen(string host, int port)
{
var ip = new IPEndPoint(IPAddress.Parse(host), port);
var server = new SocketTcpServer(ip.AddressFamily);
var server = new SocketTcpServer(ip.AddressFamily, ProtocolType.Tcp, null, null);

server.socket.Bind(ip);
server.socket.Listen(MaxConnections);
return server;
}

#if NET5_0_OR_GREATER
/// <summary>
/// create TCP unix domain socket server and listen
/// </summary>
/// <param name="domainSocketPath">path to unix domain socket</param>
/// <param name="recvBufferSize">socket's receive buffer size</param>
/// <param name="sendBufferSize">socket's send buffer size</param>
/// <exception cref="SocketException">unix domain socket not supported or socket already exists</exception>
/// <returns>TCP unix domain socket server</returns>
public static SocketTcpServer ListenUds(string domainSocketPath, int? sendBufferSize = null, int? recvBufferSize = null)
{
var server = new SocketTcpServer(AddressFamily.Unix, ProtocolType.IP, sendBufferSize, recvBufferSize);
server.socket.Bind(new UnixDomainSocketEndPoint(domainSocketPath));
server.socket.Listen(MaxConnections);
return server;
}
#endif

public async void StartAcceptLoopAsync(Action<SocketTcpClient> onAccept, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -56,9 +82,9 @@ internal sealed class SocketTcpClient : IDisposable
{
readonly Socket socket;

SocketTcpClient(AddressFamily addressFamily)
SocketTcpClient(AddressFamily addressFamily, ProtocolType protocolType)
{
socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
socket = new Socket(addressFamily, SocketType.Stream, protocolType);
}

internal SocketTcpClient(Socket socket)
Expand All @@ -69,10 +95,24 @@ internal SocketTcpClient(Socket socket)
public static SocketTcpClient Connect(string host, int port)
{
var ip = new IPEndPoint(IPAddress.Parse(host), port);
var client = new SocketTcpClient(ip.AddressFamily);
var client = new SocketTcpClient(ip.AddressFamily, ProtocolType.Tcp);
client.socket.Connect(ip);
return client;
}
#if NET5_0_OR_GREATER
/// <summary>
/// create TCP unix domain socket client and connect to server
/// </summary>
/// <param name="domainSocketPath">path to unix domain socket</param>
/// <exception cref="SocketException">unix domain socket not supported or server does not listen</exception>
/// <returns>TCP socket client.</returns>
public static SocketTcpClient ConnectUds(string domainSocketPath)
{
var client = new SocketTcpClient(AddressFamily.Unix, ProtocolType.IP);
client.socket.Connect(new UnixDomainSocketEndPoint(domainSocketPath));
return client;
}
#endif

public async ValueTask<int> ReceiveAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Expand Down
46 changes: 38 additions & 8 deletions src/MessagePipe.Interprocess/Workers/SocketUdpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,33 @@ internal sealed class SocketUdpServer : IDisposable
readonly Socket socket;
readonly byte[] buffer;

SocketUdpServer(int bufferSize)
SocketUdpServer(int bufferSize, AddressFamily addressFamily, ProtocolType protocolType)
{
socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
socket = new Socket(addressFamily, SocketType.Dgram, protocolType);
socket.ReceiveBufferSize = bufferSize;
buffer = new byte[Math.Max(bufferSize, MinBuffer)];
}

public static SocketUdpServer Bind(int port, int bufferSize)
{
var server = new SocketUdpServer(bufferSize);
var server = new SocketUdpServer(bufferSize, AddressFamily.InterNetwork, ProtocolType.Udp);
server.socket.Bind(new IPEndPoint(IPAddress.Any, port));
return server;
}
#if NET5_0_OR_GREATER
/// <summary>
/// create UDP socket and bind for listen.
/// </summary>
/// <param name="domainSocketPath">path to socket</param>
/// <param name="bufferSize">socket buffer size</param>
/// <exception cref="SocketException">unix domain socket not supported or socket already exists even if it is not bound</exception>
/// <returns>UDP server with bound socket</returns>
public static SocketUdpServer BindUds(string domainSocketPath, int bufferSize)
{
var server = new SocketUdpServer(bufferSize, AddressFamily.Unix, ProtocolType.IP);
server.socket.Bind(new UnixDomainSocketEndPoint(domainSocketPath));
return server;
}
#endif

public async ValueTask<ReadOnlyMemory<byte>> ReceiveAsync(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -68,19 +82,35 @@ internal sealed class SocketUdpClient : IDisposable
readonly Socket socket;
readonly byte[] buffer;

SocketUdpClient(int bufferSize)
SocketUdpClient(int bufferSize, AddressFamily addressFamily, ProtocolType protocolType)
{
socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
socket = new Socket(addressFamily, SocketType.Dgram, protocolType);
socket.SendBufferSize = bufferSize;
buffer = new byte[Math.Max(bufferSize, MinBuffer)];
}

public static SocketUdpClient Connect(string host, int port, int bufferSize)
{
var client = new SocketUdpClient(bufferSize);
client.socket.Connect(new IPEndPoint(IPAddress.Parse(host), port));
var ipaddr = IPAddress.Parse(host);
var client = new SocketUdpClient(bufferSize, ipaddr.AddressFamily, ProtocolType.Udp);
client.socket.Connect(new IPEndPoint(ipaddr, port));
return client;
}
#if NET5_0_OR_GREATER
/// <summary>
/// create UDP unix domain socket client and connect to server
/// </summary>
/// <param name="domainSocketPath">path to unix domain socket</param>
/// <param name="bufferSize"></param>
/// <exception cref="SocketException">unix domain socket not supported or server does not exist</exception>
/// <returns>UDP unix domain socket client</returns>
public static SocketUdpClient ConnectUds(string domainSocketPath, int bufferSize)
{
var client = new SocketUdpClient(bufferSize, AddressFamily.Unix, ProtocolType.IP);
client.socket.Connect(new UnixDomainSocketEndPoint(domainSocketPath));
return client;
}
#endif

public ValueTask<int> SendAsync(byte[] buffer, CancellationToken cancellationToken = default)
{
Expand Down
Loading

0 comments on commit 044b810

Please sign in to comment.