Skip to content

Commit

Permalink
NFC: inline insert/delete update handling
Browse files Browse the repository at this point in the history
  • Loading branch information
RReverser committed May 13, 2024
1 parent e79dcea commit c6c4b23
Showing 1 changed file with 76 additions and 203 deletions.
279 changes: 76 additions & 203 deletions src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,6 @@ namespace SpacetimeDB
{
public class SpacetimeDBClient
{
public enum TableOp
{
Insert,
Delete,
Update,
NoChange,
}

public class ReducerCallRequest
{
public string fn;
Expand All @@ -39,14 +31,23 @@ public class SubscriptionRequest
public string subscriptionQuery;
}

public struct DbOp
struct DbValue
{
public object value;
public byte[] bytes;

public DbValue(object value, byte[] bytes)
{
this.value = value;
this.bytes = bytes;
}
}

struct DbOp
{
public ClientCache.TableCache table;
public TableOp op;
public object newValue;
public object oldValue;
public byte[] deletedBytes;
public byte[] insertedBytes;
public DbValue? delete;
public DbValue? insert;
public AlgebraicValue rowValue;
}

Expand Down Expand Up @@ -326,11 +327,7 @@ HashSet<byte[]> GetInsertHashSet(string tableName, int tableSize)
var op = new DbOp
{
table = table,
deletedBytes = null,
insertedBytes = rowBytes,
op = TableOp.Insert,
newValue = obj,
oldValue = null,
insert = new(obj, rowBytes),
rowValue = deserializedRow,
};

Expand Down Expand Up @@ -380,48 +377,40 @@ HashSet<byte[]> GetInsertHashSet(string tableName, int tableSize)
var op = new DbOp
{
table = table,
deletedBytes =
row.Op == TableRowOperation.Types.OperationType.Delete ? rowBytes : null,
insertedBytes =
row.Op == TableRowOperation.Types.OperationType.Delete ? null : rowBytes,
op = row.Op == TableRowOperation.Types.OperationType.Delete
? TableOp.Delete
: TableOp.Insert,
newValue = row.Op == TableRowOperation.Types.OperationType.Delete ? null : obj,
oldValue = row.Op == TableRowOperation.Types.OperationType.Delete ? obj : null,
rowValue = deserializedRow,
};

var dbValue = new DbValue(obj, rowBytes);

if (row.Op == TableRowOperation.Types.OperationType.Insert)
{
op.insert = dbValue;
}
else
{
op.delete = dbValue;
}

if (primaryKeyType != null)
{
var primaryKeyLookup = GetPrimaryKeyLookup(tableName, primaryKeyType);
if (primaryKeyLookup.TryGetValue(primaryKeyValue, out var value))
if (primaryKeyLookup.TryGetValue(primaryKeyValue, out var oldOp))
{
if (value.op == op.op || value.op == TableOp.Update)
if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null))
{
Logger.LogWarning($"Update with the same primary key was " +
$"applied multiple times! tableName={tableName}");
// TODO(jdetter): Is this a correctable error? This would be a major error on the
// SpacetimeDB side.
continue;
}

var insertOp = op;
var deleteOp = value;
if (op.op == TableOp.Delete)
{
insertOp = value;
deleteOp = op;
}
var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op);

primaryKeyLookup[primaryKeyValue] = new DbOp
{
table = insertOp.table,
op = TableOp.Update,
newValue = insertOp.newValue,
oldValue = deleteOp.oldValue,
deletedBytes = deleteOp.deletedBytes,
insertedBytes = insertOp.insertedBytes,
delete = deleteOp.delete,
insert = insertOp.insert,
rowValue = insertOp.rowValue,
};
}
Expand Down Expand Up @@ -531,11 +520,7 @@ void ExecuteStateDiff()
dbOps.Add(new DbOp
{
table = table,
op = TableOp.Delete,
newValue = null,
oldValue = table.entries[rowBytes].Item2,
deletedBytes = rowBytes,
insertedBytes = null
delete = new(table.entries[rowBytes].Item2, rowBytes),
});
}
}
Expand Down Expand Up @@ -595,15 +580,16 @@ public void Connect(string token, string uri, string addressOrName)

private void OnMessageProcessCompleteUpdate(Message message, List<DbOp> dbOps)
{
var transactionEvent = message.TransactionUpdate?.Event!;

// First trigger OnBeforeDelete
foreach (var update in dbOps)
{
if (update.op == TableOp.Delete)
if (update.delete is { value: var oldValue })
{
try
{
update.table.BeforeDeleteCallback?.Invoke(update.oldValue,
message.TransactionUpdate?.Event);
update.table.BeforeDeleteCallback?.Invoke(oldValue, transactionEvent);
}
catch (Exception e)
{
Expand All @@ -612,175 +598,62 @@ private void OnMessageProcessCompleteUpdate(Message message, List<DbOp> dbOps)
}
}

void InternalDeleteCallback(DbOp op)
{
if (op.oldValue != null)
{
op.table.InternalValueDeletedCallback(op.oldValue);
}
else
{
Logger.LogError("Delete issued, but no value was present!");
}
}

void InternalInsertCallback(DbOp op)
{
if (op.newValue != null)
{
op.table.InternalValueInsertedCallback(op.newValue);
}
else
{
Logger.LogError("Insert issued, but no value was present!");
}
}

// Apply all of the state
for (var i = 0; i < dbOps.Count; i++)
{
// TODO: Reimplement updates when we add support for primary keys
var update = dbOps[i];
switch (update.op)

if (update.delete is {} delete)
{
case TableOp.Delete:
if (dbOps[i].table.DeleteEntry(update.deletedBytes))
{
InternalDeleteCallback(update);
}
else
{
var op = dbOps[i];
op.op = TableOp.NoChange;
dbOps[i] = op;
}
break;
case TableOp.Insert:
if (dbOps[i].table.InsertEntry(update.insertedBytes, update.rowValue))
{
InternalInsertCallback(update);
}
else
{
var op = dbOps[i];
op.op = TableOp.NoChange;
dbOps[i] = op;
}
break;
case TableOp.Update:
if (dbOps[i].table.DeleteEntry(update.deletedBytes))
{
InternalDeleteCallback(update);
}
else
{
var op = dbOps[i];
op.op = TableOp.NoChange;
dbOps[i] = op;
}
if (update.table.DeleteEntry(delete.bytes))
{
update.table.InternalValueDeletedCallback(delete.value);
}
else
{
update.delete = null;
dbOps[i] = update;
}
}

if (dbOps[i].table.InsertEntry(update.insertedBytes, update.rowValue))
{
InternalInsertCallback(update);
}
else
{
var op = dbOps[i];
op.op = TableOp.NoChange;
dbOps[i] = op;
}
break;
default:
throw new ArgumentOutOfRangeException();
if (update.insert is {} insert)
{
if (update.table.InsertEntry(insert.bytes, update.rowValue))
{
update.table.InternalValueInsertedCallback(insert.value);
}
else
{
update.insert = null;
dbOps[i] = update;
}
}
}

// Send out events
var updateCount = dbOps.Count;
for (var i = 0; i < updateCount; i++)
foreach (var dbOp in dbOps)
{
var tableName = dbOps[i].table.ClientTableType.Name;
var tableOp = dbOps[i].op;
var oldValue = dbOps[i].oldValue;
var newValue = dbOps[i].newValue;

switch (tableOp)
try
{
case TableOp.Insert:
if (oldValue == null && newValue != null)
{
try
{
if (dbOps[i].table.InsertCallback != null)
{
dbOps[i].table.InsertCallback.Invoke(newValue,
message.TransactionUpdate?.Event);
}
}
catch (Exception e)
{
Logger.LogException(e);
}
}
else
{
Logger.LogError("Failed to send callback: invalid insert!");
}

break;
case TableOp.Delete:
{
if (oldValue != null && newValue == null)
{
if (dbOps[i].table.DeleteCallback != null)
{
try
{
dbOps[i].table.DeleteCallback.Invoke(oldValue,
message.TransactionUpdate?.Event);
}
catch (Exception e)
{
Logger.LogException(e);
}
}
}
else
{
Logger.LogError("Failed to send callback: invalid delete");
}
switch (dbOp)
{
case { insert: { value: var newValue }, delete: { value: var oldValue } }:
dbOp.table.UpdateCallback?.Invoke(oldValue, newValue, transactionEvent);
break;

case { insert: { value: var newValue } }:
dbOp.table.InsertCallback?.Invoke(newValue, transactionEvent);
break;
}
case TableOp.Update:
{
if (oldValue != null && newValue != null)
{
try
{
if (dbOps[i].table.UpdateCallback != null)
{
dbOps[i].table.UpdateCallback.Invoke(oldValue, newValue,
message.TransactionUpdate?.Event);
}
}
catch (Exception e)
{
Logger.LogException(e);
}
}
else
{
Logger.LogError("Failed to send callback: invalid update");
}

case { delete: { value: var oldValue } }:
dbOp.table.DeleteCallback?.Invoke(oldValue, transactionEvent);
break;
}
case TableOp.NoChange:
// noop
break;
default:
throw new ArgumentOutOfRangeException();
}
}
catch (Exception e)
{
Logger.LogException(e);
}
}
}
Expand Down

0 comments on commit c6c4b23

Please sign in to comment.