diff --git a/examples~/quickstart/client/module_bindings/_Globals/SpacetimeDBClient.cs b/examples~/quickstart/client/module_bindings/_Globals/SpacetimeDBClient.cs index c0fb266b..726f56d0 100644 --- a/examples~/quickstart/client/module_bindings/_Globals/SpacetimeDBClient.cs +++ b/examples~/quickstart/client/module_bindings/_Globals/SpacetimeDBClient.cs @@ -63,13 +63,14 @@ internal UserHandle() public sealed class RemoteReducers : RemoteBase { - internal RemoteReducers(DbConnection conn) : base(conn) {} + internal RemoteReducers(DbConnection conn, SetReducerFlags SetReducerFlags) : base(conn) { this.SetCallReducerFlags = SetReducerFlags; } + internal readonly SetReducerFlags SetCallReducerFlags; public delegate void SendMessageHandler(EventContext ctx, string text); public event SendMessageHandler? OnSendMessage; public void SendMessage(string text) { - conn.InternalCallReducer(new SendMessage { Text = text }); + conn.InternalCallReducer(new SendMessage { Text = text }, this.SetCallReducerFlags.SendMessageFlags); } public bool InvokeSendMessage(EventContext ctx, SendMessage args) @@ -86,7 +87,7 @@ public bool InvokeSendMessage(EventContext ctx, SendMessage args) public void SetName(string name) { - conn.InternalCallReducer(new SetName { Name = name }); + conn.InternalCallReducer(new SetName { Name = name }, this.SetCallReducerFlags.SetNameFlags); } public bool InvokeSetName(EventContext ctx, SetName args) @@ -100,14 +101,25 @@ public bool InvokeSetName(EventContext ctx, SetName args) } } + public sealed class SetReducerFlags + { + internal SetReducerFlags() { } + internal CallReducerFlags SendMessageFlags; + public void SendMessage(CallReducerFlags flags) { this.SendMessageFlags = flags; } + internal CallReducerFlags SetNameFlags; + public void SetName(CallReducerFlags flags) { this.SetNameFlags = flags; } + } + public partial record EventContext : DbContext, IEventContext { public readonly RemoteReducers Reducers; + public readonly SetReducerFlags SetReducerFlags; public readonly Event Event; internal EventContext(DbConnection conn, Event reducerEvent) : base(conn.Db) { Reducers = conn.Reducers; + SetReducerFlags = conn.SetReducerFlags; Event = reducerEvent; } } @@ -124,10 +136,12 @@ public class DbConnection : DbConnectionBase { public readonly RemoteTables Db = new(); public readonly RemoteReducers Reducers; + public readonly SetReducerFlags SetReducerFlags; public DbConnection() { - Reducers = new(this); + SetReducerFlags = new(); + Reducers = new(this, this.SetReducerFlags); clientDB.AddTable("message", Db.Message); clientDB.AddTable("user", Db.User); @@ -136,7 +150,8 @@ public DbConnection() protected override Reducer ToReducer(TransactionUpdate update) { var encodedArgs = update.ReducerCall.Args; - return update.ReducerCall.ReducerName switch { + return update.ReducerCall.ReducerName switch + { "send_message" => new Reducer.SendMessage(BSATNHelpers.Decode(encodedArgs)), "set_name" => new Reducer.SetName(BSATNHelpers.Decode(encodedArgs)), "" => new Reducer.StdbNone(default), @@ -153,7 +168,8 @@ protected override IEventContext ToEventContext(Event reducerEvent) => protected override bool Dispatch(IEventContext context, Reducer reducer) { var eventContext = (EventContext)context; - return reducer switch { + return reducer switch + { Reducer.SendMessage(var args) => Reducers.InvokeSendMessage(eventContext, args), Reducer.SetName(var args) => Reducers.InvokeSetName(eventContext, args), Reducer.StdbNone or diff --git a/src/CallReducerFlags.cs b/src/CallReducerFlags.cs new file mode 100644 index 00000000..0a3ea363 --- /dev/null +++ b/src/CallReducerFlags.cs @@ -0,0 +1,9 @@ +namespace SpacetimeDB +{ + public enum CallReducerFlags : byte + { + // This is the default. + FullUpdate = 0, + NoSuccessNotify = 1, + } +} diff --git a/src/CallReducerFlags.cs.meta b/src/CallReducerFlags.cs.meta new file mode 100644 index 00000000..dbfee1f8 --- /dev/null +++ b/src/CallReducerFlags.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: c0567f0d188b749659b7291c277a2b17 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/src/Compression.cs.meta b/src/Compression.cs.meta index eef0325c..31f1b190 100644 --- a/src/Compression.cs.meta +++ b/src/Compression.cs.meta @@ -1,11 +1,11 @@ fileFormatVersion: 2 -guid: 03fdc211a87474eeba9f03495eab491e +guid: 78dd2c1276a6e4e859d2c4fad671c2c7 MonoImporter: externalObjects: {} serializedVersion: 2 defaultReferences: [] executionOrder: 0 icon: {instanceID: 0} - userData: - assetBundleName: - assetBundleVariant: + userData: + assetBundleName: + assetBundleVariant: diff --git a/src/SpacetimeDB/ClientApi/BsatnRowList.cs b/src/SpacetimeDB/ClientApi/BsatnRowList.cs index 5df43d6b..1f17c790 100644 --- a/src/SpacetimeDB/ClientApi/BsatnRowList.cs +++ b/src/SpacetimeDB/ClientApi/BsatnRowList.cs @@ -7,7 +7,6 @@ using System; using SpacetimeDB; using System.Collections.Generic; -using System.Linq; using System.Runtime.Serialization; namespace SpacetimeDB.ClientApi diff --git a/src/SpacetimeDB/ClientApi/CallReducer.cs b/src/SpacetimeDB/ClientApi/CallReducer.cs index 7c3ce90e..212d9bb3 100644 --- a/src/SpacetimeDB/ClientApi/CallReducer.cs +++ b/src/SpacetimeDB/ClientApi/CallReducer.cs @@ -7,7 +7,6 @@ using System; using SpacetimeDB; using System.Collections.Generic; -using System.Linq; using System.Runtime.Serialization; namespace SpacetimeDB.ClientApi @@ -22,16 +21,20 @@ public partial class CallReducer public byte[] Args; [DataMember(Name = "request_id")] public uint RequestId; + [DataMember(Name = "flags")] + public byte Flags; public CallReducer( string Reducer, byte[] Args, - uint RequestId + uint RequestId, + byte Flags ) { this.Reducer = Reducer; this.Args = Args; this.RequestId = RequestId; + this.Flags = Flags; } public CallReducer() diff --git a/src/SpacetimeDB/ClientApi/OneOffQuery.cs b/src/SpacetimeDB/ClientApi/OneOffQuery.cs index 5ee2c66d..11370a54 100644 --- a/src/SpacetimeDB/ClientApi/OneOffQuery.cs +++ b/src/SpacetimeDB/ClientApi/OneOffQuery.cs @@ -7,7 +7,6 @@ using System; using SpacetimeDB; using System.Collections.Generic; -using System.Linq; using System.Runtime.Serialization; namespace SpacetimeDB.ClientApi diff --git a/src/SpacetimeDB/ClientApi/OneOffQueryResponse.cs b/src/SpacetimeDB/ClientApi/OneOffQueryResponse.cs index f539516f..2bdb8e1a 100644 --- a/src/SpacetimeDB/ClientApi/OneOffQueryResponse.cs +++ b/src/SpacetimeDB/ClientApi/OneOffQueryResponse.cs @@ -7,7 +7,6 @@ using System; using SpacetimeDB; using System.Collections.Generic; -using System.Linq; using System.Runtime.Serialization; namespace SpacetimeDB.ClientApi diff --git a/src/SpacetimeDB/ClientApi/QueryUpdate.cs b/src/SpacetimeDB/ClientApi/QueryUpdate.cs index 858dea65..25acc28e 100644 --- a/src/SpacetimeDB/ClientApi/QueryUpdate.cs +++ b/src/SpacetimeDB/ClientApi/QueryUpdate.cs @@ -7,7 +7,6 @@ using System; using SpacetimeDB; using System.Collections.Generic; -using System.Linq; using System.Runtime.Serialization; namespace SpacetimeDB.ClientApi diff --git a/src/SpacetimeDB/ClientApi/ReducerCallInfo.cs b/src/SpacetimeDB/ClientApi/ReducerCallInfo.cs index 0ca52252..f8acaffd 100644 --- a/src/SpacetimeDB/ClientApi/ReducerCallInfo.cs +++ b/src/SpacetimeDB/ClientApi/ReducerCallInfo.cs @@ -7,7 +7,6 @@ using System; using SpacetimeDB; using System.Collections.Generic; -using System.Linq; using System.Runtime.Serialization; namespace SpacetimeDB.ClientApi diff --git a/src/SpacetimeDB/ClientApi/ServerMessage.cs b/src/SpacetimeDB/ClientApi/ServerMessage.cs index 46b5be3a..f66fdd98 100644 --- a/src/SpacetimeDB/ClientApi/ServerMessage.cs +++ b/src/SpacetimeDB/ClientApi/ServerMessage.cs @@ -13,6 +13,7 @@ namespace SpacetimeDB.ClientApi public partial record ServerMessage : SpacetimeDB.TaggedEnum<( SpacetimeDB.ClientApi.InitialSubscription InitialSubscription, SpacetimeDB.ClientApi.TransactionUpdate TransactionUpdate, + SpacetimeDB.ClientApi.TransactionUpdateLight TransactionUpdateLight, SpacetimeDB.ClientApi.IdentityToken IdentityToken, SpacetimeDB.ClientApi.OneOffQueryResponse OneOffQueryResponse )>; diff --git a/src/SpacetimeDB/ClientApi/TransactionUpdateLight.cs b/src/SpacetimeDB/ClientApi/TransactionUpdateLight.cs new file mode 100644 index 00000000..3a3ea6b5 --- /dev/null +++ b/src/SpacetimeDB/ClientApi/TransactionUpdateLight.cs @@ -0,0 +1,38 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN RUST INSTEAD. +// + +#nullable enable + +using System; +using SpacetimeDB; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.ClientApi +{ + [SpacetimeDB.Type] + [DataContract] + public partial class TransactionUpdateLight + { + [DataMember(Name = "request_id")] + public uint RequestId; + [DataMember(Name = "update")] + public SpacetimeDB.ClientApi.DatabaseUpdate Update; + + public TransactionUpdateLight( + uint RequestId, + SpacetimeDB.ClientApi.DatabaseUpdate Update + ) + { + this.RequestId = RequestId; + this.Update = Update; + } + + public TransactionUpdateLight() + { + this.Update = new(); + } + + } +} diff --git a/src/SpacetimeDB/ClientApi/TransactionUpdateLight.cs.meta b/src/SpacetimeDB/ClientApi/TransactionUpdateLight.cs.meta new file mode 100644 index 00000000..50cff86d --- /dev/null +++ b/src/SpacetimeDB/ClientApi/TransactionUpdateLight.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: de607af1bdc1244f894fb952d4199473 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/src/SpacetimeDBClient.cs b/src/SpacetimeDBClient.cs index d908763c..c9d7d704 100644 --- a/src/SpacetimeDBClient.cs +++ b/src/SpacetimeDBClient.cs @@ -23,6 +23,7 @@ public sealed class DbConnectionBuilder string? nameOrAddress; string? token; Compression? compression; + bool light; public DbConnection Build() { @@ -34,7 +35,7 @@ public DbConnection Build() { throw new InvalidOperationException("Building DbConnection with a null nameOrAddress. Call WithModuleName() first."); } - conn.Connect(token, uri, nameOrAddress, compression ?? Compression.Brotli); + conn.Connect(token, uri, nameOrAddress, compression ?? Compression.Brotli, light); #if UNITY_5_3_OR_NEWER if (SpacetimeDBNetworkManager._instance != null) { @@ -68,6 +69,12 @@ public DbConnectionBuilder WithCompression(Compression co return this; } + public DbConnectionBuilder WithLightMode(bool light) + { + this.light = light; + return this; + } + public DbConnectionBuilder OnConnect(Action cb) { conn.onConnect += (identity, token) => cb.Invoke(conn, identity, token); @@ -340,18 +347,28 @@ void PreProcessMessages() } } - PreProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed) + IEnumerable<(IRemoteTableHandle, TableUpdate)> GetTables(DatabaseUpdate updates) { - var dbOps = new List(); + foreach (var update in updates.Tables) + { + var tableName = update.TableName; + var table = clientDB.GetTable(tableName); + if (table == null) + { + Log.Error($"Unknown table name: {tableName}"); + continue; + } - var message = DecompressDecodeMessage(unprocessed.bytes); + yield return (table, update); + } + } - ReducerEvent? reducerEvent = default; + (List, Dictionary>?) PreProcessInitialSubscription(InitialSubscription initSub) + { + var dbOps = new List(); // This is all of the inserts Dictionary>? subscriptionInserts = null; - // All row updates that have a primary key, this contains inserts, deletes and updates - var primaryKeyChanges = new Dictionary<(System.Type tableType, object primaryKeyValue), DbOp>(); HashSet GetInsertHashSet(System.Type tableType, int tableSize) { @@ -364,54 +381,168 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) return hashSet; } - switch (message) + int cap = initSub.DatabaseUpdate.Tables.Sum(a => (int)a.NumRows); + subscriptionInserts = new(capacity: cap); + + // First apply all of the state + foreach (var (table, update) in GetTables(initSub.DatabaseUpdate)) { - case ServerMessage.InitialSubscription(var initialSubscription): - int cap = initialSubscription.DatabaseUpdate.Tables.Sum(a => (int)a.NumRows); - subscriptionInserts = new(capacity: cap); + var hashSet = GetInsertHashSet(table.ClientTableType, (int)update.NumRows); + + foreach (var cqu in update.Updates) + { + var qu = DecompressDecodeQueryUpdate(cqu); + if (BsatnRowListCount(qu.Deletes) != 0) + { + Log.Warn("Non-insert during a subscription update!"); + } - // First apply all of the state - foreach (var update in initialSubscription.DatabaseUpdate.Tables) + foreach (var bin in BsatnRowListIter(qu.Inserts)) { - var tableName = update.TableName; - var table = clientDB.GetTable(tableName); - if (table == null) + if (!hashSet.Add(bin)) { - Log.Error($"Unknown table name: {tableName}"); + // Ignore duplicate inserts in the same subscription update. continue; } - var hashSet = GetInsertHashSet(table.ClientTableType, (int)update.NumRows); + var obj = table.DecodeValue(bin); + var op = new DbOp + { + table = table, + insert = new(obj, bin), + }; + + dbOps.Add(op); + } + } + } + + return (dbOps, subscriptionInserts); + } + + List PreProcessDatabaseUpdate(DatabaseUpdate updates) + { + var dbOps = new List(); - foreach (var cqu in update.Updates) + // All row updates that have a primary key, this contains inserts, deletes and updates + var primaryKeyChanges = new Dictionary<(System.Type tableType, object primaryKeyValue), DbOp>(); + + // First apply all of the state + foreach (var (table, update) in GetTables(updates)) + { + foreach (var cqu in update.Updates) + { + var qu = DecompressDecodeQueryUpdate(cqu); + foreach (var row in BsatnRowListIter(qu.Inserts)) + { + var op = new DbOp { table = table, insert = Decode(table, row, out var pk) }; + if (pk != null) { - var qu = DecompressDecodeQueryUpdate(cqu); - if (BsatnRowListCount(qu.Deletes) != 0) + // Compound key that we use for lookup. + // Consists of type of the table (for faster comparison that string names) + actual primary key of the row. + var key = (table.ClientTableType, pk); + + if (primaryKeyChanges.TryGetValue(key, out var oldOp)) { - Log.Warn("Non-insert during a subscription update!"); + if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null)) + { + Log.Warn($"Update with the same primary key was applied multiple times! tableName={update.TableName}"); + // TODO(jdetter): Is this a correctable error? This would be a major error on the + // SpacetimeDB side. + continue; + } + + var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op); + op = new DbOp + { + table = insertOp.table, + delete = deleteOp.delete, + insert = insertOp.insert, + }; } + primaryKeyChanges[key] = op; + } + else + { + dbOps.Add(op); + } + } - foreach (var bin in BsatnRowListIter(qu.Inserts)) + foreach (var row in BsatnRowListIter(qu.Deletes)) + { + var op = new DbOp { table = table, delete = Decode(table, row, out var pk) }; + if (pk != null) + { + // Compound key that we use for lookup. + // Consists of type of the table (for faster comparison that string names) + actual primary key of the row. + var key = (table.ClientTableType, pk); + + if (primaryKeyChanges.TryGetValue(key, out var oldOp)) { - if (!hashSet.Add(bin)) + if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null)) { - // Ignore duplicate inserts in the same subscription update. + Log.Warn($"Update with the same primary key was applied multiple times! tableName={update.TableName}"); + // TODO(jdetter): Is this a correctable error? This would be a major error on the + // SpacetimeDB side. continue; } - var obj = table.DecodeValue(bin); - var op = new DbOp + var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op); + op = new DbOp { - table = table, - insert = new(obj, bin), + table = insertOp.table, + delete = deleteOp.delete, + insert = insertOp.insert, }; - - dbOps.Add(op); } + primaryKeyChanges[key] = op; + } + else + { + dbOps.Add(op); } } - break; + } + } + + // Combine primary key updates and non-primary key updates + dbOps.AddRange(primaryKeyChanges.Values); + + return dbOps; + } + + void PreProcessOneOffQuery(OneOffQueryResponse resp) + { + /// This case does NOT produce a list of DBOps, because it should not modify the client cache state! + var messageId = new Guid(resp.MessageId); + if (!waitingOneOffQueries.Remove(messageId, out var resultSource)) + { + Log.Error($"Response to unknown one-off-query: {messageId}"); + return; + } + + resultSource.SetResult(resp); + } + + PreProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed) + { + var dbOps = new List(); + + var message = DecompressDecodeMessage(unprocessed.bytes); + + ReducerEvent? reducerEvent = default; + + // This is all of the inserts + Dictionary>? subscriptionInserts = null; + + switch (message) + { + case ServerMessage.InitialSubscription(var initSub): + var (ops, subIns) = PreProcessInitialSubscription(initSub); + dbOps = ops; + subscriptionInserts = subIns; + break; case ServerMessage.TransactionUpdate(var transactionUpdate): // Convert the generic event arguments in to a domain specific event object try @@ -437,111 +568,16 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) if (transactionUpdate.Status is UpdateStatus.Committed(var committed)) { - primaryKeyChanges = new(); - - // First apply all of the state - foreach (var update in committed.Tables) - { - var tableName = update.TableName; - var table = clientDB.GetTable(tableName); - if (table == null) - { - Log.Error($"Unknown table name: {tableName}"); - continue; - } - - foreach (var cqu in update.Updates) - { - var qu = DecompressDecodeQueryUpdate(cqu); - foreach (var row in BsatnRowListIter(qu.Inserts)) - { - var op = new DbOp { table = table, insert = Decode(table, row, out var pk) }; - if (pk != null) - { - // Compound key that we use for lookup. - // Consists of type of the table (for faster comparison that string names) + actual primary key of the row. - var key = (table.ClientTableType, pk); - - if (primaryKeyChanges.TryGetValue(key, out var oldOp)) - { - if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null)) - { - Log.Warn($"Update with the same primary key was applied multiple times! tableName={tableName}"); - // TODO(jdetter): Is this a correctable error? This would be a major error on the - // SpacetimeDB side. - continue; - } - - var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op); - op = new DbOp - { - table = insertOp.table, - delete = deleteOp.delete, - insert = insertOp.insert, - }; - } - primaryKeyChanges[key] = op; - } - else - { - dbOps.Add(op); - } - } - - foreach (var row in BsatnRowListIter(qu.Deletes)) - { - var op = new DbOp { table = table, delete = Decode(table, row, out var pk) }; - if (pk != null) - { - // Compound key that we use for lookup. - // Consists of type of the table (for faster comparison that string names) + actual primary key of the row. - var key = (table.ClientTableType, pk); - - if (primaryKeyChanges.TryGetValue(key, out var oldOp)) - { - if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null)) - { - Log.Warn($"Update with the same primary key was applied multiple times! tableName={tableName}"); - // TODO(jdetter): Is this a correctable error? This would be a major error on the - // SpacetimeDB side. - continue; - } - - var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op); - op = new DbOp - { - table = insertOp.table, - delete = deleteOp.delete, - insert = insertOp.insert, - }; - } - primaryKeyChanges[key] = op; - } - else - { - dbOps.Add(op); - } - } - } - } - - // Combine primary key updates and non-primary key updates - dbOps.AddRange(primaryKeyChanges.Values); + dbOps = PreProcessDatabaseUpdate(committed); } break; + case ServerMessage.TransactionUpdateLight(var update): + dbOps = PreProcessDatabaseUpdate(update.Update); + break; case ServerMessage.IdentityToken(var identityToken): break; case ServerMessage.OneOffQueryResponse(var resp): - /// This case does NOT produce a list of DBOps, because it should not modify the client cache state! - var messageId = new Guid(resp.MessageId); - - if (!waitingOneOffQueries.Remove(messageId, out var resultSource)) - { - Log.Error($"Response to unknown one-off-query: {messageId}"); - break; - } - - resultSource.SetResult(resp); + PreProcessOneOffQuery(resp); break; default: throw new InvalidOperationException(); @@ -603,7 +639,7 @@ public void Disconnect() /// /// URI of the SpacetimeDB server (ex: https://testnet.spacetimedb.com) /// The name or address of the database to connect to - internal void Connect(string? token, string uri, string addressOrName, Compression compression) + internal void Connect(string? token, string uri, string addressOrName, Compression compression, bool light) { isClosing = false; @@ -621,7 +657,7 @@ internal void Connect(string? token, string uri, string addressOrName, Compressi { try { - await webSocket.Connect(token, uri, addressOrName, Address, compression); + await webSocket.Connect(token, uri, addressOrName, Address, compression, light); } catch (Exception e) { @@ -745,6 +781,16 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed) } break; } + case ServerMessage.TransactionUpdateLight(var update): + { + stats.ParseMessageTracker.InsertRequest(timestamp, $"type={nameof(ServerMessage.TransactionUpdateLight)}"); + + var eventContext = ToEventContext(new Event.UnknownTransaction()); + OnMessageProcessCompleteUpdate(eventContext, dbOps); + + break; + } + case ServerMessage.TransactionUpdate(var transactionUpdate): { var reducer = transactionUpdate.ReducerCall.ReducerName; @@ -820,7 +866,7 @@ internal void OnMessageReceived(byte[] bytes, DateTime timestamp) => _messageQueue.Add(new UnprocessedMessage { bytes = bytes, timestamp = timestamp }); // TODO: this should become [Obsolete] but for now is used by autogenerated code. - public void InternalCallReducer(T args) + public void InternalCallReducer(T args, CallReducerFlags flags) where T : IReducerArgs, new() { if (!webSocket.IsConnected) @@ -829,14 +875,12 @@ public void InternalCallReducer(T args) return; } - webSocket.Send(new ClientMessage.CallReducer( - new CallReducer - { - RequestId = stats.ReducerRequestTracker.StartTrackingRequest(args.ReducerName), - Reducer = args.ReducerName, - Args = IStructuralReadWrite.ToBytes(args) - } - )); + webSocket.Send(new ClientMessage.CallReducer(new CallReducer( + args.ReducerName, + IStructuralReadWrite.ToBytes(args), + stats.ReducerRequestTracker.StartTrackingRequest(args.ReducerName), + (byte)flags + ))); } void IDbConnection.Subscribe(ISubscriptionHandle handle, string[] querySqls) diff --git a/src/WebSocket.cs b/src/WebSocket.cs index 86f47c70..bf65c84e 100644 --- a/src/WebSocket.cs +++ b/src/WebSocket.cs @@ -51,9 +51,14 @@ public WebSocket(ConnectOptions options) public bool IsConnected { get { return Ws != null && Ws.State == WebSocketState.Open; } } - public async Task Connect(string? auth, string host, string nameOrAddress, Address clientAddress, Compression compression) + public async Task Connect(string? auth, string host, string nameOrAddress, Address clientAddress, Compression compression, bool light) { - var url = new Uri($"{host}/database/subscribe/{nameOrAddress}?client_address={clientAddress}&compression={compression}"); + var uri = $"{host}/database/subscribe/{nameOrAddress}?client_address={clientAddress}&compression={compression}"; + if (light) + { + uri += "&light=true"; + } + var url = new Uri(uri); Ws.Options.AddSubProtocol(_options.Protocol); var source = new CancellationTokenSource(10000); diff --git a/tests~/SnapshotTests.VerifyAllTablesParsed.verified.txt b/tests~/SnapshotTests.VerifyAllTablesParsed.verified.txt index ea57bb0e..e3ae0cdc 100644 --- a/tests~/SnapshotTests.VerifyAllTablesParsed.verified.txt +++ b/tests~/SnapshotTests.VerifyAllTablesParsed.verified.txt @@ -8,6 +8,7 @@ OnInsertUser: { eventContext: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: {}, Db: {Scrubbed} }, @@ -20,6 +21,7 @@ OnInsertUser: { eventContext: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_1, @@ -40,6 +42,7 @@ OnUpdateUser: { eventContext: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_2, @@ -68,6 +71,7 @@ }, OnSetName: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_2, @@ -87,6 +91,7 @@ OnInsertMessage: { eventContext: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_3, @@ -111,6 +116,7 @@ }, OnSendMessage: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_3, @@ -130,6 +136,7 @@ OnUpdateUser: { eventContext: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_4, @@ -158,6 +165,7 @@ }, OnSetName: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_4, @@ -177,6 +185,7 @@ OnInsertMessage: { eventContext: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_5, @@ -201,6 +210,7 @@ }, OnSendMessage: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_5, @@ -220,6 +230,7 @@ OnInsertMessage: { eventContext: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_6, @@ -244,6 +255,7 @@ }, OnSendMessage: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_6, @@ -263,6 +275,7 @@ OnUpdateUser: { eventContext: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_7, @@ -289,6 +302,7 @@ OnInsertMessage: { eventContext: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_8, @@ -313,6 +327,7 @@ }, OnSendMessage: { Reducers: {Scrubbed}, + SetReducerFlags: {}, Event: { ReducerEvent: { Timestamp: DateTimeOffset_8, @@ -390,4 +405,4 @@ Max: type=InitialSubscription } } -} \ No newline at end of file +} diff --git a/tools~/gen-client-api.sh b/tools~/gen-client-api.sh old mode 100644 new mode 100755