-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fixed stroke generation #32
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,12 @@ | |
using Akka.Hosting; | ||
using Akka.Streams; | ||
using Akka.Streams.Dsl; | ||
using Akka.Util; | ||
using DrawTogether.Actors.Drawings; | ||
using DrawTogether.Entities; | ||
using DrawTogether.Entities.Drawings; | ||
using DrawTogether.Entities.Drawings.Messages; | ||
using DrawTogether.Entities.Users; | ||
using static DrawTogether.Actors.Local.LocalPaintProtocol; | ||
|
||
namespace DrawTogether.Actors.Local; | ||
|
@@ -93,7 +95,7 @@ protected override void OnReceive(object message) | |
{ | ||
switch (message) | ||
{ | ||
case AddPointToConnectedStroke or CreateConnectedStroke: | ||
case AddPointToConnectedStroke: | ||
_debouncer.Tell(message); | ||
break; | ||
case IPaintSessionMessage paintMessage: | ||
|
@@ -140,30 +142,36 @@ protected override void OnReceive(object message) | |
break; | ||
} | ||
} | ||
|
||
private readonly int _randomSeed = Random.Shared.Next(); | ||
private int _strokeIdCounter = 0; | ||
|
||
private List<IDrawingSessionCommand> TransmitActions(IReadOnlyList<IPaintSessionMessage> actions) | ||
private int NextStrokeId(UserId userId) | ||
{ | ||
var createActions = actions.Where(a => a is CreateConnectedStroke) | ||
.Select(a => ((CreateConnectedStroke)a).ConnectedStroke) | ||
.ToArray(); | ||
|
||
var addActions = actions.OfType<AddPointToConnectedStroke>().ToArray(); | ||
return MurmurHash.StringHash(userId.IdentityName) + _randomSeed + _strokeIdCounter++; | ||
} | ||
|
||
private List<IDrawingSessionCommand> TransmitActions(IReadOnlyList<AddPointToConnectedStroke> actions) | ||
{ | ||
|
||
// group all the actions by user | ||
var userActions = actions.GroupBy(a => a.UserId).ToList(); | ||
|
||
_log.Info("BATCHED {0} create actions and {1} add actions", createActions.Length, addActions.Length); | ||
_log.Info("BATCHED {0} create actions from {1} users", actions.Count, userActions.Count); | ||
|
||
var drawSessionCommands = new List<IDrawingSessionCommand>(); | ||
|
||
foreach (var create in createActions) | ||
foreach (var userStuff in userActions) | ||
{ | ||
var allPoints = create.Points.ToList(); | ||
|
||
foreach (var a in addActions.Where(a => a.StrokeId == create.Id)) | ||
var userId = userStuff.Key; | ||
var connectedStroke = new ConnectedStroke(new StrokeId(NextStrokeId(userId))) | ||
{ | ||
allPoints.Add(a.Point); | ||
} | ||
|
||
drawSessionCommands.Add(new DrawingSessionCommands.AddStroke(_drawingSessionId, | ||
create with { Points = allPoints })); | ||
Points = userStuff.Select(a => a.Point).ToList(), | ||
StrokeColor = userStuff.First().StrokeColor, | ||
StrokeWidth = userStuff.First().StrokeWidth | ||
}; | ||
|
||
drawSessionCommands.Add(new DrawingSessionCommands.AddStroke(_drawingSessionId, connectedStroke)); | ||
} | ||
|
||
return drawSessionCommands; | ||
|
@@ -177,11 +185,11 @@ protected override void PreStart() | |
AttemptToSubscribe(); | ||
|
||
Context.SetReceiveTimeout(TimeSpan.FromMinutes(2)); | ||
var (sourceRef, source) = Source.ActorRef<IPaintSessionMessage>(1000, OverflowStrategy.DropHead) | ||
var (sourceRef, source) = Source.ActorRef<AddPointToConnectedStroke>(1000, OverflowStrategy.DropHead) | ||
.PreMaterialize(_materializer); | ||
|
||
_debouncer = sourceRef; | ||
source.GroupedWithin(100, TimeSpan.FromMilliseconds(75)) | ||
source.GroupedWithin(10, TimeSpan.FromMilliseconds(75)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reduce the size of stroke chunks |
||
.Select(c => TransmitActions(c.ToList())) | ||
.SelectMany(c => c) | ||
.SelectAsync(1, async c => | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
using DrawTogether.Entities.Drawings; | ||
using DrawTogether.Entities; | ||
using DrawTogether.Entities.Drawings; | ||
using DrawTogether.Entities.Drawings.Messages; | ||
using DrawTogether.Entities.Users; | ||
|
||
|
@@ -28,25 +29,17 @@ public sealed class LeavePaintSession(DrawingSessionId drawingSessionId, UserId | |
|
||
public sealed class AddPointToConnectedStroke( | ||
Point point, | ||
StrokeId strokeId, | ||
DrawingSessionId drawingSessionId, | ||
UserId userId) | ||
UserId userId, GreaterThanZeroInteger strokeWidth, Color strokeColor) | ||
: IPaintSessionMessage | ||
{ | ||
public Point Point { get; } = point; | ||
|
||
public StrokeId StrokeId { get; } = strokeId; | ||
public DrawingSessionId DrawingSessionId { get; } = drawingSessionId; | ||
public UserId UserId { get; } = userId; | ||
} | ||
|
||
public sealed class CreateConnectedStroke(DrawingSessionId drawingSessionId, UserId userId, ConnectedStroke connectedStroke) | ||
: IPaintSessionMessage | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't have the Blazor UI decide how or when to group strokes - have the debouncer do it. |
||
{ | ||
public StrokeId StrokeId { get; } = connectedStroke.Id; | ||
|
||
public ConnectedStroke ConnectedStroke { get; } = connectedStroke; | ||
public DrawingSessionId DrawingSessionId { get; } = drawingSessionId; | ||
public UserId UserId { get; } = userId; | ||
|
||
public GreaterThanZeroInteger StrokeWidth { get; } = strokeWidth; | ||
|
||
public Color StrokeColor { get; } = strokeColor; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,21 +78,24 @@ else | |
// create subscription | ||
async Task RunSubscription() | ||
{ | ||
var source = Source.ActorRef<DrawingActivityUpdate>(100, OverflowStrategy.DropHead); | ||
var source = Source.ActorRef<List<DrawingActivityUpdate>>(100, OverflowStrategy.DropHead); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix bug with streaming updates on sessions |
||
var (srcActor, src) = source.PreMaterialize(ActorSystem); | ||
|
||
// starts the pump | ||
allDrawingsActor.Tell(new DrawingIndexQueries.SubscribeToDrawingSessionUpdates(srcActor)); | ||
|
||
await foreach (var item in src.RunAsAsyncEnumerable(ActorSystem).WithCancellation(_shutdownCts.Token)) | ||
await foreach (var items in src.RunAsAsyncEnumerable(ActorSystem).WithCancellation(_shutdownCts.Token)) | ||
{ | ||
if (item.IsRemoved) | ||
foreach (var item in items) | ||
{ | ||
_drawingActivities.Remove(item.DrawingSessionId); | ||
} | ||
else | ||
{ | ||
_drawingActivities[item.DrawingSessionId] = item; | ||
if (item.IsRemoved) | ||
{ | ||
_drawingActivities.Remove(item.DrawingSessionId); | ||
} | ||
else | ||
{ | ||
_drawingActivities[item.DrawingSessionId] = item; | ||
} | ||
} | ||
|
||
StateHasChanged(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make persistence ids more obvious from looking at the database