Skip to content

Commit

Permalink
added Akka.Hosting and Sharding config
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed May 6, 2024
1 parent ae8d96a commit 6fdaa1b
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 24 deletions.
19 changes: 19 additions & 0 deletions src/DrawTogether.Actors/Drawings/AllDrawingsIndexActor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Akka.Actor;
using Akka.Cluster.Hosting;
using Akka.DistributedData;
using Akka.Event;
using Akka.Hosting;
Expand Down Expand Up @@ -111,6 +112,24 @@ public static AkkaConfigurationBuilder AddAllDrawingsIndexActor(this AkkaConfigu
registry.Register<AllDrawingsIndexActor>(allDrawingsIndexActor);
});

// add the corresponding publisher actor
builder.WithActors((system, registry, resolver) =>
{
var allDrawingsPublisherActor = system.ActorOf(Props.Create<AllDrawingsPublisherActor>(), "all-drawings-publisher");
registry.Register<AllDrawingsPublisherActor>(allDrawingsPublisherActor);
});

// configure DData accordingly
builder.WithDistributedData(options =>
{
options.Durable = new DurableOptions()
{
// disable durable storage for this actor
Keys = []
};
options.RecreateOnFailure = true;
options.Role = clusterRoleName;
});

return builder;
}
Expand Down
75 changes: 51 additions & 24 deletions src/DrawTogether.Actors/Drawings/DrawingSessionActor.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Akka.Actor;
using Akka.Cluster.Hosting;
using Akka.Cluster.Sharding;
using Akka.Event;
using Akka.Hosting;
using Akka.Persistence;
using DrawTogether.Entities.Drawings;
using DrawTogether.Entities.Drawings.Messages;
Expand All @@ -15,36 +17,42 @@ public sealed class DrawingSessionActor : UntypedPersistentActor, IWithTimers
/// <summary>
/// Timer-driven message to publish the current state of the drawing session
/// </summary>
public sealed class PublishActivity : IDeadLetterSuppression, INoSerializationVerificationNeeded, INotInfluenceReceiveTimeout
public sealed class PublishActivity : IDeadLetterSuppression, INoSerializationVerificationNeeded,
INotInfluenceReceiveTimeout
{
private PublishActivity() { }
private PublishActivity()
{
}

public static PublishActivity Instance { get; } = new();
}

public override string PersistenceId { get; }
public DrawingSessionState State { get; private set; }

/// <summary>
/// Used for updating the <see cref="AllDrawingsIndexActor"/>'s state and
/// operates are a lower data refresh rate than our direct subscribers
/// </summary>
private readonly IActorRef _drawingActivityPublisher;

private DateTime? _lastActivityPublished;

/// <summary>
/// Subscribers to this drawing session - they will be notified
/// if we are killed / rebalanced re-subscribe.
/// </summary>
private readonly HashSet<IActorRef> _subscribers = new();

private readonly ILoggingAdapter _log = Context.GetLogger();
public DrawingSessionActor(string sessionId, IActorRef drawingActivityPublisher)

public DrawingSessionActor(string sessionId, IRequiredActor<AllDrawingsPublisherActor> drawingActivityPublisher)
{
PersistenceId = sessionId;
_drawingActivityPublisher = drawingActivityPublisher;
_drawingActivityPublisher = drawingActivityPublisher.ActorRef;
State = new DrawingSessionState(new DrawingSessionId(sessionId));
}

protected override void OnCommand(object message)
{
switch (message)
Expand All @@ -58,45 +66,46 @@ protected override void OnCommand(object message)
Sender.Tell(resp);
return;
}

var hasReplied = false;

if (State.IsEmpty)
{
// special case: state is empty, need to speed up activity publishing
Self.Tell(PublishActivity.Instance);
}

PersistAll(events, evt =>
{
if (!hasReplied)
{
Sender.Tell(resp);
hasReplied = true;
}

PublishToSubscribers(evt);
State = State.Apply(evt);


if (LastSequenceNr % 100 == 0)
{
SaveSnapshot(State);
}
});

break;
}
case PublishActivity:
{
if(_lastActivityPublished.HasValue && _lastActivityPublished == State.LastUpdate)
if (_lastActivityPublished.HasValue && _lastActivityPublished == State.LastUpdate)
{
// we've already published this update
return;
}

_drawingActivityPublisher.Tell(new DrawingActivityUpdate(State.DrawingSessionId, State.ConnectedUsers.Count, State.LastUpdate, false));


_drawingActivityPublisher.Tell(new DrawingActivityUpdate(State.DrawingSessionId,
State.ConnectedUsers.Count, State.LastUpdate, false));

// sync the clocks
_lastActivityPublished = State.LastUpdate;
break;
Expand Down Expand Up @@ -125,23 +134,24 @@ protected override void OnCommand(object message)
case ReceiveTimeout _:
{
_log.Info("Drawing session {DrawingSessionId} has been idle for too long, closing session");

Persist(new DrawingSessionEvents.DrawingSessionClosed(State.DrawingSessionId), evt =>
{
State = State.Apply(evt);
PublishToSubscribers(evt);

// let everyone know the session is toast
_drawingActivityPublisher.Tell(new DrawingActivityUpdate(State.DrawingSessionId, State.ConnectedUsers.Count, State.LastUpdate, true));

_drawingActivityPublisher.Tell(new DrawingActivityUpdate(State.DrawingSessionId,
State.ConnectedUsers.Count, State.LastUpdate, true));

// have the sharding system forget us and shut us down
Context.Parent.Tell(new Passivate(PoisonPill.Instance));
});
break;
}
}
}

private void PublishToSubscribers(IDrawingSessionEvent evt)
{
foreach (var subscriber in _subscribers)
Expand All @@ -166,10 +176,27 @@ protected override void OnRecover(object message)
protected override void PreStart()
{
Timers.StartPeriodicTimer("publish-activity", PublishActivity.Instance, TimeSpan.FromSeconds(5));

// if we go more than 2 minutes without activity, we time the session out
Context.SetReceiveTimeout(TimeSpan.FromMinutes(2));
}

public ITimerScheduler Timers { get; set; } = null!;
}

public static class DrawingSessionActorExtensions
{
public static AkkaConfigurationBuilder AddDrawingSessionActor(this AkkaConfigurationBuilder builder, string clusterRoleName = ClusterConstants.DrawStateRoleName)
{
builder.WithShardRegion<DrawingSessionActor>("drawing-session",
(system, registry, resolver) => s => resolver.Props<DrawingSessionActor>(s),
new DrawingSessionActorMessageExtractor(), new ShardOptions()
{
StateStoreMode = StateStoreMode.DData,
RememberEntities = true,
RememberEntitiesStore = RememberEntitiesStore.Eventsourced,
Role = clusterRoleName
});
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Akka.Cluster.Sharding;
using DrawTogether.Entities.Drawings.Messages;

namespace DrawTogether.Actors.Drawings;

public sealed class DrawingSessionActorMessageExtractor() : HashCodeMessageExtractor(ShardCount)
{
public const int ShardCount = 50;

public override string? EntityId(object message)
{
if(message is IWithDrawingSessionId withDrawingSessionId)
{
return withDrawingSessionId.DrawingSessionId.SessionId;
}

return null;
}
}

0 comments on commit 6fdaa1b

Please sign in to comment.