Skip to content

Commit

Permalink
Adding Akka.Persistence.Hosting module (#67)
Browse files Browse the repository at this point in the history
* [WIP] Adding Akka.Persistence.Hosting module

working on #64

* fixed up Akka.Persistence.Hosting

* added reference to Akka.Persistence.Hosting from Akka.Cluster.Hosting

To make it easier to configure where it's likely to be used.

* stubbing out unit tests

* fixed type binding issue with `AkkaPersistenceJournalBuilder`

* completed unit tests

* added journal configurator to both Postgres and SQL Server

Also made `PersistenceMode` part of Akka.Persistence.Hosting, rather than lib-specific.
  • Loading branch information
Aaronontheweb authored Jun 16, 2022
1 parent 658b798 commit 9c2385a
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 49 deletions.
12 changes: 12 additions & 0 deletions Akka.Hosting.sln
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Remote.Hosting.Tests",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Cluster.Hosting.Tests", "src\Akka.Cluster.Hosting.Tests\Akka.Cluster.Hosting.Tests.csproj", "{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Hosting", "src\Akka.Persistence.Hosting\Akka.Persistence.Hosting.csproj", "{424A63E4-2B7A-45B9-9E69-185277EBE507}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Hosting.Tests", "src\Akka.Persistence.Hosting.Tests\Akka.Persistence.Hosting.Tests.csproj", "{876DE0B6-5FA8-4F79-876E-92EF5E9E7011}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -87,6 +91,14 @@ Global
{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Release|Any CPU.Build.0 = Release|Any CPU
{424A63E4-2B7A-45B9-9E69-185277EBE507}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{424A63E4-2B7A-45B9-9E69-185277EBE507}.Debug|Any CPU.Build.0 = Debug|Any CPU
{424A63E4-2B7A-45B9-9E69-185277EBE507}.Release|Any CPU.ActiveCfg = Release|Any CPU
{424A63E4-2B7A-45B9-9E69-185277EBE507}.Release|Any CPU.Build.0 = Release|Any CPU
{876DE0B6-5FA8-4F79-876E-92EF5E9E7011}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{876DE0B6-5FA8-4F79-876E-92EF5E9E7011}.Debug|Any CPU.Build.0 = Debug|Any CPU
{876DE0B6-5FA8-4F79-876E-92EF5E9E7011}.Release|Any CPU.ActiveCfg = Release|Any CPU
{876DE0B6-5FA8-4F79-876E-92EF5E9E7011}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
1 change: 1 addition & 0 deletions src/Akka.Cluster.Hosting/Akka.Cluster.Hosting.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Hosting\Akka.Persistence.Hosting.csproj" />
<ProjectReference Include="..\Akka.Remote.Hosting\Akka.Remote.Hosting.csproj" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(TestsNetCoreFramework)</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.7.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="xunit.runner.visualstudio" Version="$(XunitVersion)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Hosting\Akka.Persistence.Hosting.csproj" />
</ItemGroup>

</Project>
114 changes: 114 additions & 0 deletions src/Akka.Persistence.Hosting.Tests/EventAdapterSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Hosting;
using Akka.Persistence.Journal;
using Akka.Util;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;

namespace Akka.Persistence.Hosting.Tests;

public class EventAdapterSpecs
{
public static async Task<IHost> StartHost(Action<IServiceCollection> testSetup)
{
var host = new HostBuilder()
.ConfigureServices(testSetup).Build();

await host.StartAsync();
return host;
}

public sealed class Event1{ }
public sealed class Event2{ }

public sealed class EventMapper1 : IWriteEventAdapter
{
public string Manifest(object evt)
{
return string.Empty;
}

public object ToJournal(object evt)
{
return evt;
}
}

public sealed class Tagger : IWriteEventAdapter
{
public string Manifest(object evt)
{
return string.Empty;
}

public object ToJournal(object evt)
{
if (evt is Tagged t)
return t;
return new Tagged(evt, new[] { "foo" });
}
}

public sealed class ReadAdapter : IReadEventAdapter
{
public IEventSequence FromJournal(object evt, string manifest)
{
return new SingleEventSequence(evt);
}
}

public sealed class ComboAdapter : IEventAdapter
{
public string Manifest(object evt)
{
return string.Empty;
}

public object ToJournal(object evt)
{
return evt;
}

public IEventSequence FromJournal(object evt, string manifest)
{
return new SingleEventSequence(evt);
}
}

[Fact]
public async Task Should_use_correct_EventAdapter_bindings()
{
// arrange
using var host = await StartHost(collection => collection.AddAkka("MySys", builder =>
{
builder.WithJournal("sql-server", journalBuilder =>
{
journalBuilder.AddWriteEventAdapter<EventMapper1>("mapper1", new Type[] { typeof(Event1) });
journalBuilder.AddReadEventAdapter<ReadAdapter>("reader1", new Type[] { typeof(Event1) });
journalBuilder.AddEventAdapter<ComboAdapter>("combo", boundTypes: new Type[] { typeof(Event2) });
journalBuilder.AddWriteEventAdapter<Tagger>("tagger",
boundTypes: new Type[] { typeof(Event1), typeof(Event2) });
});
}));

// act
var sys = host.Services.GetRequiredService<ActorSystem>();
var config = sys.Settings.Config;
var sqlPersistenceJournal = config.GetConfig("akka.persistence.journal.sql-server");

// assert
sqlPersistenceJournal.GetStringList($"event-adapter-bindings.\"{typeof(Event1).TypeQualifiedName()}\"").Should()
.BeEquivalentTo("mapper1", "reader1", "tagger");
sqlPersistenceJournal.GetStringList($"event-adapter-bindings.\"{typeof(Event2).TypeQualifiedName()}\"").Should()
.BeEquivalentTo("combo", "tagger");

sqlPersistenceJournal.GetString("event-adapters.mapper1").Should().Be(typeof(EventMapper1).TypeQualifiedName());
sqlPersistenceJournal.GetString("event-adapters.reader1").Should().Be(typeof(ReadAdapter).TypeQualifiedName());
sqlPersistenceJournal.GetString("event-adapters.combo").Should().Be(typeof(ComboAdapter).TypeQualifiedName());
sqlPersistenceJournal.GetString("event-adapters.tagger").Should().Be(typeof(Tagger).TypeQualifiedName());
}
}
21 changes: 21 additions & 0 deletions src/Akka.Persistence.Hosting/Akka.Persistence.Hosting.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>$(LibraryFramework)</TargetFramework>
<PackageReadmeFile>README.md</PackageReadmeFile>
<Description>Akka.Persistence Microsoft.Extensions.Hosting support.</Description>
<LangVersion>9</LangVersion>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\README.md" Pack="true" PackagePath="\" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Hosting\Akka.Hosting.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Akka.Persistence" Version="$(AkkaVersion)" />
</ItemGroup>

</Project>
136 changes: 136 additions & 0 deletions src/Akka.Persistence.Hosting/AkkaPersistenceHostingExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
using System;
using System.Collections.Generic;
using System.Text;
using Akka.Configuration;
using Akka.Hosting;
using Akka.Persistence.Journal;
using Akka.Util;

namespace Akka.Persistence.Hosting
{
public enum PersistenceMode
{
/// <summary>
/// Sets both the akka.persistence.journal and the akka.persistence.snapshot-store to use this plugin.
/// </summary>
Both,

/// <summary>
/// Sets ONLY the akka.persistence.journal to use this plugin.
/// </summary>
Journal,

/// <summary>
/// Sets ONLY the akka.persistence.snapshot-store to use this plugin.
/// </summary>
SnapshotStore,
}

/// <summary>
/// Used to help build journal configurations
/// </summary>
public sealed class AkkaPersistenceJournalBuilder
{
internal readonly string JournalId;
internal readonly AkkaConfigurationBuilder Builder;
internal readonly Dictionary<Type, HashSet<string>> Bindings = new Dictionary<Type, HashSet<string>>();
internal readonly Dictionary<string, Type> Adapters = new Dictionary<string, Type>();

public AkkaPersistenceJournalBuilder(string journalId, AkkaConfigurationBuilder builder)
{
JournalId = journalId;
Builder = builder;
}

public AkkaPersistenceJournalBuilder AddEventAdapter<TAdapter>(string eventAdapterName,
IEnumerable<Type> boundTypes) where TAdapter : IEventAdapter
{
AddAdapter<TAdapter>(eventAdapterName, boundTypes);

return this;
}

public AkkaPersistenceJournalBuilder AddReadEventAdapter<TAdapter>(string eventAdapterName,
IEnumerable<Type> boundTypes) where TAdapter : IReadEventAdapter
{
AddAdapter<TAdapter>(eventAdapterName, boundTypes);

return this;
}

public AkkaPersistenceJournalBuilder AddWriteEventAdapter<TAdapter>(string eventAdapterName,
IEnumerable<Type> boundTypes) where TAdapter : IWriteEventAdapter
{
AddAdapter<TAdapter>(eventAdapterName, boundTypes);

return this;
}

private void AddAdapter<TAdapter>(string eventAdapterName, IEnumerable<Type> boundTypes)
{
Adapters[eventAdapterName] = typeof(TAdapter);
foreach (var t in boundTypes)
{
if (!Bindings.ContainsKey(t))
Bindings[t] = new HashSet<string>();
Bindings[t].Add(eventAdapterName);
}
}

/// <summary>
/// INTERNAL API - Builds the HOCON and then injects it.
/// </summary>
internal void Build()
{
// useless configuration - don't bother.
if (Adapters.Count == 0 || Bindings.Count == 0)
return;

var adapters = new StringBuilder()
.Append($"akka.persistence.journal.{JournalId}").Append("{")
.AppendLine("event-adapters {");
foreach (var kv in Adapters)
{
adapters.AppendLine($"{kv.Key} = \"{kv.Value.TypeQualifiedName()}\"");
}

adapters.AppendLine("}").AppendLine("event-adapter-bindings {");
foreach (var kv in Bindings)
{
adapters.AppendLine($"\"{kv.Key.TypeQualifiedName()}\" = [{string.Join(",", kv.Value)}]");
}

adapters.AppendLine("}").AppendLine("}");

var finalHocon = ConfigurationFactory.ParseString(adapters.ToString());
Builder.AddHocon(finalHocon, HoconAddMode.Prepend);
}
}

/// <summary>
/// The set of options for generic Akka.Persistence.
/// </summary>
public static class AkkaPersistenceHostingExtensions
{
/// <summary>
/// Used to configure a specific Akka.Persistence.Journal instance, primarily to support <see cref="IEventAdapter"/>s.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="journalId">The id of the journal. i.e. if you want to apply this adapter to the `akka.persistence.journal.sql` journal, just type `sql`.</param>
/// <param name="journalBuilder">Configuration method for configuring the journal.</param>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
/// <remarks>
/// This method can be called multiple times for different <see cref="IEventAdapter"/>s.
/// </remarks>
public static AkkaConfigurationBuilder WithJournal(this AkkaConfigurationBuilder builder,
string journalId, Action<AkkaPersistenceJournalBuilder> journalBuilder)
{
var jBuilder = new AkkaPersistenceJournalBuilder(journalId, builder);
journalBuilder(jBuilder);

// build and inject the HOCON
jBuilder.Build();
return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

<ItemGroup>
<ProjectReference Include="..\Akka.Hosting\Akka.Hosting.csproj" />
<ProjectReference Include="..\Akka.Persistence.Hosting\Akka.Persistence.Hosting.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 9c2385a

Please sign in to comment.