Skip to content

Commit

Permalink
Add Akka.Cluster.Tools.Client support (#66)
Browse files Browse the repository at this point in the history
* Add Akka.Cluster.Tools.Client support

* Add settings unit tests

* Simplify ClusterClient setup

* Add some WithClusterClient overloads to give user options
  • Loading branch information
Arkatufus authored Jun 16, 2022
1 parent 9c2385a commit 5bd0aac
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 0 deletions.
39 changes: 39 additions & 0 deletions src/Akka.Cluster.Hosting.Tests/ClusterClientSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Collections.Generic;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;

namespace Akka.Cluster.Hosting.Tests;

public class ClusterClientSpecs
{
[Fact(DisplayName = "ClusterClientReceptionistSettings should be set correctly")]
public void ClusterClientReceptionistSettingsSpec()
{
var config = AkkaClusterHostingExtensions.CreateReceptionistConfig("customName", "customRole")
.GetConfig("akka.cluster.client.receptionist");
var settings = ClusterReceptionistSettings.Create(config);

config.GetString("name").Should().Be("customName");
settings.Role.Should().Be("customRole");
}

[Fact(DisplayName = "ClusterClientSettings should be set correctly")]
public void ClusterClientSettingsSpec()
{
var contacts = new List<ActorPath>
{
ActorPath.Parse("akka.tcp://one@localhost:1111/system/receptionist"),
ActorPath.Parse("akka.tcp://two@localhost:1111/system/receptionist"),
ActorPath.Parse("akka.tcp://three@localhost:1111/system/receptionist"),
};

var settings = AkkaClusterHostingExtensions.CreateClusterClientSettings(
ClusterClientReceptionist.DefaultConfig(),
contacts);

settings.InitialContacts.Should().BeEquivalentTo(contacts);
}
}
130 changes: 130 additions & 0 deletions src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Akka.Cluster.Hosting
{
Expand Down Expand Up @@ -367,5 +373,129 @@ public static AkkaConfigurationBuilder WithSingletonProxy<TKey>(this AkkaConfigu
CreateAndRegisterSingletonProxy<TKey>(singletonName, singletonManagerPath, singletonProxySettings, system, registry);
});
}

/// <summary>
/// Configures a <see cref="ClusterClientReceptionist"/> for the <see cref="ActorSystem"/>
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="name">Actor name of the ClusterReceptionist actor under the system path, by default it is /system/receptionist</param>
/// <param name="role">Checks that the receptionist only start on members tagged with this role. All members are used if empty.</param>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
public static AkkaConfigurationBuilder WithClusterClientReceptionist(
this AkkaConfigurationBuilder builder,
string name = "receptionist",
string role = null)
{
builder.AddHocon(CreateReceptionistConfig(name, role), HoconAddMode.Prepend);
return builder;
}

internal static Config CreateReceptionistConfig(string name, string role)
{
const string root = "akka.cluster.client.receptionist.";

var sb = new StringBuilder()
.Append(root).Append("name:").AppendLine(QuoteIfNeeded(name));

if(!string.IsNullOrEmpty(role))
sb.Append(root).Append("role:").AppendLine(QuoteIfNeeded(role));

return ConfigurationFactory.ParseString(sb.ToString());
}

/// <summary>
/// Creates a <see cref="ClusterClient"/> and adds it to the <see cref="ActorRegistry"/> using the given
/// <see cref="TKey"/>.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="initialContacts"> <para>
/// List of <see cref="ClusterClientReceptionist"/> <see cref="ActorPath"/> that will be used as a seed
/// to discover all of the receptionists in the cluster.
/// </para>
/// <para>
/// This should look something like "akka.tcp://systemName@networkAddress:2552/system/receptionist"
/// </para></param>
/// <typeparam name="TKey">The key type to use for the <see cref="ActorRegistry"/>.</typeparam>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
public static AkkaConfigurationBuilder WithClusterClient<TKey>(
this AkkaConfigurationBuilder builder,
IList<ActorPath> initialContacts)
{
if (initialContacts == null)
throw new ArgumentNullException(nameof(initialContacts));

if (initialContacts.Count < 1)
throw new ArgumentException("Must specify at least one initial contact", nameof(initialContacts));

return builder.WithActors((system, registry) =>
{
var clusterClient = system.ActorOf(ClusterClient.Props(
CreateClusterClientSettings(system.Settings.Config, initialContacts)));
registry.TryRegister<TKey>(clusterClient);
});
}

/// <summary>
/// Creates a <see cref="ClusterClient"/> and adds it to the <see cref="ActorRegistry"/> using the given
/// <see cref="TKey"/>.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="initialContactAddresses"> <para>
/// List of node addresses where the <see cref="ClusterClientReceptionist"/> are located that will be used as seed
/// to discover all of the receptionists in the cluster.
/// </para>
/// <para>
/// This should look something like "akka.tcp://systemName@networkAddress:2552"
/// </para></param>
/// <param name="receptionistActorName">The name of the <see cref="ClusterClientReceptionist"/> actor.
/// Defaults to "receptionist"
/// </param>
/// <typeparam name="TKey">The key type to use for the <see cref="ActorRegistry"/>.</typeparam>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
public static AkkaConfigurationBuilder WithClusterClient<TKey>(
this AkkaConfigurationBuilder builder,
IEnumerable<Address> initialContactAddresses,
string receptionistActorName = "receptionist")
=> builder.WithClusterClient<TKey>(initialContactAddresses
.Select(address => new RootActorPath(address) / "system" / receptionistActorName)
.ToList());

/// <summary>
/// Creates a <see cref="ClusterClient"/> and adds it to the <see cref="ActorRegistry"/> using the given
/// <see cref="TKey"/>.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="initialContacts"> <para>
/// List of actor paths that will be used as a seed to discover all of the receptionists in the cluster.
/// </para>
/// <para>
/// This should look something like "akka.tcp://systemName@networkAddress:2552/system/receptionist"
/// </para></param>
/// <typeparam name="TKey">The key type to use for the <see cref="ActorRegistry"/>.</typeparam>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
public static AkkaConfigurationBuilder WithClusterClient<TKey>(
this AkkaConfigurationBuilder builder,
IEnumerable<string> initialContacts)
=> builder.WithClusterClient<TKey>(initialContacts.Select(ActorPath.Parse).ToList());

internal static ClusterClientSettings CreateClusterClientSettings(Config config, IEnumerable<ActorPath> initialContacts)
{
var clientConfig = config.GetConfig("akka.cluster.client");
return ClusterClientSettings.Create(clientConfig)
.WithInitialContacts(initialContacts.ToImmutableHashSet());
}

#region Helper functions

private static readonly Regex EscapeRegex = new Regex("[ \t:]{1}", RegexOptions.Compiled);

private static string QuoteIfNeeded(string text)
{
return text == null
? "" : EscapeRegex.IsMatch(text)
? $"\"{text}\"" : text;
}

#endregion
}
}
3 changes: 3 additions & 0 deletions src/Akka.Cluster.Hosting/Properties/FriendsOf.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Akka.Cluster.Hosting.Tests")]
2 changes: 2 additions & 0 deletions src/Akka.Remote.Hosting.Tests/RemoteConfigurationSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public async Task AkkaRemoteShouldUsePublicHostnameCorrectly()
// assert
actorSystem.Provider.DefaultAddress.Host.Should().Be("localhost");
}


}

0 comments on commit 5bd0aac

Please sign in to comment.