From c6c4b23d23b104cf686c386b1617a7620321a2bb Mon Sep 17 00:00:00 2001 From: Ingvar Stepanyan Date: Thu, 9 May 2024 11:26:48 -0400 Subject: [PATCH 1/2] NFC: inline insert/delete update handling --- src/SpacetimeDBClient.cs | 279 +++++++++++---------------------------- 1 file changed, 76 insertions(+), 203 deletions(-) diff --git a/src/SpacetimeDBClient.cs b/src/SpacetimeDBClient.cs index bddc4e6c..d2115b9a 100644 --- a/src/SpacetimeDBClient.cs +++ b/src/SpacetimeDBClient.cs @@ -20,14 +20,6 @@ namespace SpacetimeDB { public class SpacetimeDBClient { - public enum TableOp - { - Insert, - Delete, - Update, - NoChange, - } - public class ReducerCallRequest { public string fn; @@ -39,14 +31,23 @@ public class SubscriptionRequest public string subscriptionQuery; } - public struct DbOp + struct DbValue + { + public object value; + public byte[] bytes; + + public DbValue(object value, byte[] bytes) + { + this.value = value; + this.bytes = bytes; + } + } + + struct DbOp { public ClientCache.TableCache table; - public TableOp op; - public object newValue; - public object oldValue; - public byte[] deletedBytes; - public byte[] insertedBytes; + public DbValue? delete; + public DbValue? insert; public AlgebraicValue rowValue; } @@ -326,11 +327,7 @@ HashSet GetInsertHashSet(string tableName, int tableSize) var op = new DbOp { table = table, - deletedBytes = null, - insertedBytes = rowBytes, - op = TableOp.Insert, - newValue = obj, - oldValue = null, + insert = new(obj, rowBytes), rowValue = deserializedRow, }; @@ -380,24 +377,26 @@ HashSet GetInsertHashSet(string tableName, int tableSize) var op = new DbOp { table = table, - deletedBytes = - row.Op == TableRowOperation.Types.OperationType.Delete ? rowBytes : null, - insertedBytes = - row.Op == TableRowOperation.Types.OperationType.Delete ? null : rowBytes, - op = row.Op == TableRowOperation.Types.OperationType.Delete - ? TableOp.Delete - : TableOp.Insert, - newValue = row.Op == TableRowOperation.Types.OperationType.Delete ? null : obj, - oldValue = row.Op == TableRowOperation.Types.OperationType.Delete ? obj : null, rowValue = deserializedRow, }; + var dbValue = new DbValue(obj, rowBytes); + + if (row.Op == TableRowOperation.Types.OperationType.Insert) + { + op.insert = dbValue; + } + else + { + op.delete = dbValue; + } + if (primaryKeyType != null) { var primaryKeyLookup = GetPrimaryKeyLookup(tableName, primaryKeyType); - if (primaryKeyLookup.TryGetValue(primaryKeyValue, out var value)) + if (primaryKeyLookup.TryGetValue(primaryKeyValue, out var oldOp)) { - if (value.op == op.op || value.op == TableOp.Update) + if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null)) { Logger.LogWarning($"Update with the same primary key was " + $"applied multiple times! tableName={tableName}"); @@ -405,23 +404,13 @@ HashSet GetInsertHashSet(string tableName, int tableSize) // SpacetimeDB side. continue; } - - var insertOp = op; - var deleteOp = value; - if (op.op == TableOp.Delete) - { - insertOp = value; - deleteOp = op; - } + var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op); primaryKeyLookup[primaryKeyValue] = new DbOp { table = insertOp.table, - op = TableOp.Update, - newValue = insertOp.newValue, - oldValue = deleteOp.oldValue, - deletedBytes = deleteOp.deletedBytes, - insertedBytes = insertOp.insertedBytes, + delete = deleteOp.delete, + insert = insertOp.insert, rowValue = insertOp.rowValue, }; } @@ -531,11 +520,7 @@ void ExecuteStateDiff() dbOps.Add(new DbOp { table = table, - op = TableOp.Delete, - newValue = null, - oldValue = table.entries[rowBytes].Item2, - deletedBytes = rowBytes, - insertedBytes = null + delete = new(table.entries[rowBytes].Item2, rowBytes), }); } } @@ -595,15 +580,16 @@ public void Connect(string token, string uri, string addressOrName) private void OnMessageProcessCompleteUpdate(Message message, List dbOps) { + var transactionEvent = message.TransactionUpdate?.Event!; + // First trigger OnBeforeDelete foreach (var update in dbOps) { - if (update.op == TableOp.Delete) + if (update.delete is { value: var oldValue }) { try { - update.table.BeforeDeleteCallback?.Invoke(update.oldValue, - message.TransactionUpdate?.Event); + update.table.BeforeDeleteCallback?.Invoke(oldValue, transactionEvent); } catch (Exception e) { @@ -612,175 +598,62 @@ private void OnMessageProcessCompleteUpdate(Message message, List dbOps) } } - void InternalDeleteCallback(DbOp op) - { - if (op.oldValue != null) - { - op.table.InternalValueDeletedCallback(op.oldValue); - } - else - { - Logger.LogError("Delete issued, but no value was present!"); - } - } - - void InternalInsertCallback(DbOp op) - { - if (op.newValue != null) - { - op.table.InternalValueInsertedCallback(op.newValue); - } - else - { - Logger.LogError("Insert issued, but no value was present!"); - } - } - // Apply all of the state for (var i = 0; i < dbOps.Count; i++) { // TODO: Reimplement updates when we add support for primary keys var update = dbOps[i]; - switch (update.op) + + if (update.delete is {} delete) { - case TableOp.Delete: - if (dbOps[i].table.DeleteEntry(update.deletedBytes)) - { - InternalDeleteCallback(update); - } - else - { - var op = dbOps[i]; - op.op = TableOp.NoChange; - dbOps[i] = op; - } - break; - case TableOp.Insert: - if (dbOps[i].table.InsertEntry(update.insertedBytes, update.rowValue)) - { - InternalInsertCallback(update); - } - else - { - var op = dbOps[i]; - op.op = TableOp.NoChange; - dbOps[i] = op; - } - break; - case TableOp.Update: - if (dbOps[i].table.DeleteEntry(update.deletedBytes)) - { - InternalDeleteCallback(update); - } - else - { - var op = dbOps[i]; - op.op = TableOp.NoChange; - dbOps[i] = op; - } + if (update.table.DeleteEntry(delete.bytes)) + { + update.table.InternalValueDeletedCallback(delete.value); + } + else + { + update.delete = null; + dbOps[i] = update; + } + } - if (dbOps[i].table.InsertEntry(update.insertedBytes, update.rowValue)) - { - InternalInsertCallback(update); - } - else - { - var op = dbOps[i]; - op.op = TableOp.NoChange; - dbOps[i] = op; - } - break; - default: - throw new ArgumentOutOfRangeException(); + if (update.insert is {} insert) + { + if (update.table.InsertEntry(insert.bytes, update.rowValue)) + { + update.table.InternalValueInsertedCallback(insert.value); + } + else + { + update.insert = null; + dbOps[i] = update; + } } } // Send out events - var updateCount = dbOps.Count; - for (var i = 0; i < updateCount; i++) + foreach (var dbOp in dbOps) { - var tableName = dbOps[i].table.ClientTableType.Name; - var tableOp = dbOps[i].op; - var oldValue = dbOps[i].oldValue; - var newValue = dbOps[i].newValue; - - switch (tableOp) + try { - case TableOp.Insert: - if (oldValue == null && newValue != null) - { - try - { - if (dbOps[i].table.InsertCallback != null) - { - dbOps[i].table.InsertCallback.Invoke(newValue, - message.TransactionUpdate?.Event); - } - } - catch (Exception e) - { - Logger.LogException(e); - } - } - else - { - Logger.LogError("Failed to send callback: invalid insert!"); - } - - break; - case TableOp.Delete: - { - if (oldValue != null && newValue == null) - { - if (dbOps[i].table.DeleteCallback != null) - { - try - { - dbOps[i].table.DeleteCallback.Invoke(oldValue, - message.TransactionUpdate?.Event); - } - catch (Exception e) - { - Logger.LogException(e); - } - } - } - else - { - Logger.LogError("Failed to send callback: invalid delete"); - } + switch (dbOp) + { + case { insert: { value: var newValue }, delete: { value: var oldValue } }: + dbOp.table.UpdateCallback?.Invoke(oldValue, newValue, transactionEvent); + break; + case { insert: { value: var newValue } }: + dbOp.table.InsertCallback?.Invoke(newValue, transactionEvent); break; - } - case TableOp.Update: - { - if (oldValue != null && newValue != null) - { - try - { - if (dbOps[i].table.UpdateCallback != null) - { - dbOps[i].table.UpdateCallback.Invoke(oldValue, newValue, - message.TransactionUpdate?.Event); - } - } - catch (Exception e) - { - Logger.LogException(e); - } - } - else - { - Logger.LogError("Failed to send callback: invalid update"); - } + case { delete: { value: var oldValue } }: + dbOp.table.DeleteCallback?.Invoke(oldValue, transactionEvent); break; - } - case TableOp.NoChange: - // noop - break; - default: - throw new ArgumentOutOfRangeException(); + } + } + catch (Exception e) + { + Logger.LogException(e); } } } From 2cea0813afd01b6ee16f5bfe7b5e5971bd6940b1 Mon Sep 17 00:00:00 2001 From: Ingvar Stepanyan Date: Mon, 13 May 2024 16:08:47 +0100 Subject: [PATCH 2/2] Few more minor opts --- src/SpacetimeDBClient.cs | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/src/SpacetimeDBClient.cs b/src/SpacetimeDBClient.cs index d2115b9a..29bad06a 100644 --- a/src/SpacetimeDBClient.cs +++ b/src/SpacetimeDBClient.cs @@ -406,7 +406,7 @@ HashSet GetInsertHashSet(string tableName, int tableSize) } var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op); - primaryKeyLookup[primaryKeyValue] = new DbOp + op = new DbOp { table = insertOp.table, delete = deleteOp.delete, @@ -414,10 +414,7 @@ HashSet GetInsertHashSet(string tableName, int tableSize) rowValue = insertOp.rowValue, }; } - else - { - primaryKeyLookup[primaryKeyValue] = op; - } + primaryKeyLookup[primaryKeyValue] = op; } else { @@ -506,23 +503,20 @@ void ExecuteStateDiff() { foreach (var table in clientDB.GetTables()) { - foreach (var rowBytes in table.entries.Keys) + if (!preProcessedMessage.inserts.TryGetValue(table.Name, out var hashSet)) { - if (!preProcessedMessage.inserts.TryGetValue(table.Name, out var hashSet)) - { - continue; - } + continue; + } - if (!hashSet.Contains(rowBytes)) + foreach (var (rowBytes, oldValue) in table.entries.Where(kv => !hashSet.Contains(kv.Key))) + { + dbOps.Add(new DbOp { + table = table, // This is a row that we had before, but we do not have it now. // This must have been a delete. - dbOps.Add(new DbOp - { - table = table, - delete = new(table.entries[rowBytes].Item2, rowBytes), - }); - } + delete = new(oldValue.Item2, rowBytes), + }); } } }