Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Brotli decompress incoming Websocket messages #26

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions Scripts/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net.WebSockets;
using System.Reflection;
Expand Down Expand Up @@ -84,7 +85,7 @@ public delegate void RowUpdate(string tableName, TableOp op, object oldValue, ob
/// Invoked when a subscription is about to start being processed. This is called even before OnBeforeDelete.
/// </summary>
public event Action onBeforeSubscriptionApplied;

/// <summary>
/// Invoked when the local client cache is updated as a result of changes made to the subscription queries.
/// </summary>
Expand Down Expand Up @@ -222,7 +223,7 @@ protected SpacetimeDBClient(ISpacetimeDBLogger loggerToUse)
var reducerType = FindReducerType();
if (reducerType != null)
{
// cache all our reducer events by their function name
// cache all our reducer events by their function name
foreach (var methodInfo in reducerType.GetMethods())
{
if (methodInfo.GetCustomAttribute<ReducerCallbackAttribute>() is
Expand Down Expand Up @@ -292,7 +293,9 @@ void PreProcessMessages()
PreProcessedMessage PreProcessMessage(byte[] bytes, DateTime timestamp)
{
var dbOps = new List<DbOp>();
var message = Message.Parser.ParseFrom(bytes);
using var compressedStream = new MemoryStream(bytes);
using var decompressedStream = new BrotliStream(compressedStream, CompressionMode.Decompress);
var message = Message.Parser.ParseFrom(decompressedStream);
using var stream = new MemoryStream();
using var reader = new BinaryReader(stream);

Expand Down Expand Up @@ -528,7 +531,7 @@ struct ProcessedMessage
}

// The message that has been preprocessed and has had its state diff calculated

private BlockingCollection<ProcessedMessage> _stateDiffMessages = new BlockingCollection<ProcessedMessage>();
private CancellationTokenSource _stateDiffCancellationTokenSource = new CancellationTokenSource();
private CancellationToken _stateDiffCancellationToken;
Expand Down Expand Up @@ -567,7 +570,7 @@ void ExecuteStateDiff()
{
continue;
}

if (!hashSet.Contains(rowBytes))
{
// This is a row that we had before, but we do not have it now.
Expand Down Expand Up @@ -640,7 +643,7 @@ public void Connect(string token, string uri, string addressOrName)

private void OnMessageProcessComplete(Message message, DateTime timestamp, List<DbOp> dbOps)
{

switch (message.TypeCase)
{
case Message.TypeOneofCase.SubscriptionUpdate:
Expand All @@ -663,10 +666,10 @@ private void OnMessageProcessComplete(Message message, DateTime timestamp, List<
}
}
}

break;
}

switch (message.TypeCase)
{
case Message.TypeOneofCase.SubscriptionUpdate:
Expand Down Expand Up @@ -754,7 +757,7 @@ void InternalInsertCallback(DbOp op)
op.op = TableOp.NoChange;
dbOps[i] = op;
}

if (dbOps[i].table.InsertEntry(update.insertedBytes, update.rowValue))
{
InternalInsertCallback(update);
Expand Down Expand Up @@ -1005,7 +1008,7 @@ struct ReducerRequest
{
public string fn;
}

public void InternalCallReducer(string json)
{
if (!webSocket.IsConnected)
Expand Down