diff --git a/Akka.Hosting.sln b/Akka.Hosting.sln index c34d12d5..21e96435 100644 --- a/Akka.Hosting.sln +++ b/Akka.Hosting.sln @@ -35,6 +35,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Remote.Hosting.Tests", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Cluster.Hosting.Tests", "src\Akka.Cluster.Hosting.Tests\Akka.Cluster.Hosting.Tests.csproj", "{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Hosting", "src\Akka.Persistence.Hosting\Akka.Persistence.Hosting.csproj", "{424A63E4-2B7A-45B9-9E69-185277EBE507}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Hosting.Tests", "src\Akka.Persistence.Hosting.Tests\Akka.Persistence.Hosting.Tests.csproj", "{876DE0B6-5FA8-4F79-876E-92EF5E9E7011}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -87,6 +91,14 @@ Global {EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Debug|Any CPU.Build.0 = Debug|Any CPU {EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Release|Any CPU.ActiveCfg = Release|Any CPU {EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Release|Any CPU.Build.0 = Release|Any CPU + {424A63E4-2B7A-45B9-9E69-185277EBE507}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {424A63E4-2B7A-45B9-9E69-185277EBE507}.Debug|Any CPU.Build.0 = Debug|Any CPU + {424A63E4-2B7A-45B9-9E69-185277EBE507}.Release|Any CPU.ActiveCfg = Release|Any CPU + {424A63E4-2B7A-45B9-9E69-185277EBE507}.Release|Any CPU.Build.0 = Release|Any CPU + {876DE0B6-5FA8-4F79-876E-92EF5E9E7011}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {876DE0B6-5FA8-4F79-876E-92EF5E9E7011}.Debug|Any CPU.Build.0 = Debug|Any CPU + {876DE0B6-5FA8-4F79-876E-92EF5E9E7011}.Release|Any CPU.ActiveCfg = Release|Any CPU + {876DE0B6-5FA8-4F79-876E-92EF5E9E7011}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Akka.Cluster.Hosting/Akka.Cluster.Hosting.csproj b/src/Akka.Cluster.Hosting/Akka.Cluster.Hosting.csproj index 96ec5581..0d445c2f 100644 --- a/src/Akka.Cluster.Hosting/Akka.Cluster.Hosting.csproj +++ b/src/Akka.Cluster.Hosting/Akka.Cluster.Hosting.csproj @@ -11,6 +11,7 @@ + diff --git a/src/Akka.Persistence.Hosting.Tests/Akka.Persistence.Hosting.Tests.csproj b/src/Akka.Persistence.Hosting.Tests/Akka.Persistence.Hosting.Tests.csproj new file mode 100644 index 00000000..532b0bd0 --- /dev/null +++ b/src/Akka.Persistence.Hosting.Tests/Akka.Persistence.Hosting.Tests.csproj @@ -0,0 +1,18 @@ + + + + $(TestsNetCoreFramework) + + + + + + + + + + + + + + diff --git a/src/Akka.Persistence.Hosting.Tests/EventAdapterSpecs.cs b/src/Akka.Persistence.Hosting.Tests/EventAdapterSpecs.cs new file mode 100644 index 00000000..6ed7a5ee --- /dev/null +++ b/src/Akka.Persistence.Hosting.Tests/EventAdapterSpecs.cs @@ -0,0 +1,114 @@ +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Hosting; +using Akka.Persistence.Journal; +using Akka.Util; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Xunit; + +namespace Akka.Persistence.Hosting.Tests; + +public class EventAdapterSpecs +{ + public static async Task StartHost(Action testSetup) + { + var host = new HostBuilder() + .ConfigureServices(testSetup).Build(); + + await host.StartAsync(); + return host; + } + + public sealed class Event1{ } + public sealed class Event2{ } + + public sealed class EventMapper1 : IWriteEventAdapter + { + public string Manifest(object evt) + { + return string.Empty; + } + + public object ToJournal(object evt) + { + return evt; + } + } + + public sealed class Tagger : IWriteEventAdapter + { + public string Manifest(object evt) + { + return string.Empty; + } + + public object ToJournal(object evt) + { + if (evt is Tagged t) + return t; + return new Tagged(evt, new[] { "foo" }); + } + } + + public sealed class ReadAdapter : IReadEventAdapter + { + public IEventSequence FromJournal(object evt, string manifest) + { + return new SingleEventSequence(evt); + } + } + + public sealed class ComboAdapter : IEventAdapter + { + public string Manifest(object evt) + { + return string.Empty; + } + + public object ToJournal(object evt) + { + return evt; + } + + public IEventSequence FromJournal(object evt, string manifest) + { + return new SingleEventSequence(evt); + } + } + + [Fact] + public async Task Should_use_correct_EventAdapter_bindings() + { + // arrange + using var host = await StartHost(collection => collection.AddAkka("MySys", builder => + { + builder.WithJournal("sql-server", journalBuilder => + { + journalBuilder.AddWriteEventAdapter("mapper1", new Type[] { typeof(Event1) }); + journalBuilder.AddReadEventAdapter("reader1", new Type[] { typeof(Event1) }); + journalBuilder.AddEventAdapter("combo", boundTypes: new Type[] { typeof(Event2) }); + journalBuilder.AddWriteEventAdapter("tagger", + boundTypes: new Type[] { typeof(Event1), typeof(Event2) }); + }); + })); + + // act + var sys = host.Services.GetRequiredService(); + var config = sys.Settings.Config; + var sqlPersistenceJournal = config.GetConfig("akka.persistence.journal.sql-server"); + + // assert + sqlPersistenceJournal.GetStringList($"event-adapter-bindings.\"{typeof(Event1).TypeQualifiedName()}\"").Should() + .BeEquivalentTo("mapper1", "reader1", "tagger"); + sqlPersistenceJournal.GetStringList($"event-adapter-bindings.\"{typeof(Event2).TypeQualifiedName()}\"").Should() + .BeEquivalentTo("combo", "tagger"); + + sqlPersistenceJournal.GetString("event-adapters.mapper1").Should().Be(typeof(EventMapper1).TypeQualifiedName()); + sqlPersistenceJournal.GetString("event-adapters.reader1").Should().Be(typeof(ReadAdapter).TypeQualifiedName()); + sqlPersistenceJournal.GetString("event-adapters.combo").Should().Be(typeof(ComboAdapter).TypeQualifiedName()); + sqlPersistenceJournal.GetString("event-adapters.tagger").Should().Be(typeof(Tagger).TypeQualifiedName()); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Hosting/Akka.Persistence.Hosting.csproj b/src/Akka.Persistence.Hosting/Akka.Persistence.Hosting.csproj new file mode 100644 index 00000000..85fbdf94 --- /dev/null +++ b/src/Akka.Persistence.Hosting/Akka.Persistence.Hosting.csproj @@ -0,0 +1,21 @@ + + + $(LibraryFramework) + README.md + Akka.Persistence Microsoft.Extensions.Hosting support. + 9 + + + + + + + + + + + + + + + diff --git a/src/Akka.Persistence.Hosting/AkkaPersistenceHostingExtensions.cs b/src/Akka.Persistence.Hosting/AkkaPersistenceHostingExtensions.cs new file mode 100644 index 00000000..df7238e5 --- /dev/null +++ b/src/Akka.Persistence.Hosting/AkkaPersistenceHostingExtensions.cs @@ -0,0 +1,136 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Akka.Configuration; +using Akka.Hosting; +using Akka.Persistence.Journal; +using Akka.Util; + +namespace Akka.Persistence.Hosting +{ + public enum PersistenceMode + { + /// + /// Sets both the akka.persistence.journal and the akka.persistence.snapshot-store to use this plugin. + /// + Both, + + /// + /// Sets ONLY the akka.persistence.journal to use this plugin. + /// + Journal, + + /// + /// Sets ONLY the akka.persistence.snapshot-store to use this plugin. + /// + SnapshotStore, + } + + /// + /// Used to help build journal configurations + /// + public sealed class AkkaPersistenceJournalBuilder + { + internal readonly string JournalId; + internal readonly AkkaConfigurationBuilder Builder; + internal readonly Dictionary> Bindings = new Dictionary>(); + internal readonly Dictionary Adapters = new Dictionary(); + + public AkkaPersistenceJournalBuilder(string journalId, AkkaConfigurationBuilder builder) + { + JournalId = journalId; + Builder = builder; + } + + public AkkaPersistenceJournalBuilder AddEventAdapter(string eventAdapterName, + IEnumerable boundTypes) where TAdapter : IEventAdapter + { + AddAdapter(eventAdapterName, boundTypes); + + return this; + } + + public AkkaPersistenceJournalBuilder AddReadEventAdapter(string eventAdapterName, + IEnumerable boundTypes) where TAdapter : IReadEventAdapter + { + AddAdapter(eventAdapterName, boundTypes); + + return this; + } + + public AkkaPersistenceJournalBuilder AddWriteEventAdapter(string eventAdapterName, + IEnumerable boundTypes) where TAdapter : IWriteEventAdapter + { + AddAdapter(eventAdapterName, boundTypes); + + return this; + } + + private void AddAdapter(string eventAdapterName, IEnumerable boundTypes) + { + Adapters[eventAdapterName] = typeof(TAdapter); + foreach (var t in boundTypes) + { + if (!Bindings.ContainsKey(t)) + Bindings[t] = new HashSet(); + Bindings[t].Add(eventAdapterName); + } + } + + /// + /// INTERNAL API - Builds the HOCON and then injects it. + /// + internal void Build() + { + // useless configuration - don't bother. + if (Adapters.Count == 0 || Bindings.Count == 0) + return; + + var adapters = new StringBuilder() + .Append($"akka.persistence.journal.{JournalId}").Append("{") + .AppendLine("event-adapters {"); + foreach (var kv in Adapters) + { + adapters.AppendLine($"{kv.Key} = \"{kv.Value.TypeQualifiedName()}\""); + } + + adapters.AppendLine("}").AppendLine("event-adapter-bindings {"); + foreach (var kv in Bindings) + { + adapters.AppendLine($"\"{kv.Key.TypeQualifiedName()}\" = [{string.Join(",", kv.Value)}]"); + } + + adapters.AppendLine("}").AppendLine("}"); + + var finalHocon = ConfigurationFactory.ParseString(adapters.ToString()); + Builder.AddHocon(finalHocon, HoconAddMode.Prepend); + } + } + + /// + /// The set of options for generic Akka.Persistence. + /// + public static class AkkaPersistenceHostingExtensions + { + /// + /// Used to configure a specific Akka.Persistence.Journal instance, primarily to support s. + /// + /// The builder instance being configured. + /// The id of the journal. i.e. if you want to apply this adapter to the `akka.persistence.journal.sql` journal, just type `sql`. + /// Configuration method for configuring the journal. + /// The same instance originally passed in. + /// + /// This method can be called multiple times for different s. + /// + public static AkkaConfigurationBuilder WithJournal(this AkkaConfigurationBuilder builder, + string journalId, Action journalBuilder) + { + var jBuilder = new AkkaPersistenceJournalBuilder(journalId, builder); + journalBuilder(jBuilder); + + // build and inject the HOCON + jBuilder.Build(); + return builder; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Hosting/Akka.Persistence.PostgreSql.Hosting.csproj b/src/Akka.Persistence.PostgreSql.Hosting/Akka.Persistence.PostgreSql.Hosting.csproj index 86de9550..2b123a4e 100644 --- a/src/Akka.Persistence.PostgreSql.Hosting/Akka.Persistence.PostgreSql.Hosting.csproj +++ b/src/Akka.Persistence.PostgreSql.Hosting/Akka.Persistence.PostgreSql.Hosting.csproj @@ -12,6 +12,7 @@ + diff --git a/src/Akka.Persistence.PostgreSql.Hosting/AkkaPersistencePostgreSqlHostingExtensions.cs b/src/Akka.Persistence.PostgreSql.Hosting/AkkaPersistencePostgreSqlHostingExtensions.cs index 00812042..afff5f36 100644 --- a/src/Akka.Persistence.PostgreSql.Hosting/AkkaPersistencePostgreSqlHostingExtensions.cs +++ b/src/Akka.Persistence.PostgreSql.Hosting/AkkaPersistencePostgreSqlHostingExtensions.cs @@ -1,29 +1,11 @@ using System; using Akka.Configuration; using Akka.Hosting; +using Akka.Persistence.Hosting; using Akka.Persistence.Query.Sql; namespace Akka.Persistence.PostgreSql.Hosting { - public enum SqlPersistenceMode - { - /// - /// Sets both the akka.persistence.journal and the akka.persistence.snapshot-store to use - /// Akka.Persistence.PostgreSql. - /// - Both, - - /// - /// Sets ONLY the akka.persistence.journal to use Akka.Persistence.PostgreSql. - /// - Journal, - - /// - /// Sets ONLY the akka.persistence.snapshot-store to use Akka.Persistence.PostgreSql. - /// - SnapshotStore, - } - /// /// Extension methods for Akka.Persistence.PostgreSql /// @@ -32,12 +14,12 @@ public static class AkkaPersistencePostgreSqlHostingExtensions public static AkkaConfigurationBuilder WithPostgreSqlPersistence( this AkkaConfigurationBuilder builder, string connectionString, - SqlPersistenceMode mode = SqlPersistenceMode.Both, + PersistenceMode mode = PersistenceMode.Both, string schemaName = "public", bool autoInitialize = false, StoredAsType storedAsType = StoredAsType.ByteA, bool sequentialAccess = false, - bool useBigintIdentityForOrderingColumn = false) + bool useBigintIdentityForOrderingColumn = false, Action configurator = null) { var storedAs = storedAsType switch { @@ -88,17 +70,22 @@ class = ""Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Per var finalConfig = mode switch { - SqlPersistenceMode.Both => journalConfiguration + PersistenceMode.Both => journalConfiguration .WithFallback(snapshotStoreConfig) .WithFallback(SqlReadJournal.DefaultConfiguration()), - SqlPersistenceMode.Journal => journalConfiguration + PersistenceMode.Journal => journalConfiguration .WithFallback(SqlReadJournal.DefaultConfiguration()), - SqlPersistenceMode.SnapshotStore => snapshotStoreConfig, + PersistenceMode.SnapshotStore => snapshotStoreConfig, - _ => throw new ArgumentOutOfRangeException(nameof(mode), mode, "Invalid SqlPersistenceMode defined.") + _ => throw new ArgumentOutOfRangeException(nameof(mode), mode, "Invalid PersistenceMode defined.") }; + + if (configurator != null) // configure event adapters + { + builder.WithJournal("postgresql", configurator); + } return builder.AddHocon(finalConfig.WithFallback(PostgreSqlPersistence.DefaultConfiguration())); } diff --git a/src/Akka.Persistence.SqlServer.Hosting/Akka.Persistence.SqlServer.Hosting.csproj b/src/Akka.Persistence.SqlServer.Hosting/Akka.Persistence.SqlServer.Hosting.csproj index 05998808..e47e22e0 100644 --- a/src/Akka.Persistence.SqlServer.Hosting/Akka.Persistence.SqlServer.Hosting.csproj +++ b/src/Akka.Persistence.SqlServer.Hosting/Akka.Persistence.SqlServer.Hosting.csproj @@ -12,6 +12,7 @@ + diff --git a/src/Akka.Persistence.SqlServer.Hosting/AkkaPersistenceSqlServerHostingExtensions.cs b/src/Akka.Persistence.SqlServer.Hosting/AkkaPersistenceSqlServerHostingExtensions.cs index c7dd3340..fe7dbea7 100644 --- a/src/Akka.Persistence.SqlServer.Hosting/AkkaPersistenceSqlServerHostingExtensions.cs +++ b/src/Akka.Persistence.SqlServer.Hosting/AkkaPersistenceSqlServerHostingExtensions.cs @@ -1,38 +1,30 @@ using System; +using Akka.Actor; using Akka.Configuration; using Akka.Hosting; +using Akka.Persistence.Hosting; using Akka.Persistence.Query.Sql; namespace Akka.Persistence.SqlServer.Hosting { - public enum SqlPersistenceMode - { - /// - /// Sets both the akka.persistence.journal and the akka.persistence.snapshot-store to use - /// Akka.Persistence.SqlServer. - /// - Both, - - /// - /// Sets ONLY the akka.persistence.journal to use Akka.Persistence.SqlServer. - /// - Journal, - - /// - /// Sets ONLY the akka.persistence.snapshot-store to use Akka.Persistence.SqlServer. - /// - SnapshotStore, - } - /// /// Extension methods for Akka.Persistence.SqlServer /// public static class AkkaPersistenceSqlServerHostingExtensions { + /// + /// Adds Akka.Persistence.SqlServer to this . + /// + /// + /// + /// + /// + /// + /// public static AkkaConfigurationBuilder WithSqlServerPersistence( this AkkaConfigurationBuilder builder, string connectionString, - SqlPersistenceMode mode = SqlPersistenceMode.Both) + PersistenceMode mode = PersistenceMode.Both, Action configurator = null) { Config journalConfiguration = @$" akka.persistence {{ @@ -65,19 +57,24 @@ class = ""Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persi var finalConfig = mode switch { - SqlPersistenceMode.Both => journalConfiguration + PersistenceMode.Both => journalConfiguration .WithFallback(snapshotStoreConfig) .WithFallback(SqlReadJournal.DefaultConfiguration()), - SqlPersistenceMode.Journal => journalConfiguration + PersistenceMode.Journal => journalConfiguration .WithFallback(SqlReadJournal.DefaultConfiguration()), - SqlPersistenceMode.SnapshotStore => snapshotStoreConfig, + PersistenceMode.SnapshotStore => snapshotStoreConfig, _ => throw new ArgumentOutOfRangeException(nameof(mode), mode, "Invalid SqlPersistenceMode defined.") }; + if (configurator != null) // configure event adapters + { + builder.WithJournal("sql-server", configurator); + } + return builder.AddHocon(finalConfig.WithFallback(SqlServerPersistence.DefaultConfiguration())); } } -} +} \ No newline at end of file