diff --git a/Scripts/SpacetimeDBClient.cs b/Scripts/SpacetimeDBClient.cs index c57c90e..4c1cdbc 100644 --- a/Scripts/SpacetimeDBClient.cs +++ b/Scripts/SpacetimeDBClient.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.IO.Compression; using System.Linq; using System.Net.WebSockets; using System.Reflection; @@ -84,7 +85,7 @@ public delegate void RowUpdate(string tableName, TableOp op, object oldValue, ob /// Invoked when a subscription is about to start being processed. This is called even before OnBeforeDelete. /// public event Action onBeforeSubscriptionApplied; - + /// /// Invoked when the local client cache is updated as a result of changes made to the subscription queries. /// @@ -222,7 +223,7 @@ protected SpacetimeDBClient(ISpacetimeDBLogger loggerToUse) var reducerType = FindReducerType(); if (reducerType != null) { - // cache all our reducer events by their function name + // cache all our reducer events by their function name foreach (var methodInfo in reducerType.GetMethods()) { if (methodInfo.GetCustomAttribute() is @@ -292,7 +293,9 @@ void PreProcessMessages() PreProcessedMessage PreProcessMessage(byte[] bytes, DateTime timestamp) { var dbOps = new List(); - var message = Message.Parser.ParseFrom(bytes); + using var compressedStream = new MemoryStream(bytes); + using var decompressedStream = new BrotliStream(compressedStream, CompressionMode.Decompress); + var message = Message.Parser.ParseFrom(decompressedStream); using var stream = new MemoryStream(); using var reader = new BinaryReader(stream); @@ -528,7 +531,7 @@ struct ProcessedMessage } // The message that has been preprocessed and has had its state diff calculated - + private BlockingCollection _stateDiffMessages = new BlockingCollection(); private CancellationTokenSource _stateDiffCancellationTokenSource = new CancellationTokenSource(); private CancellationToken _stateDiffCancellationToken; @@ -567,7 +570,7 @@ void ExecuteStateDiff() { continue; } - + if (!hashSet.Contains(rowBytes)) { // This is a row that we had before, but we do not have it now. @@ -640,7 +643,7 @@ public void Connect(string token, string uri, string addressOrName) private void OnMessageProcessComplete(Message message, DateTime timestamp, List dbOps) { - + switch (message.TypeCase) { case Message.TypeOneofCase.SubscriptionUpdate: @@ -663,10 +666,10 @@ private void OnMessageProcessComplete(Message message, DateTime timestamp, List< } } } - + break; } - + switch (message.TypeCase) { case Message.TypeOneofCase.SubscriptionUpdate: @@ -754,7 +757,7 @@ void InternalInsertCallback(DbOp op) op.op = TableOp.NoChange; dbOps[i] = op; } - + if (dbOps[i].table.InsertEntry(update.insertedBytes, update.rowValue)) { InternalInsertCallback(update); @@ -1005,7 +1008,7 @@ struct ReducerRequest { public string fn; } - + public void InternalCallReducer(string json) { if (!webSocket.IsConnected)