From d8d7e8ba6fa4670a47beb66c480dae177aa224a4 Mon Sep 17 00:00:00 2001 From: Yoshifumi Kawai Date: Mon, 12 Jul 2021 14:58:29 +0900 Subject: [PATCH 1/5] Create LICENSE --- LICENSE | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 LICENSE diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..595af32 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Cysharp, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. From fb67e38eb27e18736e03f09a443ca5d57ad9605d Mon Sep 17 00:00:00 2001 From: Andrey Dobrikov Date: Sun, 1 Aug 2021 12:42:55 -0400 Subject: [PATCH 2/5] Update README.md Fixes typo in AsObservable. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8bc11d4..a812c12 100644 --- a/README.md +++ b/README.md @@ -461,7 +461,7 @@ public static ValueTask FirstAsync(this ISubscriber` overload can filter messages by predicate (internally implemented with PredicateFilter, where Order is int.MinValue and is always checked first). -`AsObservable` can convert message pipeline to `IObservable`, it can handle by Reactive Extensions(in Unity, you can use `UniRx`). `AsObervable` exists in sync subscriber(keyless, keyed, buffered). +`AsObservable` can convert message pipeline to `IObservable`, it can handle by Reactive Extensions(in Unity, you can use `UniRx`). `AsObservable` exists in sync subscriber(keyless, keyed, buffered). `AsAsyncEnumerable` can convert message pipeline to `IAsyncEnumerable`, it can handle by async LINQ and async foreach. `AsAsyncEnumerable` exists in async subscriber(keyless, keyed, buffered). From 2e10b12934c95254aa36af8a06d5d5bde1ab6e79 Mon Sep 17 00:00:00 2001 From: Andrey Dobrikov Date: Sun, 1 Aug 2021 12:54:40 -0400 Subject: [PATCH 3/5] Update README.md Clarified the punctuation and grammar. --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 8bc11d4..ba03e2f 100644 --- a/README.md +++ b/README.md @@ -477,11 +477,11 @@ var value = await subscriber.FirstAsync(cts.Token); Filter --- -Filter system can hook before and after method invocation. It is implemented with the Middleware pattern, which allows you to write synchronous and asynchronous code with similar syntax. MessagePipe's filter kind are sync(`MessageHandlerFilter`) and async(`AsyncMessageHandlerFilter`) and request(`RequestHandlerFilter`) and async request (`AsyncRequestHandlerFilter`), you can inherit theres to implement filter. +Filter system can hook before and after method invocation. It is implemented with the Middleware pattern, which allows you to write synchronous and asynchronous code with similar syntax. MessagePipe provides different filter types - sync (`MessageHandlerFilter`), async (`AsyncMessageHandlerFilter`), request (`RequestHandlerFilter`) and async request (`AsyncRequestHandlerFilter`). To implement other concerete filters the above filter types can be extended. -Filters can be specified in three places. Global(by `MessagePipeOptions.AddGlobalFilter`), per handler type, and per subscribe. The filters are sorted according to the Order specified in each of them, and are generated when subscribing. +Filters can be specified in three places - global(by `MessagePipeOptions.AddGlobalFilter`), per handler type, and per subscription. These filters are sorted according to the Order specified in each of them, and are generated when subscribing. -Since it is generated on a per-subscribe basis, the filter can have a state. +Since the filter is generated on a per subscription basis, the filter can have a state. ```csharp public class ChangedValueFilter : MessageHandlerFilter From e4e0390662a4a8448146c5eab106b0c602552adf Mon Sep 17 00:00:00 2001 From: Yoshifumi Kawai Date: Wed, 27 Oct 2021 11:19:25 +0900 Subject: [PATCH 4/5] ReadMe --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 8bc11d4..6ed6172 100644 --- a/README.md +++ b/README.md @@ -959,7 +959,7 @@ async void A(IRemoteRequestHandler remoteHandler) var v = await remoteHandler.InvokeAsync(9999); Console.WriteLine(v); // ECHO:9999 } -`` +``` For Unity, requires to import MessagePack-CSharp package and needs slightly different configuration. @@ -1184,6 +1184,9 @@ public class GameLifetimeScope : LifetimeScope // RegisterMessagePipe returns options. var options = builder.RegisterMessagePipe(/* configure option */); + // Setup GlobalMessagePipe to enable diagnostics window and global function + builder.RegisterBuildCallback(c => GlobalMessagePipe.SetProvider(c.AsServiceProvider())); + // RegisterMessageBroker: Register for IPublisher/ISubscriber, includes async and buffered. builder.RegisterMessageBroker(options); @@ -1201,13 +1204,10 @@ public class MessagePipeDemo : VContainer.Unity.IStartable readonly IPublisher publisher; readonly ISubscriber subscriber; - public MessagePipeDemo(IPublisher publisher, ISubscriber subscriber, IObjectResolver resolver) + public MessagePipeDemo(IPublisher publisher, ISubscriber subscriber) { this.publisher = publisher; this.subscriber = subscriber; - - // set global to enable diagnostics window and global function - GlobalMessagePipe.SetProvider(resolver.AsServiceProvider()); } public void Start() From 4f838c42938f5460ee1628d5f05b6c7a226ac4c0 Mon Sep 17 00:00:00 2001 From: neuecc Date: Wed, 12 Jan 2022 21:51:06 +0900 Subject: [PATCH 5/5] Add ISingleton/ScopedPublisher/Subscriber --- MessagePipe.sln | 4 +- .../MessagePipe.Sandbox.ConsoleApp/Program.cs | 830 +++++++++--------- .../MessagePipe/Runtime/AsyncMessageBroker.cs | 212 +++-- .../Runtime/AsyncMessageBroker_Key.cs | 50 +- .../Runtime/IPublisherSubscriber.cs | 18 + .../MessagePipe/Runtime/MessageBroker.cs | 156 ++-- .../MessagePipe/Runtime/MessageBroker_Key.cs | 48 +- .../Runtime/ServiceCollectionExtensions.cs | 36 +- .../SubscriberExtensions.FirstAsync.cs | 12 +- src/MessagePipe/AsyncMessageBroker.cs | 212 +++-- src/MessagePipe/AsyncMessageBroker_Key.cs | 50 +- src/MessagePipe/IPublisherSubscriber.cs | 18 + src/MessagePipe/MessageBroker.cs | 156 ++-- src/MessagePipe/MessageBroker_Key.cs | 48 +- .../ServiceCollectionExtensions.cs | 36 +- 15 files changed, 1161 insertions(+), 725 deletions(-) diff --git a/MessagePipe.sln b/MessagePipe.sln index 7ac2765..1ea5cab 100644 --- a/MessagePipe.sln +++ b/MessagePipe.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.30804.86 +# Visual Studio Version 17 +VisualStudioVersion = 17.0.32002.185 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessagePipe", "src\MessagePipe\MessagePipe.csproj", "{D94EF9D4-3CC2-4420-8D0B-25957E7C5309}" EndProject diff --git a/sandbox/MessagePipe.Sandbox.ConsoleApp/Program.cs b/sandbox/MessagePipe.Sandbox.ConsoleApp/Program.cs index 388966d..d902f6b 100644 --- a/sandbox/MessagePipe.Sandbox.ConsoleApp/Program.cs +++ b/sandbox/MessagePipe.Sandbox.ConsoleApp/Program.cs @@ -1,27 +1,27 @@ #pragma warning disable CS8603 // Possible null reference return. #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. - -using ConsoleAppFramework; -using MessagePipe.Sandbox.ConsoleApp; + +using ConsoleAppFramework; +using MessagePipe.Sandbox.ConsoleApp; using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; using System.Threading; -using System.Threading.Tasks; -using ZLogger; - -namespace MessagePipe -{ - public class MyOption - { - public int MyProperty { get; set; } - } - - [IgnoreAutoRegistration] +using System.Threading.Tasks; +using ZLogger; + +namespace MessagePipe +{ + public class MyOption + { + public int MyProperty { get; set; } + } + + [IgnoreAutoRegistration] public class MyGenericsHandler : IRequestHandler { public TR2 Invoke(string request) @@ -30,7 +30,7 @@ public TR2 Invoke(string request) return default(TR2); } } - //[IgnoreAutoRegistration] + //[IgnoreAutoRegistration] public class MyMyGenericsHandler : IRequestHandler { public TR2 Invoke(TR1 request) @@ -38,9 +38,9 @@ public TR2 Invoke(TR1 request) Console.WriteLine("everything default!"); return default(TR2); } - } - - [IgnoreAutoRegistration] + } + + [IgnoreAutoRegistration] public class MyGenericsHandler2 : IRequestHandler { public int Invoke(int request) @@ -48,14 +48,14 @@ public int Invoke(int request) Console.WriteLine("everything default 2!"); return default(int); } - } - - class Program : ConsoleAppBase - { - static async Task Main3(string[] args) - { - var c = new ServiceCollection(); - var p = c.BuildServiceProvider(); + } + + class Program : ConsoleAppBase + { + static async Task Main3(string[] args) + { + var c = new ServiceCollection(); + var p = c.BuildServiceProvider(); var p2 = p.GetRequiredService(); @@ -64,29 +64,29 @@ static async Task Main3(string[] args) { x.AddMessagePipe(); - }) - .Build(); // build host before run. - + }) + .Build(); // build host before run. + GlobalMessagePipe.SetProvider(host.Services); // set service provider await host.RunAsync(); // run framework. - args = new[] { "moremore" }; - - await Host.CreateDefaultBuilder() - .ConfigureServices((_, x) => + args = new[] { "moremore" }; + + await Host.CreateDefaultBuilder() + .ConfigureServices((_, x) => { - x.AddMessagePipe(options => - { - options.InstanceLifetime = InstanceLifetime.Singleton; - options.EnableCaptureStackTrace = true; - - options.AddGlobalMessageHandlerFilter(typeof(MyFilter<>)); - //options.AddGlobalMessageHandlerFilter>(); + x.AddMessagePipe(options => + { + options.InstanceLifetime = InstanceLifetime.Singleton; + options.EnableCaptureStackTrace = true; + + options.AddGlobalMessageHandlerFilter(typeof(MyFilter<>)); + //options.AddGlobalMessageHandlerFilter>(); }); @@ -98,35 +98,35 @@ await Host.CreateDefaultBuilder() //x.AddRequestHandler(typeof(MyGenericsHandler<>)); - }) - .ConfigureLogging(x => - { - x.ClearProviders(); - x.SetMinimumLevel(LogLevel.Information); - x.AddZLoggerConsole(); - }) - .RunConsoleAppFrameworkAsync(args); - } - - IPublisher publisher; - ISubscriber subscriber; - IPublisher keylessP; - ISubscriber keylessS; - IAsyncPublisher asyncKeylessP; - IAsyncSubscriber asyncKeylessS; - - - IRequestHandler pingponghandler; - IRequestAllHandler pingallhandler; - // PingHandler pingpingHandler; - - - IPublisher intPublisher; - ISubscriber intSubscriber; - - IServiceScopeFactory scopeF; - MessagePipeDiagnosticsInfo diagnosticsInfo; - + }) + .ConfigureLogging(x => + { + x.ClearProviders(); + x.SetMinimumLevel(LogLevel.Information); + x.AddZLoggerConsole(); + }) + .RunConsoleAppFrameworkAsync(args); + } + + IPublisher publisher; + ISubscriber subscriber; + IPublisher keylessP; + ISubscriber keylessS; + IAsyncPublisher asyncKeylessP; + IAsyncSubscriber asyncKeylessS; + + + IRequestHandler pingponghandler; + IRequestAllHandler pingallhandler; + // PingHandler pingpingHandler; + + + IPublisher intPublisher; + ISubscriber intSubscriber; + + IServiceScopeFactory scopeF; + MessagePipeDiagnosticsInfo diagnosticsInfo; + IServiceProvider provider; public Program( @@ -147,234 +147,234 @@ public Program( IServiceScopeFactory scopeF, MessagePipeDiagnosticsInfo diagnosticsInfo, - IServiceProvider provider - - - ) - { - this.provider = provider; - this.scopeF = scopeF; - this.publisher = publisher; - this.subscriber = subscriber; - this.keylessP = keyless1; - this.keylessS = keyless2; - this.asyncKeylessP = asyncKeylessP; - this.asyncKeylessS = asyncKeylessS; - this.pingponghandler = pingponghandler; - //this.pingpingHandler = pingpingHandler; - this.pingallhandler = pingallhandler; - this.intPublisher = intP; - this.intSubscriber = intS; - this.diagnosticsInfo = diagnosticsInfo; - - var r1 = provider.GetRequiredService>(); - r1.Invoke("foo"); - } - - [Command("keyed")] - public void Keyed() - { - this.subscriber.Subscribe("foo", x => - { - Console.WriteLine("A:" + x.MyProperty); - }); - - this.subscriber.Subscribe("foo", new MyFirst()); - - var d = this.subscriber.Subscribe("foo", x => - { - Console.WriteLine("B:" + x.MyProperty); - }); - - publisher.Publish("foo", new MyMessage() { MyProperty = "tako" }); - publisher.Publish("foo", new MyMessage() { MyProperty = "yaki" }); - - d.Dispose(); - - publisher.Publish("foo", new MyMessage() { MyProperty = "kamo" }); - } - - [Command("keyless")] - public void Keyless() - { - this.keylessS.Subscribe(x => - { - Console.WriteLine("A:" + x.MyProperty); - }); - - var d = this.keylessS.Subscribe(x => - { - Console.WriteLine("B:" + x.MyProperty); - }); - - - keylessP.Publish(new MyMessage() { MyProperty = "tako" }); - keylessP.Publish(new MyMessage() { MyProperty = "yaki" }); - - - keylessS.AsObservable(); - - - d.Dispose(); - - keylessP.Publish(new MyMessage() { MyProperty = "kamo" }); - } - - [Command("asynckeyless")] - public async Task AsyncKeyless() - { - this.asyncKeylessS.Subscribe(async (x, ct) => - { - await Task.Delay(TimeSpan.FromSeconds(2), ct); - Console.WriteLine("A:" + x.MyProperty); - }); - - var d = this.asyncKeylessS.Subscribe(async (x, ct) => - { - await Task.Delay(TimeSpan.FromSeconds(1), ct); - Console.WriteLine("B:" + x.MyProperty); - }); - - await asyncKeylessP.PublishAsync(new MyMessage() { MyProperty = "tako" }); - await asyncKeylessP.PublishAsync(new MyMessage() { MyProperty = "yaki" }); - - Console.WriteLine("here?"); - - d.Dispose(); - - await asyncKeylessP.PublishAsync(new MyMessage() { MyProperty = "kamo" }); - } - - [Command("ping")] - public void Ping() - { - Console.WriteLine("ping"); - var pong = pingponghandler.Invoke(new Ping()); - Console.WriteLine("pong"); - } - - [Command("pingmany")] - public void PingMany() - { - Console.WriteLine("ping"); - var pong = pingallhandler.InvokeAll(new Ping()); - foreach (var item in pong) - { - Console.WriteLine("pong"); - } - } - - event Action myEventAction; - - [Command("myevent")] - public void MyEvent() - { - myEventAction += () => Console.WriteLine("ev one"); - myEventAction += () => Console.WriteLine("ev two"); - myEventAction(); - - myEventAction += () => - { - Console.WriteLine("eve three and exception"); - throw new Exception("???"); - }; - - myEventAction += () => Console.WriteLine("ev four"); - myEventAction(); - } - - [Command("mydelegate")] - public void MyDelegate() - { - var d1 = new FooMore().GetDelegate(); - var d2 = new BarMore().GetDelegate(); - } - - [Command("filter")] - public void Filter() - { - this.keylessS.Subscribe(new MyFirst()); - - keylessP.Publish(new MyMessage() { MyProperty = "tako" }); - keylessP.Publish(new MyMessage() { MyProperty = "yaki" }); - } - - [Command("predicate")] - public void Pred() - { - var d = DisposableBag.CreateBuilder(); - this.keylessS.Subscribe(x => - { - Console.WriteLine("FilteredA:" + x.MyProperty); - }, x => x.MyProperty == "foo" || x.MyProperty == "hoge") - .AddTo(d); - - - this.keylessS.Subscribe(x => - { - Console.WriteLine("FilteredB:" + x.MyProperty); - }, x => x.MyProperty == "foo" || x.MyProperty == "hage").AddTo(d); - - this.keylessP.Publish(new MyMessage { MyProperty = "nano" }); - this.keylessP.Publish(new MyMessage { MyProperty = "foo" }); - this.keylessP.Publish(new MyMessage { MyProperty = "hage" }); - this.keylessP.Publish(new MyMessage { MyProperty = "hoge" }); - - this.intSubscriber.Subscribe(x => Console.WriteLine(x), x => x < 10).AddTo(d); - this.intPublisher.Publish(999); - this.intPublisher.Publish(5); - - d.Build().Dispose(); - d.Clear(); - Console.WriteLine("----"); - - intSubscriber.Subscribe(x => - { - Console.WriteLine("int one:" + x); - }, new ChangedValueFilter()); - - intPublisher.Publish(100); - intPublisher.Publish(200); - intPublisher.Publish(200); - intPublisher.Publish(299); - - - } - - [Command("checkscope")] - public void CheckScope() - { - - var scope = scopeF.CreateScope(); - - var scope2 = scopeF.CreateScope(); - - var p = scope.ServiceProvider.GetRequiredService>(); - var s = scope.ServiceProvider.GetRequiredService>(); - - var p2 = scope2.ServiceProvider.GetRequiredService>(); - var s2 = scope2.ServiceProvider.GetRequiredService>(); - - var d = s.Subscribe(x => Console.WriteLine("foo:" + x)); - var d2 = s2.Subscribe(x => Console.WriteLine("bar:" + x)); - - - - p.Publish(100); - p.Publish(200); - p.Publish(300); - p2.Publish(999); - - - scope.Dispose(); - - p.Publish(129); - s.Subscribe(_ => Console.WriteLine("s2???")); - - p2.Publish(1999); - } - - [Command("moremore")] + IServiceProvider provider + + + ) + { + this.provider = provider; + this.scopeF = scopeF; + this.publisher = publisher; + this.subscriber = subscriber; + this.keylessP = keyless1; + this.keylessS = keyless2; + this.asyncKeylessP = asyncKeylessP; + this.asyncKeylessS = asyncKeylessS; + this.pingponghandler = pingponghandler; + //this.pingpingHandler = pingpingHandler; + this.pingallhandler = pingallhandler; + this.intPublisher = intP; + this.intSubscriber = intS; + this.diagnosticsInfo = diagnosticsInfo; + + var r1 = provider.GetRequiredService>(); + r1.Invoke("foo"); + } + + [Command("keyed")] + public void Keyed() + { + this.subscriber.Subscribe("foo", x => + { + Console.WriteLine("A:" + x.MyProperty); + }); + + this.subscriber.Subscribe("foo", new MyFirst()); + + var d = this.subscriber.Subscribe("foo", x => + { + Console.WriteLine("B:" + x.MyProperty); + }); + + publisher.Publish("foo", new MyMessage() { MyProperty = "tako" }); + publisher.Publish("foo", new MyMessage() { MyProperty = "yaki" }); + + d.Dispose(); + + publisher.Publish("foo", new MyMessage() { MyProperty = "kamo" }); + } + + [Command("keyless")] + public void Keyless() + { + this.keylessS.Subscribe(x => + { + Console.WriteLine("A:" + x.MyProperty); + }); + + var d = this.keylessS.Subscribe(x => + { + Console.WriteLine("B:" + x.MyProperty); + }); + + + keylessP.Publish(new MyMessage() { MyProperty = "tako" }); + keylessP.Publish(new MyMessage() { MyProperty = "yaki" }); + + + keylessS.AsObservable(); + + + d.Dispose(); + + keylessP.Publish(new MyMessage() { MyProperty = "kamo" }); + } + + [Command("asynckeyless")] + public async Task AsyncKeyless() + { + this.asyncKeylessS.Subscribe(async (x, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(2), ct); + Console.WriteLine("A:" + x.MyProperty); + }); + + var d = this.asyncKeylessS.Subscribe(async (x, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(1), ct); + Console.WriteLine("B:" + x.MyProperty); + }); + + await asyncKeylessP.PublishAsync(new MyMessage() { MyProperty = "tako" }); + await asyncKeylessP.PublishAsync(new MyMessage() { MyProperty = "yaki" }); + + Console.WriteLine("here?"); + + d.Dispose(); + + await asyncKeylessP.PublishAsync(new MyMessage() { MyProperty = "kamo" }); + } + + [Command("ping")] + public void Ping() + { + Console.WriteLine("ping"); + var pong = pingponghandler.Invoke(new Ping()); + Console.WriteLine("pong"); + } + + [Command("pingmany")] + public void PingMany() + { + Console.WriteLine("ping"); + var pong = pingallhandler.InvokeAll(new Ping()); + foreach (var item in pong) + { + Console.WriteLine("pong"); + } + } + + event Action myEventAction; + + [Command("myevent")] + public void MyEvent() + { + myEventAction += () => Console.WriteLine("ev one"); + myEventAction += () => Console.WriteLine("ev two"); + myEventAction(); + + myEventAction += () => + { + Console.WriteLine("eve three and exception"); + throw new Exception("???"); + }; + + myEventAction += () => Console.WriteLine("ev four"); + myEventAction(); + } + + [Command("mydelegate")] + public void MyDelegate() + { + var d1 = new FooMore().GetDelegate(); + var d2 = new BarMore().GetDelegate(); + } + + [Command("filter")] + public void Filter() + { + this.keylessS.Subscribe(new MyFirst()); + + keylessP.Publish(new MyMessage() { MyProperty = "tako" }); + keylessP.Publish(new MyMessage() { MyProperty = "yaki" }); + } + + [Command("predicate")] + public void Pred() + { + var d = DisposableBag.CreateBuilder(); + this.keylessS.Subscribe(x => + { + Console.WriteLine("FilteredA:" + x.MyProperty); + }, x => x.MyProperty == "foo" || x.MyProperty == "hoge") + .AddTo(d); + + + this.keylessS.Subscribe(x => + { + Console.WriteLine("FilteredB:" + x.MyProperty); + }, x => x.MyProperty == "foo" || x.MyProperty == "hage").AddTo(d); + + this.keylessP.Publish(new MyMessage { MyProperty = "nano" }); + this.keylessP.Publish(new MyMessage { MyProperty = "foo" }); + this.keylessP.Publish(new MyMessage { MyProperty = "hage" }); + this.keylessP.Publish(new MyMessage { MyProperty = "hoge" }); + + this.intSubscriber.Subscribe(x => Console.WriteLine(x), x => x < 10).AddTo(d); + this.intPublisher.Publish(999); + this.intPublisher.Publish(5); + + d.Build().Dispose(); + d.Clear(); + Console.WriteLine("----"); + + intSubscriber.Subscribe(x => + { + Console.WriteLine("int one:" + x); + }, new ChangedValueFilter()); + + intPublisher.Publish(100); + intPublisher.Publish(200); + intPublisher.Publish(200); + intPublisher.Publish(299); + + + } + + [Command("checkscope")] + public void CheckScope() + { + + var scope = scopeF.CreateScope(); + + var scope2 = scopeF.CreateScope(); + + var p = scope.ServiceProvider.GetRequiredService>(); + var s = scope.ServiceProvider.GetRequiredService>(); + + var p2 = scope2.ServiceProvider.GetRequiredService>(); + var s2 = scope2.ServiceProvider.GetRequiredService>(); + + var d = s.Subscribe(x => Console.WriteLine("foo:" + x)); + var d2 = s2.Subscribe(x => Console.WriteLine("bar:" + x)); + + + + p.Publish(100); + p.Publish(200); + p.Publish(300); + p2.Publish(999); + + + scope.Dispose(); + + p.Publish(129); + s.Subscribe(_ => Console.WriteLine("s2???")); + + p2.Publish(1999); + } + + [Command("moremore")] public void CheckMoreAndMore() { var req = provider.GetRequiredService>(); @@ -385,106 +385,106 @@ public void CheckMoreAndMore() intPublisher.Publish(10); - } + } + } + + + + public class PingHandler : IRequestHandler + { + public Pong Invoke(Ping request) + { + Console.WriteLine("1 ping"); + return new Pong(); + } + } + + public class PingHandler2 : IRequestHandler + { + public Pong Invoke(Ping request) + { + Console.WriteLine("2 ping"); + return new Pong(); + } + } + + + public class MyClass + { + + } + + + public class MyFilter : MessageHandlerFilter + { + public override void Handle(T message, Action next) + { + Console.WriteLine("before:" + Order); + next(message); + Console.WriteLine("after:" + Order); + } + } + + [MessageHandlerFilter(typeof(MyFilter), Order = 30)] + [MessageHandlerFilter(typeof(MyFilter), Order = -99)] + [MessageHandlerFilter(typeof(MyFilter), Order = 1000)] + public class MyFirst : IMessageHandler + { + public void Handle(MyMessage message) + { + Console.WriteLine("YEAHHHH:" + message.MyProperty); + } + } + + + public class Ping + { + } + + public class Pong + { + } + + public class FooMore + { + public int Tako; + public int Nano; + + public Action GetDelegate() => Ahokkusu; + + public void Ahokkusu() + { + Console.WriteLine("nano"); + } + } + + public struct BarMore + { + public int Tako; + public int Nano; + + public Action GetDelegate() => Ahokkusu; + + public void Ahokkusu() + { + Console.WriteLine("nano"); + } + } + + public class MyMessage + { + public string MyProperty { get; set; } } + // .UseDistributedAsyncPublisher(); + + + // DistributedAsyncPublisher - public class PingHandler : IRequestHandler - { - public Pong Invoke(Ping request) - { - Console.WriteLine("1 ping"); - return new Pong(); - } - } - - public class PingHandler2 : IRequestHandler - { - public Pong Invoke(Ping request) - { - Console.WriteLine("2 ping"); - return new Pong(); - } - } - - - public class MyClass - { - - } - - - public class MyFilter : MessageHandlerFilter - { - public override void Handle(T message, Action next) - { - Console.WriteLine("before:" + Order); - next(message); - Console.WriteLine("after:" + Order); - } - } - - [MessageHandlerFilter(typeof(MyFilter), Order = 30)] - [MessageHandlerFilter(typeof(MyFilter), Order = -99)] - [MessageHandlerFilter(typeof(MyFilter), Order = 1000)] - public class MyFirst : IMessageHandler - { - public void Handle(MyMessage message) - { - Console.WriteLine("YEAHHHH:" + message.MyProperty); - } - } - - - public class Ping - { - } - - public class Pong - { - } - - public class FooMore - { - public int Tako; - public int Nano; - - public Action GetDelegate() => Ahokkusu; - - public void Ahokkusu() - { - Console.WriteLine("nano"); - } - } - - public struct BarMore - { - public int Tako; - public int Nano; - - public Action GetDelegate() => Ahokkusu; - - public void Ahokkusu() - { - Console.WriteLine("nano"); - } - } - - public class MyMessage - { - public string MyProperty { get; set; } - } - - // .UseDistributedAsyncPublisher(); - - - // DistributedAsyncPublisher - - - - - public class KeyedMessageBrokerCore + + + public class KeyedMessageBrokerCore { } @@ -498,8 +498,8 @@ public int Invoke(int request) Console.WriteLine("Called First!"); return request; } - } - + } + public class SecondHandler : IRequestHandler { public int Invoke(int request) @@ -516,10 +516,10 @@ public int Invoke(int request) Console.WriteLine("Called Third!"); return request; } - } - - - + } + + + public class MonitorTimer : IDisposable { CancellationTokenSource cts = new CancellationTokenSource(); @@ -543,16 +543,16 @@ public void Dispose() { cts.Cancel(); } - } - - + } + + [MessageHandlerFilter(typeof(ChangedValueFilter<>))] public class WriteLineHandler : IMessageHandler { public void Handle(T message) => Console.WriteLine(message); - } - - + } + + public class DelayRequestFilter : AsyncRequestHandlerFilter { public override async ValueTask InvokeAsync(int request, CancellationToken cancellationToken, Func> next) @@ -593,7 +593,7 @@ public ValueTask InvokeAsync(Command2 request, CancellationToken canc { return default; } - } - - -} + } + + +} diff --git a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/AsyncMessageBroker.cs b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/AsyncMessageBroker.cs index d176bd0..e7524e7 100644 --- a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/AsyncMessageBroker.cs +++ b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/AsyncMessageBroker.cs @@ -7,7 +7,7 @@ namespace MessagePipe { [Preserve] - public sealed class AsyncMessageBroker : IAsyncPublisher, IAsyncSubscriber + public class AsyncMessageBroker : IAsyncPublisher, IAsyncSubscriber { readonly AsyncMessageBrokerCore core; readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; @@ -41,90 +41,7 @@ public IDisposable Subscribe(IAsyncMessageHandler handler, AsyncMessag } [Preserve] - public sealed class BufferedAsyncMessageBroker : IBufferedAsyncPublisher, IBufferedAsyncSubscriber - { - readonly BufferedAsyncMessageBrokerCore core; - readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; - - [Preserve] - public BufferedAsyncMessageBroker(BufferedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) - { - this.core = core; - this.handlerFactory = handlerFactory; - } - - public void Publish(TMessage message, CancellationToken cancellationToken) - { - core.Publish(message, cancellationToken); - } - - public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) - { - return core.PublishAsync(message, cancellationToken); - } - - public UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) - { - return core.PublishAsync(message, publishStrategy, cancellationToken); - } - - public UniTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) - { - return SubscribeAsync(handler, Array.Empty>(), cancellationToken); - } - - public UniTask SubscribeAsync(IAsyncMessageHandler handler, AsyncMessageHandlerFilter[] filters, CancellationToken cancellationToken) - { - handler = handlerFactory.CreateAsyncMessageHandler(handler, filters); - return core.SubscribeAsync(handler, cancellationToken); - } - } - - [Preserve] - public sealed class BufferedAsyncMessageBrokerCore - { - static readonly bool IsValueType = typeof(TMessage).IsValueType; - - readonly AsyncMessageBrokerCore core; - TMessage lastMessage; - - [Preserve] - public BufferedAsyncMessageBrokerCore(AsyncMessageBrokerCore core) - { - this.core = core; - this.lastMessage = default; - } - - public void Publish(TMessage message, CancellationToken cancellationToken) - { - lastMessage = message; - core.Publish(message, cancellationToken); - } - - public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) - { - lastMessage = message; - return core.PublishAsync(message, cancellationToken); - } - - public UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) - { - lastMessage = message; - return core.PublishAsync(message, publishStrategy, cancellationToken); - } - - public async UniTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) - { - if (IsValueType || lastMessage != null) - { - await handler.HandleAsync(lastMessage, cancellationToken); - } - return core.Subscribe(handler); - } - } - - [Preserve] - public sealed class AsyncMessageBrokerCore : IDisposable, IHandlerHolderMarker + public class AsyncMessageBrokerCore : IDisposable, IHandlerHolderMarker { FreeList> handlers; readonly MessagePipeDiagnosticsInfo diagnotics; @@ -230,4 +147,129 @@ public void Dispose() } } } + + [Preserve] + public sealed class BufferedAsyncMessageBroker : IBufferedAsyncPublisher, IBufferedAsyncSubscriber + { + readonly BufferedAsyncMessageBrokerCore core; + readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; + + [Preserve] + public BufferedAsyncMessageBroker(BufferedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + { + this.core = core; + this.handlerFactory = handlerFactory; + } + + public void Publish(TMessage message, CancellationToken cancellationToken) + { + core.Publish(message, cancellationToken); + } + + public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) + { + return core.PublishAsync(message, cancellationToken); + } + + public UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) + { + return core.PublishAsync(message, publishStrategy, cancellationToken); + } + + public UniTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) + { + return SubscribeAsync(handler, Array.Empty>(), cancellationToken); + } + + public UniTask SubscribeAsync(IAsyncMessageHandler handler, AsyncMessageHandlerFilter[] filters, CancellationToken cancellationToken) + { + handler = handlerFactory.CreateAsyncMessageHandler(handler, filters); + return core.SubscribeAsync(handler, cancellationToken); + } + } + + [Preserve] + public sealed class BufferedAsyncMessageBrokerCore + { + static readonly bool IsValueType = typeof(TMessage).IsValueType; + + readonly AsyncMessageBrokerCore core; + TMessage lastMessage; + + [Preserve] + public BufferedAsyncMessageBrokerCore(AsyncMessageBrokerCore core) + { + this.core = core; + this.lastMessage = default; + } + + public void Publish(TMessage message, CancellationToken cancellationToken) + { + lastMessage = message; + core.Publish(message, cancellationToken); + } + + public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) + { + lastMessage = message; + return core.PublishAsync(message, cancellationToken); + } + + public UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) + { + lastMessage = message; + return core.PublishAsync(message, publishStrategy, cancellationToken); + } + + public async UniTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) + { + if (IsValueType || lastMessage != null) + { + await handler.HandleAsync(lastMessage, cancellationToken); + } + return core.Subscribe(handler); + } + } + + // Singleton, Scoped variation + + [Preserve] + public class SingletonAsyncMessageBroker : AsyncMessageBroker, ISingletonAsyncPublisher, ISingletonAsyncSubscriber + { + [Preserve] + public SingletonAsyncMessageBroker(SingletonAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class ScopedAsyncMessageBroker : AsyncMessageBroker, IScopedAsyncPublisher, IScopedAsyncSubscriber + { + [Preserve] + public ScopedAsyncMessageBroker(ScopedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class SingletonAsyncMessageBrokerCore : AsyncMessageBrokerCore + { + [Preserve] + public SingletonAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) + : base(diagnostics, options) + { + } + } + + [Preserve] + public class ScopedAsyncMessageBrokerCore : AsyncMessageBrokerCore + { + [Preserve] + public ScopedAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) + : base(diagnostics, options) + { + } + } } \ No newline at end of file diff --git a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/AsyncMessageBroker_Key.cs b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/AsyncMessageBroker_Key.cs index ea894e4..b07f1df 100644 --- a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/AsyncMessageBroker_Key.cs +++ b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/AsyncMessageBroker_Key.cs @@ -7,8 +7,8 @@ namespace MessagePipe { [Preserve] - public sealed class AsyncMessageBroker : IAsyncPublisher, IAsyncSubscriber - + public class AsyncMessageBroker : IAsyncPublisher, IAsyncSubscriber + { readonly AsyncMessageBrokerCore core; readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; @@ -42,8 +42,8 @@ public IDisposable Subscribe(TKey key, IAsyncMessageHandler handler, p } [Preserve] - public sealed class AsyncMessageBrokerCore : IDisposable - + public class AsyncMessageBrokerCore : IDisposable + { readonly Dictionary handlerGroup; readonly MessagePipeDiagnosticsInfo diagnotics; @@ -212,4 +212,46 @@ public void Dispose() } } } + + // Singleton, Scoped variation + + [Preserve] + public class SingletonAsyncMessageBroker : AsyncMessageBroker, ISingletonAsyncPublisher, ISingletonAsyncSubscriber + + { + public SingletonAsyncMessageBroker(SingletonAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class SingletonAsyncMessageBrokerCore : AsyncMessageBrokerCore + + { + public SingletonAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options) + : base(diagnotics, options) + { + } + } + + [Preserve] + public class ScopedAsyncMessageBroker : AsyncMessageBroker, IScopedAsyncPublisher, IScopedAsyncSubscriber + + { + public ScopedAsyncMessageBroker(ScopedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class ScopedAsyncMessageBrokerCore : AsyncMessageBrokerCore + + { + public ScopedAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options) + : base(diagnotics, options) + { + } + } } \ No newline at end of file diff --git a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/IPublisherSubscriber.cs b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/IPublisherSubscriber.cs index c399749..8a8cd0b 100644 --- a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/IPublisherSubscriber.cs +++ b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/IPublisherSubscriber.cs @@ -40,6 +40,15 @@ public interface IAsyncSubscriber IDisposable Subscribe(IAsyncMessageHandler asyncHandler, params AsyncMessageHandlerFilter[] filters); } + public interface ISingletonPublisher : IPublisher { } + public interface ISingletonSubscriber : ISubscriber { } + public interface IScopedPublisher : IPublisher { } + public interface IScopedSubscriber : ISubscriber { } + public interface ISingletonAsyncPublisher : IAsyncPublisher { } + public interface ISingletonAsyncSubscriber : IAsyncSubscriber { } + public interface IScopedAsyncPublisher : IAsyncPublisher { } + public interface IScopedAsyncSubscriber : IAsyncSubscriber { } + // Keyed public interface IPublisher @@ -68,6 +77,15 @@ public interface IAsyncSubscriber IDisposable Subscribe(TKey key, IAsyncMessageHandler asyncHandler, params AsyncMessageHandlerFilter[] filters); } + public interface ISingletonPublisher : IPublisher { } + public interface ISingletonSubscriber : ISubscriber { } + public interface IScopedPublisher : IPublisher { } + public interface IScopedSubscriber : ISubscriber { } + public interface ISingletonAsyncPublisher : IAsyncPublisher { } + public interface ISingletonAsyncSubscriber : IAsyncSubscriber { } + public interface IScopedAsyncPublisher : IAsyncPublisher { } + public interface IScopedAsyncSubscriber : IAsyncSubscriber { } + // buffered keyless public interface IBufferedPublisher diff --git a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/MessageBroker.cs b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/MessageBroker.cs index b9c236b..ff0abfe 100644 --- a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/MessageBroker.cs +++ b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/MessageBroker.cs @@ -5,7 +5,7 @@ namespace MessagePipe { [Preserve] - public sealed class MessageBroker : IPublisher, ISubscriber + public class MessageBroker : IPublisher, ISubscriber { readonly MessageBrokerCore core; readonly FilterAttachedMessageHandlerFactory handlerFactory; @@ -29,62 +29,7 @@ public IDisposable Subscribe(IMessageHandler handler, params MessageHa } [Preserve] - public sealed class BufferedMessageBroker : IBufferedPublisher, IBufferedSubscriber - { - readonly BufferedMessageBrokerCore core; - readonly FilterAttachedMessageHandlerFactory handlerFactory; - - [Preserve] - public BufferedMessageBroker(BufferedMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) - { - this.core = core; - this.handlerFactory = handlerFactory; - } - - public void Publish(TMessage message) - { - core.Publish(message); - } - - public IDisposable Subscribe(IMessageHandler handler, params MessageHandlerFilter[] filters) - { - return core.Subscribe(handlerFactory.CreateMessageHandler(handler, filters)); - } - } - - [Preserve] - public sealed class BufferedMessageBrokerCore - { - static readonly bool IsValueType = typeof(TMessage).IsValueType; - - readonly MessageBrokerCore core; - TMessage lastMessage; - - [Preserve] - public BufferedMessageBrokerCore(MessageBrokerCore core) - { - this.core = core; - this.lastMessage = default; - } - - public void Publish(TMessage message) - { - lastMessage = message; - core.Publish(message); - } - - public IDisposable Subscribe(IMessageHandler handler) - { - if (IsValueType || lastMessage != null) - { - handler.Handle(lastMessage); - } - return core.Subscribe(handler); - } - } - - [Preserve] - public sealed class MessageBrokerCore : IDisposable, IHandlerHolderMarker + public class MessageBrokerCore : IDisposable, IHandlerHolderMarker { readonly FreeList> handlers; readonly MessagePipeDiagnosticsInfo diagnostics; @@ -165,4 +110,101 @@ public void Dispose() } } } + + [Preserve] + public sealed class BufferedMessageBroker : IBufferedPublisher, IBufferedSubscriber + { + readonly BufferedMessageBrokerCore core; + readonly FilterAttachedMessageHandlerFactory handlerFactory; + + [Preserve] + public BufferedMessageBroker(BufferedMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + { + this.core = core; + this.handlerFactory = handlerFactory; + } + + public void Publish(TMessage message) + { + core.Publish(message); + } + + public IDisposable Subscribe(IMessageHandler handler, params MessageHandlerFilter[] filters) + { + return core.Subscribe(handlerFactory.CreateMessageHandler(handler, filters)); + } + } + + [Preserve] + public sealed class BufferedMessageBrokerCore + { + static readonly bool IsValueType = typeof(TMessage).IsValueType; + + readonly MessageBrokerCore core; + TMessage lastMessage; + + [Preserve] + public BufferedMessageBrokerCore(MessageBrokerCore core) + { + this.core = core; + this.lastMessage = default; + } + + public void Publish(TMessage message) + { + lastMessage = message; + core.Publish(message); + } + + public IDisposable Subscribe(IMessageHandler handler) + { + if (IsValueType || lastMessage != null) + { + handler.Handle(lastMessage); + } + return core.Subscribe(handler); + } + } + + // Singleton, Scoped variation + + [Preserve] + public class SingletonMessageBroker : MessageBroker, ISingletonPublisher, ISingletonSubscriber + { + [Preserve] + public SingletonMessageBroker(SingletonMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class ScopedMessageBroker : MessageBroker, IScopedPublisher, IScopedSubscriber + { + [Preserve] + public ScopedMessageBroker(ScopedMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class SingletonMessageBrokerCore : MessageBrokerCore + { + [Preserve] + public SingletonMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) + : base(diagnostics, options) + { + } + } + + [Preserve] + public class ScopedMessageBrokerCore : MessageBrokerCore + { + [Preserve] + public ScopedMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) + : base(diagnostics, options) + { + } + } } \ No newline at end of file diff --git a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/MessageBroker_Key.cs b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/MessageBroker_Key.cs index dd9b958..3de13de 100644 --- a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/MessageBroker_Key.cs +++ b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/MessageBroker_Key.cs @@ -5,8 +5,8 @@ namespace MessagePipe { [Preserve] - public sealed class MessageBroker : IPublisher, ISubscriber - + public class MessageBroker : IPublisher, ISubscriber + { readonly MessageBrokerCore core; readonly FilterAttachedMessageHandlerFactory handlerFactory; @@ -30,7 +30,7 @@ public IDisposable Subscribe(TKey key, IMessageHandler handler, params } [Preserve] - public sealed class MessageBrokerCore : IDisposable + public class MessageBrokerCore : IDisposable { readonly Dictionary handlerGroup; @@ -165,4 +165,46 @@ public void Dispose() } } } + + // Singleton, Scoped variation + + [Preserve] + public class SingletonMessageBroker : MessageBroker, ISingletonPublisher, ISingletonSubscriber + + { + public SingletonMessageBroker(SingletonMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class SingletonMessageBrokerCore : MessageBrokerCore + + { + public SingletonMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options) + : base(diagnotics, options) + { + } + } + + [Preserve] + public class ScopedMessageBroker : MessageBroker, IScopedPublisher, IScopedSubscriber + + { + public ScopedMessageBroker(ScopedMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class ScopedMessageBrokerCore : MessageBrokerCore + + { + public ScopedMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options) + : base(diagnotics, options) + { + } + } } \ No newline at end of file diff --git a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/ServiceCollectionExtensions.cs b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/ServiceCollectionExtensions.cs index 17429a5..19a4905 100644 --- a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/ServiceCollectionExtensions.cs +++ b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/ServiceCollectionExtensions.cs @@ -54,6 +54,14 @@ public static IServiceCollection AddMessagePipe(this IServiceCollection services services.Add(typeof(IBufferedPublisher<>), typeof(BufferedMessageBroker<>), lifetime); services.Add(typeof(IBufferedSubscriber<>), typeof(BufferedMessageBroker<>), lifetime); + // keyless-variation + services.Add(typeof(SingletonMessageBrokerCore<>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonPublisher<>), typeof(SingletonMessageBroker<>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonSubscriber<>), typeof(SingletonMessageBroker<>), InstanceLifetime.Singleton); + services.Add(typeof(ScopedMessageBrokerCore<>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedPublisher<>), typeof(ScopedMessageBroker<>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedSubscriber<>), typeof(ScopedMessageBroker<>), InstanceLifetime.Scoped); + // keyless PubSub async services.Add(typeof(AsyncMessageBrokerCore<>), lifetime); services.Add(typeof(IAsyncPublisher<>), typeof(AsyncMessageBroker<>), lifetime); @@ -62,15 +70,39 @@ public static IServiceCollection AddMessagePipe(this IServiceCollection services services.Add(typeof(IBufferedAsyncPublisher<>), typeof(BufferedAsyncMessageBroker<>), lifetime); services.Add(typeof(IBufferedAsyncSubscriber<>), typeof(BufferedAsyncMessageBroker<>), lifetime); + // keyless-async-variation + services.Add(typeof(SingletonAsyncMessageBrokerCore<>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonAsyncPublisher<>), typeof(SingletonAsyncMessageBroker<>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonAsyncSubscriber<>), typeof(SingletonAsyncMessageBroker<>), InstanceLifetime.Singleton); + services.Add(typeof(ScopedAsyncMessageBrokerCore<>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedAsyncPublisher<>), typeof(ScopedAsyncMessageBroker<>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedAsyncSubscriber<>), typeof(ScopedAsyncMessageBroker<>), InstanceLifetime.Scoped); + // keyed PubSub services.Add(typeof(MessageBrokerCore<,>), lifetime); services.Add(typeof(IPublisher<,>), typeof(MessageBroker<,>), lifetime); services.Add(typeof(ISubscriber<,>), typeof(MessageBroker<,>), lifetime); + // keyed-variation + services.Add(typeof(SingletonMessageBrokerCore<,>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonPublisher<,>), typeof(SingletonMessageBroker<,>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonSubscriber<,>), typeof(SingletonMessageBroker<,>), InstanceLifetime.Singleton); + services.Add(typeof(ScopedMessageBrokerCore<,>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedPublisher<,>), typeof(ScopedMessageBroker<,>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedSubscriber<,>), typeof(ScopedMessageBroker<,>), InstanceLifetime.Scoped); + // keyed PubSub async services.Add(typeof(AsyncMessageBrokerCore<,>), lifetime); services.Add(typeof(IAsyncPublisher<,>), typeof(AsyncMessageBroker<,>), lifetime); services.Add(typeof(IAsyncSubscriber<,>), typeof(AsyncMessageBroker<,>), lifetime); + + // keyed-async-variation + services.Add(typeof(SingletonAsyncMessageBrokerCore<,>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonAsyncPublisher<,>), typeof(SingletonAsyncMessageBroker<,>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonAsyncSubscriber<,>), typeof(SingletonAsyncMessageBroker<,>), InstanceLifetime.Singleton); + services.Add(typeof(ScopedAsyncMessageBrokerCore<,>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedAsyncPublisher<,>), typeof(ScopedAsyncMessageBroker<,>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedAsyncSubscriber<,>), typeof(ScopedAsyncMessageBroker<,>), InstanceLifetime.Scoped); } var lifetime2 = options.RequestHandlerLifetime; // requesthandler lifetime @@ -194,7 +226,7 @@ static IServiceCollection AddRequestHandlerCore(IServiceCollection services, Typ { throw new ArgumentException($"{type.FullName} does not implement {coreType.Name.Replace("Core", "")}."); } - else if(isAsync) + else if (isAsync) { AsyncRequestHandlerRegistory.Add(coreType); } @@ -298,7 +330,7 @@ static void AddRequestHandlerAndFilterFromTypes(IServiceCollection services, Ins } } - NEXT_TYPE: + NEXT_TYPE: continue; } } diff --git a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/SubscriberExtensions.FirstAsync.cs b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/SubscriberExtensions.FirstAsync.cs index b2e0b8e..855f7f1 100644 --- a/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/SubscriberExtensions.FirstAsync.cs +++ b/src/MessagePipe.Unity/Assets/Plugins/MessagePipe/Runtime/SubscriberExtensions.FirstAsync.cs @@ -168,7 +168,7 @@ public void Handle(TMessage message) void IUniTaskSource.GetResult(short token) => GetResult(token); public UniTaskStatus UnsafeGetStatus() => core.UnsafeGetStatus(); - public /*replaced*/ UniTaskStatus GetStatus(short token) + public UniTaskStatus GetStatus(short token) { return core.GetStatus(token); } @@ -249,7 +249,7 @@ public void Handle(TMessage message) void IUniTaskSource.GetResult(short token) => GetResult(token); public UniTaskStatus UnsafeGetStatus() => core.UnsafeGetStatus(); - public /*replaced*/ UniTaskStatus GetStatus(short token) + public UniTaskStatus GetStatus(short token) { return core.GetStatus(token); } @@ -331,7 +331,7 @@ public void Handle(TMessage message) void IUniTaskSource.GetResult(short token) => GetResult(token); public UniTaskStatus UnsafeGetStatus() => core.UnsafeGetStatus(); - public /*replaced*/ UniTaskStatus GetStatus(short token) + public UniTaskStatus GetStatus(short token) { return core.GetStatus(token); } @@ -422,7 +422,7 @@ public UniTask HandleAsync(TMessage message, CancellationToken cancellationToken void IUniTaskSource.GetResult(short token) => GetResult(token); public UniTaskStatus UnsafeGetStatus() => core.UnsafeGetStatus(); - public /*replaced*/ UniTaskStatus GetStatus(short token) + public UniTaskStatus GetStatus(short token) { return core.GetStatus(token); } @@ -512,7 +512,7 @@ public UniTask HandleAsync(TMessage message, CancellationToken cancellationToken void IUniTaskSource.GetResult(short token) => GetResult(token); public UniTaskStatus UnsafeGetStatus() => core.UnsafeGetStatus(); - public /*replaced*/ UniTaskStatus GetStatus(short token) + public UniTaskStatus GetStatus(short token) { return core.GetStatus(token); } @@ -604,7 +604,7 @@ public UniTask HandleAsync(TMessage message, CancellationToken cancellationToken void IUniTaskSource.GetResult(short token) => GetResult(token); public UniTaskStatus UnsafeGetStatus() => core.UnsafeGetStatus(); - public /*replaced*/ UniTaskStatus GetStatus(short token) + public UniTaskStatus GetStatus(short token) { return core.GetStatus(token); } diff --git a/src/MessagePipe/AsyncMessageBroker.cs b/src/MessagePipe/AsyncMessageBroker.cs index 46640cf..5888b11 100644 --- a/src/MessagePipe/AsyncMessageBroker.cs +++ b/src/MessagePipe/AsyncMessageBroker.cs @@ -7,7 +7,7 @@ namespace MessagePipe { [Preserve] - public sealed class AsyncMessageBroker : IAsyncPublisher, IAsyncSubscriber + public class AsyncMessageBroker : IAsyncPublisher, IAsyncSubscriber { readonly AsyncMessageBrokerCore core; readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; @@ -41,90 +41,7 @@ public IDisposable Subscribe(IAsyncMessageHandler handler, AsyncMessag } [Preserve] - public sealed class BufferedAsyncMessageBroker : IBufferedAsyncPublisher, IBufferedAsyncSubscriber - { - readonly BufferedAsyncMessageBrokerCore core; - readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; - - [Preserve] - public BufferedAsyncMessageBroker(BufferedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) - { - this.core = core; - this.handlerFactory = handlerFactory; - } - - public void Publish(TMessage message, CancellationToken cancellationToken) - { - core.Publish(message, cancellationToken); - } - - public ValueTask PublishAsync(TMessage message, CancellationToken cancellationToken) - { - return core.PublishAsync(message, cancellationToken); - } - - public ValueTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) - { - return core.PublishAsync(message, publishStrategy, cancellationToken); - } - - public ValueTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) - { - return SubscribeAsync(handler, Array.Empty>(), cancellationToken); - } - - public ValueTask SubscribeAsync(IAsyncMessageHandler handler, AsyncMessageHandlerFilter[] filters, CancellationToken cancellationToken) - { - handler = handlerFactory.CreateAsyncMessageHandler(handler, filters); - return core.SubscribeAsync(handler, cancellationToken); - } - } - - [Preserve] - public sealed class BufferedAsyncMessageBrokerCore - { - static readonly bool IsValueType = typeof(TMessage).IsValueType; - - readonly AsyncMessageBrokerCore core; - TMessage? lastMessage; - - [Preserve] - public BufferedAsyncMessageBrokerCore(AsyncMessageBrokerCore core) - { - this.core = core; - this.lastMessage = default; - } - - public void Publish(TMessage message, CancellationToken cancellationToken) - { - lastMessage = message; - core.Publish(message, cancellationToken); - } - - public ValueTask PublishAsync(TMessage message, CancellationToken cancellationToken) - { - lastMessage = message; - return core.PublishAsync(message, cancellationToken); - } - - public ValueTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) - { - lastMessage = message; - return core.PublishAsync(message, publishStrategy, cancellationToken); - } - - public async ValueTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) - { - if (IsValueType || lastMessage != null) - { - await handler.HandleAsync(lastMessage!, cancellationToken); - } - return core.Subscribe(handler); - } - } - - [Preserve] - public sealed class AsyncMessageBrokerCore : IDisposable, IHandlerHolderMarker + public class AsyncMessageBrokerCore : IDisposable, IHandlerHolderMarker { FreeList> handlers; readonly MessagePipeDiagnosticsInfo diagnotics; @@ -230,4 +147,129 @@ public void Dispose() } } } + + [Preserve] + public sealed class BufferedAsyncMessageBroker : IBufferedAsyncPublisher, IBufferedAsyncSubscriber + { + readonly BufferedAsyncMessageBrokerCore core; + readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; + + [Preserve] + public BufferedAsyncMessageBroker(BufferedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + { + this.core = core; + this.handlerFactory = handlerFactory; + } + + public void Publish(TMessage message, CancellationToken cancellationToken) + { + core.Publish(message, cancellationToken); + } + + public ValueTask PublishAsync(TMessage message, CancellationToken cancellationToken) + { + return core.PublishAsync(message, cancellationToken); + } + + public ValueTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) + { + return core.PublishAsync(message, publishStrategy, cancellationToken); + } + + public ValueTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) + { + return SubscribeAsync(handler, Array.Empty>(), cancellationToken); + } + + public ValueTask SubscribeAsync(IAsyncMessageHandler handler, AsyncMessageHandlerFilter[] filters, CancellationToken cancellationToken) + { + handler = handlerFactory.CreateAsyncMessageHandler(handler, filters); + return core.SubscribeAsync(handler, cancellationToken); + } + } + + [Preserve] + public sealed class BufferedAsyncMessageBrokerCore + { + static readonly bool IsValueType = typeof(TMessage).IsValueType; + + readonly AsyncMessageBrokerCore core; + TMessage? lastMessage; + + [Preserve] + public BufferedAsyncMessageBrokerCore(AsyncMessageBrokerCore core) + { + this.core = core; + this.lastMessage = default; + } + + public void Publish(TMessage message, CancellationToken cancellationToken) + { + lastMessage = message; + core.Publish(message, cancellationToken); + } + + public ValueTask PublishAsync(TMessage message, CancellationToken cancellationToken) + { + lastMessage = message; + return core.PublishAsync(message, cancellationToken); + } + + public ValueTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) + { + lastMessage = message; + return core.PublishAsync(message, publishStrategy, cancellationToken); + } + + public async ValueTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) + { + if (IsValueType || lastMessage != null) + { + await handler.HandleAsync(lastMessage!, cancellationToken); + } + return core.Subscribe(handler); + } + } + + // Singleton, Scoped variation + + [Preserve] + public class SingletonAsyncMessageBroker : AsyncMessageBroker, ISingletonAsyncPublisher, ISingletonAsyncSubscriber + { + [Preserve] + public SingletonAsyncMessageBroker(SingletonAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class ScopedAsyncMessageBroker : AsyncMessageBroker, IScopedAsyncPublisher, IScopedAsyncSubscriber + { + [Preserve] + public ScopedAsyncMessageBroker(ScopedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class SingletonAsyncMessageBrokerCore : AsyncMessageBrokerCore + { + [Preserve] + public SingletonAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) + : base(diagnostics, options) + { + } + } + + [Preserve] + public class ScopedAsyncMessageBrokerCore : AsyncMessageBrokerCore + { + [Preserve] + public ScopedAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) + : base(diagnostics, options) + { + } + } } \ No newline at end of file diff --git a/src/MessagePipe/AsyncMessageBroker_Key.cs b/src/MessagePipe/AsyncMessageBroker_Key.cs index ff0c91d..8f87d68 100644 --- a/src/MessagePipe/AsyncMessageBroker_Key.cs +++ b/src/MessagePipe/AsyncMessageBroker_Key.cs @@ -7,8 +7,8 @@ namespace MessagePipe { [Preserve] - public sealed class AsyncMessageBroker : IAsyncPublisher, IAsyncSubscriber - where TKey : notnull + public class AsyncMessageBroker : IAsyncPublisher, IAsyncSubscriber + where TKey : notnull { readonly AsyncMessageBrokerCore core; readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; @@ -42,8 +42,8 @@ public IDisposable Subscribe(TKey key, IAsyncMessageHandler handler, p } [Preserve] - public sealed class AsyncMessageBrokerCore : IDisposable - where TKey : notnull + public class AsyncMessageBrokerCore : IDisposable + where TKey : notnull { readonly Dictionary handlerGroup; readonly MessagePipeDiagnosticsInfo diagnotics; @@ -212,4 +212,46 @@ public void Dispose() } } } + + // Singleton, Scoped variation + + [Preserve] + public class SingletonAsyncMessageBroker : AsyncMessageBroker, ISingletonAsyncPublisher, ISingletonAsyncSubscriber + where TKey : notnull + { + public SingletonAsyncMessageBroker(SingletonAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class SingletonAsyncMessageBrokerCore : AsyncMessageBrokerCore + where TKey : notnull + { + public SingletonAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options) + : base(diagnotics, options) + { + } + } + + [Preserve] + public class ScopedAsyncMessageBroker : AsyncMessageBroker, IScopedAsyncPublisher, IScopedAsyncSubscriber + where TKey : notnull + { + public ScopedAsyncMessageBroker(ScopedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class ScopedAsyncMessageBrokerCore : AsyncMessageBrokerCore + where TKey : notnull + { + public ScopedAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options) + : base(diagnotics, options) + { + } + } } \ No newline at end of file diff --git a/src/MessagePipe/IPublisherSubscriber.cs b/src/MessagePipe/IPublisherSubscriber.cs index fc5b50e..10a0e56 100644 --- a/src/MessagePipe/IPublisherSubscriber.cs +++ b/src/MessagePipe/IPublisherSubscriber.cs @@ -40,6 +40,15 @@ public interface IAsyncSubscriber IDisposable Subscribe(IAsyncMessageHandler asyncHandler, params AsyncMessageHandlerFilter[] filters); } + public interface ISingletonPublisher : IPublisher { } + public interface ISingletonSubscriber : ISubscriber { } + public interface IScopedPublisher : IPublisher { } + public interface IScopedSubscriber : ISubscriber { } + public interface ISingletonAsyncPublisher : IAsyncPublisher { } + public interface ISingletonAsyncSubscriber : IAsyncSubscriber { } + public interface IScopedAsyncPublisher : IAsyncPublisher { } + public interface IScopedAsyncSubscriber : IAsyncSubscriber { } + // Keyed public interface IPublisher @@ -68,6 +77,15 @@ public interface IAsyncSubscriber IDisposable Subscribe(TKey key, IAsyncMessageHandler asyncHandler, params AsyncMessageHandlerFilter[] filters); } + public interface ISingletonPublisher : IPublisher where TKey : notnull { } + public interface ISingletonSubscriber : ISubscriber where TKey : notnull { } + public interface IScopedPublisher : IPublisher where TKey : notnull { } + public interface IScopedSubscriber : ISubscriber where TKey : notnull { } + public interface ISingletonAsyncPublisher : IAsyncPublisher where TKey : notnull { } + public interface ISingletonAsyncSubscriber : IAsyncSubscriber where TKey : notnull { } + public interface IScopedAsyncPublisher : IAsyncPublisher where TKey : notnull { } + public interface IScopedAsyncSubscriber : IAsyncSubscriber where TKey : notnull { } + // buffered keyless public interface IBufferedPublisher diff --git a/src/MessagePipe/MessageBroker.cs b/src/MessagePipe/MessageBroker.cs index 033896a..58c96b8 100644 --- a/src/MessagePipe/MessageBroker.cs +++ b/src/MessagePipe/MessageBroker.cs @@ -5,7 +5,7 @@ namespace MessagePipe { [Preserve] - public sealed class MessageBroker : IPublisher, ISubscriber + public class MessageBroker : IPublisher, ISubscriber { readonly MessageBrokerCore core; readonly FilterAttachedMessageHandlerFactory handlerFactory; @@ -29,62 +29,7 @@ public IDisposable Subscribe(IMessageHandler handler, params MessageHa } [Preserve] - public sealed class BufferedMessageBroker : IBufferedPublisher, IBufferedSubscriber - { - readonly BufferedMessageBrokerCore core; - readonly FilterAttachedMessageHandlerFactory handlerFactory; - - [Preserve] - public BufferedMessageBroker(BufferedMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) - { - this.core = core; - this.handlerFactory = handlerFactory; - } - - public void Publish(TMessage message) - { - core.Publish(message); - } - - public IDisposable Subscribe(IMessageHandler handler, params MessageHandlerFilter[] filters) - { - return core.Subscribe(handlerFactory.CreateMessageHandler(handler, filters)); - } - } - - [Preserve] - public sealed class BufferedMessageBrokerCore - { - static readonly bool IsValueType = typeof(TMessage).IsValueType; - - readonly MessageBrokerCore core; - TMessage? lastMessage; - - [Preserve] - public BufferedMessageBrokerCore(MessageBrokerCore core) - { - this.core = core; - this.lastMessage = default; - } - - public void Publish(TMessage message) - { - lastMessage = message; - core.Publish(message); - } - - public IDisposable Subscribe(IMessageHandler handler) - { - if (IsValueType || lastMessage != null) - { - handler.Handle(lastMessage!); - } - return core.Subscribe(handler); - } - } - - [Preserve] - public sealed class MessageBrokerCore : IDisposable, IHandlerHolderMarker + public class MessageBrokerCore : IDisposable, IHandlerHolderMarker { readonly FreeList> handlers; readonly MessagePipeDiagnosticsInfo diagnostics; @@ -165,4 +110,101 @@ public void Dispose() } } } + + [Preserve] + public sealed class BufferedMessageBroker : IBufferedPublisher, IBufferedSubscriber + { + readonly BufferedMessageBrokerCore core; + readonly FilterAttachedMessageHandlerFactory handlerFactory; + + [Preserve] + public BufferedMessageBroker(BufferedMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + { + this.core = core; + this.handlerFactory = handlerFactory; + } + + public void Publish(TMessage message) + { + core.Publish(message); + } + + public IDisposable Subscribe(IMessageHandler handler, params MessageHandlerFilter[] filters) + { + return core.Subscribe(handlerFactory.CreateMessageHandler(handler, filters)); + } + } + + [Preserve] + public sealed class BufferedMessageBrokerCore + { + static readonly bool IsValueType = typeof(TMessage).IsValueType; + + readonly MessageBrokerCore core; + TMessage? lastMessage; + + [Preserve] + public BufferedMessageBrokerCore(MessageBrokerCore core) + { + this.core = core; + this.lastMessage = default; + } + + public void Publish(TMessage message) + { + lastMessage = message; + core.Publish(message); + } + + public IDisposable Subscribe(IMessageHandler handler) + { + if (IsValueType || lastMessage != null) + { + handler.Handle(lastMessage!); + } + return core.Subscribe(handler); + } + } + + // Singleton, Scoped variation + + [Preserve] + public class SingletonMessageBroker : MessageBroker, ISingletonPublisher, ISingletonSubscriber + { + [Preserve] + public SingletonMessageBroker(SingletonMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class ScopedMessageBroker : MessageBroker, IScopedPublisher, IScopedSubscriber + { + [Preserve] + public ScopedMessageBroker(ScopedMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class SingletonMessageBrokerCore : MessageBrokerCore + { + [Preserve] + public SingletonMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) + : base(diagnostics, options) + { + } + } + + [Preserve] + public class ScopedMessageBrokerCore : MessageBrokerCore + { + [Preserve] + public ScopedMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) + : base(diagnostics, options) + { + } + } } \ No newline at end of file diff --git a/src/MessagePipe/MessageBroker_Key.cs b/src/MessagePipe/MessageBroker_Key.cs index 7348bf5..a5cf015 100644 --- a/src/MessagePipe/MessageBroker_Key.cs +++ b/src/MessagePipe/MessageBroker_Key.cs @@ -5,8 +5,8 @@ namespace MessagePipe { [Preserve] - public sealed class MessageBroker : IPublisher, ISubscriber - where TKey : notnull + public class MessageBroker : IPublisher, ISubscriber + where TKey : notnull { readonly MessageBrokerCore core; readonly FilterAttachedMessageHandlerFactory handlerFactory; @@ -30,7 +30,7 @@ public IDisposable Subscribe(TKey key, IMessageHandler handler, params } [Preserve] - public sealed class MessageBrokerCore : IDisposable + public class MessageBrokerCore : IDisposable where TKey : notnull { readonly Dictionary handlerGroup; @@ -165,4 +165,46 @@ public void Dispose() } } } + + // Singleton, Scoped variation + + [Preserve] + public class SingletonMessageBroker : MessageBroker, ISingletonPublisher, ISingletonSubscriber + where TKey : notnull + { + public SingletonMessageBroker(SingletonMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class SingletonMessageBrokerCore : MessageBrokerCore + where TKey : notnull + { + public SingletonMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options) + : base(diagnotics, options) + { + } + } + + [Preserve] + public class ScopedMessageBroker : MessageBroker, IScopedPublisher, IScopedSubscriber + where TKey : notnull + { + public ScopedMessageBroker(ScopedMessageBrokerCore core, FilterAttachedMessageHandlerFactory handlerFactory) + : base(core, handlerFactory) + { + } + } + + [Preserve] + public class ScopedMessageBrokerCore : MessageBrokerCore + where TKey : notnull + { + public ScopedMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options) + : base(diagnotics, options) + { + } + } } \ No newline at end of file diff --git a/src/MessagePipe/ServiceCollectionExtensions.cs b/src/MessagePipe/ServiceCollectionExtensions.cs index ad628fe..4712b31 100644 --- a/src/MessagePipe/ServiceCollectionExtensions.cs +++ b/src/MessagePipe/ServiceCollectionExtensions.cs @@ -54,6 +54,14 @@ public static IServiceCollection AddMessagePipe(this IServiceCollection services services.Add(typeof(IBufferedPublisher<>), typeof(BufferedMessageBroker<>), lifetime); services.Add(typeof(IBufferedSubscriber<>), typeof(BufferedMessageBroker<>), lifetime); + // keyless-variation + services.Add(typeof(SingletonMessageBrokerCore<>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonPublisher<>), typeof(SingletonMessageBroker<>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonSubscriber<>), typeof(SingletonMessageBroker<>), InstanceLifetime.Singleton); + services.Add(typeof(ScopedMessageBrokerCore<>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedPublisher<>), typeof(ScopedMessageBroker<>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedSubscriber<>), typeof(ScopedMessageBroker<>), InstanceLifetime.Scoped); + // keyless PubSub async services.Add(typeof(AsyncMessageBrokerCore<>), lifetime); services.Add(typeof(IAsyncPublisher<>), typeof(AsyncMessageBroker<>), lifetime); @@ -62,15 +70,39 @@ public static IServiceCollection AddMessagePipe(this IServiceCollection services services.Add(typeof(IBufferedAsyncPublisher<>), typeof(BufferedAsyncMessageBroker<>), lifetime); services.Add(typeof(IBufferedAsyncSubscriber<>), typeof(BufferedAsyncMessageBroker<>), lifetime); + // keyless-async-variation + services.Add(typeof(SingletonAsyncMessageBrokerCore<>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonAsyncPublisher<>), typeof(SingletonAsyncMessageBroker<>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonAsyncSubscriber<>), typeof(SingletonAsyncMessageBroker<>), InstanceLifetime.Singleton); + services.Add(typeof(ScopedAsyncMessageBrokerCore<>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedAsyncPublisher<>), typeof(ScopedAsyncMessageBroker<>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedAsyncSubscriber<>), typeof(ScopedAsyncMessageBroker<>), InstanceLifetime.Scoped); + // keyed PubSub services.Add(typeof(MessageBrokerCore<,>), lifetime); services.Add(typeof(IPublisher<,>), typeof(MessageBroker<,>), lifetime); services.Add(typeof(ISubscriber<,>), typeof(MessageBroker<,>), lifetime); + // keyed-variation + services.Add(typeof(SingletonMessageBrokerCore<,>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonPublisher<,>), typeof(SingletonMessageBroker<,>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonSubscriber<,>), typeof(SingletonMessageBroker<,>), InstanceLifetime.Singleton); + services.Add(typeof(ScopedMessageBrokerCore<,>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedPublisher<,>), typeof(ScopedMessageBroker<,>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedSubscriber<,>), typeof(ScopedMessageBroker<,>), InstanceLifetime.Scoped); + // keyed PubSub async services.Add(typeof(AsyncMessageBrokerCore<,>), lifetime); services.Add(typeof(IAsyncPublisher<,>), typeof(AsyncMessageBroker<,>), lifetime); services.Add(typeof(IAsyncSubscriber<,>), typeof(AsyncMessageBroker<,>), lifetime); + + // keyed-async-variation + services.Add(typeof(SingletonAsyncMessageBrokerCore<,>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonAsyncPublisher<,>), typeof(SingletonAsyncMessageBroker<,>), InstanceLifetime.Singleton); + services.Add(typeof(ISingletonAsyncSubscriber<,>), typeof(SingletonAsyncMessageBroker<,>), InstanceLifetime.Singleton); + services.Add(typeof(ScopedAsyncMessageBrokerCore<,>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedAsyncPublisher<,>), typeof(ScopedAsyncMessageBroker<,>), InstanceLifetime.Scoped); + services.Add(typeof(IScopedAsyncSubscriber<,>), typeof(ScopedAsyncMessageBroker<,>), InstanceLifetime.Scoped); } var lifetime2 = options.RequestHandlerLifetime; // requesthandler lifetime @@ -194,7 +226,7 @@ static IServiceCollection AddRequestHandlerCore(IServiceCollection services, Typ { throw new ArgumentException($"{type.FullName} does not implement {coreType.Name.Replace("Core", "")}."); } - else if(isAsync) + else if (isAsync) { AsyncRequestHandlerRegistory.Add(coreType); } @@ -298,7 +330,7 @@ static void AddRequestHandlerAndFilterFromTypes(IServiceCollection services, Ins } } - NEXT_TYPE: + NEXT_TYPE: continue; } }