diff --git a/src/DrawTogether.Actors/Drawings/DrawingSessionActor.cs b/src/DrawTogether.Actors/Drawings/DrawingSessionActor.cs index 40c0942..eff3819 100644 --- a/src/DrawTogether.Actors/Drawings/DrawingSessionActor.cs +++ b/src/DrawTogether.Actors/Drawings/DrawingSessionActor.cs @@ -48,7 +48,7 @@ private PublishActivity() public DrawingSessionActor(string sessionId, IRequiredActor drawingActivityPublisher) { - PersistenceId = sessionId; + PersistenceId = $"drawsession-{sessionId}"; _drawingActivityPublisher = drawingActivityPublisher.ActorRef; State = new DrawingSessionState(new DrawingSessionId(sessionId)); } diff --git a/src/DrawTogether.Actors/Local/LocalDrawingSessionActor.cs b/src/DrawTogether.Actors/Local/LocalDrawingSessionActor.cs index 9a9577a..d18ec01 100644 --- a/src/DrawTogether.Actors/Local/LocalDrawingSessionActor.cs +++ b/src/DrawTogether.Actors/Local/LocalDrawingSessionActor.cs @@ -4,10 +4,12 @@ using Akka.Hosting; using Akka.Streams; using Akka.Streams.Dsl; +using Akka.Util; using DrawTogether.Actors.Drawings; using DrawTogether.Entities; using DrawTogether.Entities.Drawings; using DrawTogether.Entities.Drawings.Messages; +using DrawTogether.Entities.Users; using static DrawTogether.Actors.Local.LocalPaintProtocol; namespace DrawTogether.Actors.Local; @@ -93,7 +95,7 @@ protected override void OnReceive(object message) { switch (message) { - case AddPointToConnectedStroke or CreateConnectedStroke: + case AddPointToConnectedStroke: _debouncer.Tell(message); break; case IPaintSessionMessage paintMessage: @@ -140,30 +142,36 @@ protected override void OnReceive(object message) break; } } + + private readonly int _randomSeed = Random.Shared.Next(); + private int _strokeIdCounter = 0; - private List TransmitActions(IReadOnlyList actions) + private int NextStrokeId(UserId userId) { - var createActions = actions.Where(a => a is CreateConnectedStroke) - .Select(a => ((CreateConnectedStroke)a).ConnectedStroke) - .ToArray(); - - var addActions = actions.OfType().ToArray(); + return MurmurHash.StringHash(userId.IdentityName) + _randomSeed + _strokeIdCounter++; + } + + private List TransmitActions(IReadOnlyList actions) + { + + // group all the actions by user + var userActions = actions.GroupBy(a => a.UserId).ToList(); - _log.Info("BATCHED {0} create actions and {1} add actions", createActions.Length, addActions.Length); + _log.Info("BATCHED {0} create actions from {1} users", actions.Count, userActions.Count); var drawSessionCommands = new List(); - foreach (var create in createActions) + foreach (var userStuff in userActions) { - var allPoints = create.Points.ToList(); - - foreach (var a in addActions.Where(a => a.StrokeId == create.Id)) + var userId = userStuff.Key; + var connectedStroke = new ConnectedStroke(new StrokeId(NextStrokeId(userId))) { - allPoints.Add(a.Point); - } - - drawSessionCommands.Add(new DrawingSessionCommands.AddStroke(_drawingSessionId, - create with { Points = allPoints })); + Points = userStuff.Select(a => a.Point).ToList(), + StrokeColor = userStuff.First().StrokeColor, + StrokeWidth = userStuff.First().StrokeWidth + }; + + drawSessionCommands.Add(new DrawingSessionCommands.AddStroke(_drawingSessionId, connectedStroke)); } return drawSessionCommands; @@ -177,11 +185,11 @@ protected override void PreStart() AttemptToSubscribe(); Context.SetReceiveTimeout(TimeSpan.FromMinutes(2)); - var (sourceRef, source) = Source.ActorRef(1000, OverflowStrategy.DropHead) + var (sourceRef, source) = Source.ActorRef(1000, OverflowStrategy.DropHead) .PreMaterialize(_materializer); _debouncer = sourceRef; - source.GroupedWithin(100, TimeSpan.FromMilliseconds(75)) + source.GroupedWithin(10, TimeSpan.FromMilliseconds(75)) .Select(c => TransmitActions(c.ToList())) .SelectMany(c => c) .SelectAsync(1, async c => diff --git a/src/DrawTogether.Actors/Local/LocalPaintProtocol.cs b/src/DrawTogether.Actors/Local/LocalPaintProtocol.cs index dce0c78..5e18a73 100644 --- a/src/DrawTogether.Actors/Local/LocalPaintProtocol.cs +++ b/src/DrawTogether.Actors/Local/LocalPaintProtocol.cs @@ -1,4 +1,5 @@ -using DrawTogether.Entities.Drawings; +using DrawTogether.Entities; +using DrawTogether.Entities.Drawings; using DrawTogether.Entities.Drawings.Messages; using DrawTogether.Entities.Users; @@ -28,25 +29,17 @@ public sealed class LeavePaintSession(DrawingSessionId drawingSessionId, UserId public sealed class AddPointToConnectedStroke( Point point, - StrokeId strokeId, DrawingSessionId drawingSessionId, - UserId userId) + UserId userId, GreaterThanZeroInteger strokeWidth, Color strokeColor) : IPaintSessionMessage { public Point Point { get; } = point; - - public StrokeId StrokeId { get; } = strokeId; - public DrawingSessionId DrawingSessionId { get; } = drawingSessionId; - public UserId UserId { get; } = userId; - } - - public sealed class CreateConnectedStroke(DrawingSessionId drawingSessionId, UserId userId, ConnectedStroke connectedStroke) - : IPaintSessionMessage - { - public StrokeId StrokeId { get; } = connectedStroke.Id; - public ConnectedStroke ConnectedStroke { get; } = connectedStroke; public DrawingSessionId DrawingSessionId { get; } = drawingSessionId; public UserId UserId { get; } = userId; + + public GreaterThanZeroInteger StrokeWidth { get; } = strokeWidth; + + public Color StrokeColor { get; } = strokeColor; } } \ No newline at end of file diff --git a/src/DrawTogether/Components/Pages/Home.razor b/src/DrawTogether/Components/Pages/Home.razor index 93f24a2..2c7e227 100644 --- a/src/DrawTogether/Components/Pages/Home.razor +++ b/src/DrawTogether/Components/Pages/Home.razor @@ -78,21 +78,24 @@ else // create subscription async Task RunSubscription() { - var source = Source.ActorRef(100, OverflowStrategy.DropHead); + var source = Source.ActorRef>(100, OverflowStrategy.DropHead); var (srcActor, src) = source.PreMaterialize(ActorSystem); // starts the pump allDrawingsActor.Tell(new DrawingIndexQueries.SubscribeToDrawingSessionUpdates(srcActor)); - await foreach (var item in src.RunAsAsyncEnumerable(ActorSystem).WithCancellation(_shutdownCts.Token)) + await foreach (var items in src.RunAsAsyncEnumerable(ActorSystem).WithCancellation(_shutdownCts.Token)) { - if (item.IsRemoved) + foreach (var item in items) { - _drawingActivities.Remove(item.DrawingSessionId); - } - else - { - _drawingActivities[item.DrawingSessionId] = item; + if (item.IsRemoved) + { + _drawingActivities.Remove(item.DrawingSessionId); + } + else + { + _drawingActivities[item.DrawingSessionId] = item; + } } StateHasChanged(); diff --git a/src/DrawTogether/Components/Pages/Paint.razor b/src/DrawTogether/Components/Pages/Paint.razor index 6b8fffc..2a34e17 100644 --- a/src/DrawTogether/Components/Pages/Paint.razor +++ b/src/DrawTogether/Components/Pages/Paint.razor @@ -75,7 +75,6 @@ var authstate = await GetAuthenticationStateAsync.GetAuthenticationStateAsync(); _user = authstate.User; _currentUserId = new UserId(_user.FindFirstValue(ClaimTypes.NameIdentifier) ?? throw new InvalidOperationException("User not authenticated")); - _hashedUserId = MurmurHash.StringHash(_currentUserId.IdentityName); DrawingSessionId = new DrawingSessionId(SessionId); @@ -159,29 +158,8 @@ private string mousePointerMessage = "foo"; private string blazorHubDebugMessage = "bar"; - private readonly int _randomSeed = Random.Shared.Next(); - private int _strokeIdCounter = 0; - private int _hashedUserId; - - private int NextStrokeId() - { - return _hashedUserId + _randomSeed + _strokeIdCounter++; - } - - private StrokeId CurrentStrokeId { get; set; } - private void CursorDown(MouseEventArgs e) { - CurrentStrokeId = new StrokeId(NextStrokeId()); - var newStroke = new ConnectedStroke(CurrentStrokeId) - { - StrokeColor = _color, - StrokeWidth = _cursorSize, - Points = new List { new(e.OffsetX, e.OffsetY) } - }; - - _localActorHandle.Tell(new LocalPaintProtocol.CreateConnectedStroke(DrawingSessionId, _currentUserId, newStroke)); - if (e.Buttons == 1) { OnNext(new Point(e.OffsetX, e.OffsetY)); @@ -190,7 +168,7 @@ private void OnNext(Point point) { - _localActorHandle.Tell(new LocalPaintProtocol.AddPointToConnectedStroke(point, CurrentStrokeId, DrawingSessionId, _currentUserId)); + _localActorHandle.Tell(new LocalPaintProtocol.AddPointToConnectedStroke(point, DrawingSessionId!, _currentUserId!, _cursorSize, _color)); } private void CursorMove(MouseEventArgs e)