Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Minor refactoring
Browse files Browse the repository at this point in the history
RReverser committed Jun 5, 2024
1 parent 63195af commit 4a61f56
Showing 4 changed files with 38 additions and 39 deletions.
1 change: 0 additions & 1 deletion src/ClientCache.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using SpacetimeDB.BSATN;
41 changes: 21 additions & 20 deletions src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@
using System.IO.Compression;
using System.Linq;
using System.Net.WebSockets;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using ClientApi;
@@ -273,8 +272,7 @@ HashSet<byte[]> GetInsertHashSet(System.Type tableType, int tableSize)
{
if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null))
{
Logger.LogWarning($"Update with the same primary key was " +
$"applied multiple times! tableName={tableName}");
Logger.LogWarning($"Update with the same primary key was applied multiple times! tableName={tableName}");
// TODO(jdetter): Is this a correctable error? This would be a major error on the
// SpacetimeDB side.
continue;
@@ -320,7 +318,7 @@ HashSet<byte[]> GetInsertHashSet(System.Type tableType, int tableSize)

if (!waitingOneOffQueries.Remove(messageId, out var resultSource))
{
Logger.LogError("Response to unknown one-off-query: " + messageId);
Logger.LogError($"Response to unknown one-off-query: {messageId}");
break;
}

@@ -506,10 +504,10 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed)

switch (message)
{
case { TypeCase: Message.TypeOneofCase.SubscriptionUpdate }:
case { TypeCase: Message.TypeOneofCase.SubscriptionUpdate, SubscriptionUpdate: var subscriptionUpdate }:
onBeforeSubscriptionApplied?.Invoke();
stats.ParseMessageTracker.InsertRequest(DateTime.UtcNow, DateTime.UtcNow - timestamp, "type=" + message.TypeCase.ToString());
stats.SubscriptionRequestTracker.FinishTrackingRequest(message.SubscriptionUpdate.RequestId);
stats.ParseMessageTracker.InsertRequest(timestamp, $"type={message.TypeCase}");
stats.SubscriptionRequestTracker.FinishTrackingRequest(subscriptionUpdate.RequestId);
OnMessageProcessCompleteUpdate(null, dbOps);
try
{
@@ -520,17 +518,20 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed)
Logger.LogException(e);
}
break;
case { TypeCase: Message.TypeOneofCase.TransactionUpdate, TransactionUpdate: { Event: var transactionEvent } }:
stats.ParseMessageTracker.InsertRequest(DateTime.UtcNow, DateTime.UtcNow - timestamp, "type=" + message.TypeCase.ToString() + ",reducer=" + message.TransactionUpdate.Event.FunctionCall.Reducer);
stats.AllReducersTracker.InsertRequest(DateTime.UtcNow, TimeSpan.FromMilliseconds(message.TransactionUpdate.Event.HostExecutionDurationMicros / 1000.0d), "reducer=" + message.TransactionUpdate.Event.FunctionCall.Reducer);
var callerIdentity = Identity.From(message.TransactionUpdate.Event.CallerIdentity.ToByteArray());
case { TypeCase: Message.TypeOneofCase.TransactionUpdate, TransactionUpdate: var transactionUpdate }:
var transactionEvent = transactionUpdate.Event;
var reducer = transactionEvent.FunctionCall.Reducer;
stats.ParseMessageTracker.InsertRequest(timestamp, $"type={message.TypeCase},reducer={reducer}");
var hostDuration = TimeSpan.FromMilliseconds(transactionEvent.HostExecutionDurationMicros / 1000.0d);
stats.AllReducersTracker.InsertRequest(hostDuration, $"reducer={reducer}");
var callerIdentity = Identity.From(transactionEvent.CallerIdentity.ToByteArray());
if (callerIdentity == clientIdentity)
{
// This was a request that we initiated
if (!stats.ReducerRequestTracker.FinishTrackingRequest(message.TransactionUpdate.SubscriptionUpdate.RequestId))
var requestId = transactionUpdate.SubscriptionUpdate.RequestId;
if (!stats.ReducerRequestTracker.FinishTrackingRequest(requestId))
{
Logger.LogWarning("Failed to finish tracking reducer request: " +
message.TransactionUpdate.SubscriptionUpdate.RequestId);
Logger.LogWarning($"Failed to finish tracking reducer request: {requestId}");
}
}
OnMessageProcessCompleteUpdate(transactionEvent, dbOps);
@@ -647,7 +648,7 @@ public async Task<T[]> OneOffQuery<T>(string query)

// unsanitized here, but writes will be prevented serverside.
// the best they can do is send multiple selects, which will just result in them getting no data back.
string queryString = "SELECT * FROM " + type.Name + " " + query;
string queryString = $"SELECT * FROM {type.Name} ${query}";

var requestId = stats.OneOffRequestTracker.StartTrackingRequest();
webSocket.Send(new Message
@@ -664,33 +665,33 @@ public async Task<T[]> OneOffQuery<T>(string query)

if (!stats.OneOffRequestTracker.FinishTrackingRequest(requestId))
{
Logger.LogWarning("Failed to finish tracking one off request: " + requestId);
Logger.LogWarning($"Failed to finish tracking one off request: {requestId}");
}

T[] LogAndThrow(string error)
{
error = "While processing one-off-query `" + queryString + "`, ID " + messageId + ": " + error;
error = $"While processing one-off-query `{queryString}`, ID {messageId}: {error}";
Logger.LogError(error);
throw new Exception(error);
}

// The server got back to us
if (result.Error != null && result.Error != "")
{
return LogAndThrow("Server error: " + result.Error);
return LogAndThrow($"Server error: {result.Error}");
}

if (result.Tables.Count != 1)
{
return LogAndThrow("Expected a single table, but got " + result.Tables.Count);
return LogAndThrow($"Expected a single table, but got {result.Tables.Count}");
}

var resultTable = result.Tables[0];
var cacheTable = clientDB.GetTable(resultTable.TableName);

if (cacheTable?.ClientTableType != type)
{
return LogAndThrow("Mismatched result type, expected " + type + " but got " + resultTable.TableName);
return LogAndThrow($"Mismatched result type, expected {type} but got {resultTable.TableName}");
}

return resultTable.Row.Select(row => BSATNHelpers.FromProtoBytes<T>(row)).ToArray();
33 changes: 16 additions & 17 deletions src/Stats.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
using System.Diagnostics.CodeAnalysis;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using UnityEngine;

namespace SpacetimeDB
{
public class NetworkRequestTracker
{
private readonly ConcurrentQueue<(DateTime, TimeSpan, string)> _requestDurations =
new ConcurrentQueue<(DateTime, TimeSpan, string)>();
private readonly ConcurrentQueue<(DateTime, TimeSpan, string)> _requestDurations = new();

private uint _nextRequestId;
private Dictionary<uint, (DateTime, string)> _requests = new Dictionary<uint, (DateTime, string)>();
private readonly Dictionary<uint, (DateTime, string)> _requests = new();

public uint StartTrackingRequest(string metadata = "")
{
@@ -40,15 +36,18 @@ public bool FinishTrackingRequest(uint requestId)
}

// Calculate the duration and add it to the queue
var endTime = DateTime.UtcNow;
var duration = endTime - entry.Item1;
_requestDurations.Enqueue((endTime, duration, entry.Item2));
InsertRequest(entry.Item1, entry.Item2);
return true;
}

public void InsertRequest(DateTime timestamp, TimeSpan duration, string metadata)
public void InsertRequest(TimeSpan duration, string metadata)
{
_requestDurations.Enqueue((timestamp, duration, metadata));
_requestDurations.Enqueue((DateTime.UtcNow, duration, metadata));
}

public void InsertRequest(DateTime start, string metadata)
{
InsertRequest(DateTime.UtcNow - start, metadata);
}

public ((TimeSpan, string), (TimeSpan, string)) GetMinMaxTimes(int lastSeconds)
@@ -73,10 +72,10 @@ public void InsertRequest(DateTime timestamp, TimeSpan duration, string metadata

public class Stats
{
public NetworkRequestTracker ReducerRequestTracker = new NetworkRequestTracker();
public NetworkRequestTracker OneOffRequestTracker = new NetworkRequestTracker();
public NetworkRequestTracker SubscriptionRequestTracker = new NetworkRequestTracker();
public NetworkRequestTracker AllReducersTracker = new NetworkRequestTracker();
public NetworkRequestTracker ParseMessageTracker = new NetworkRequestTracker();
public NetworkRequestTracker ReducerRequestTracker = new();
public NetworkRequestTracker OneOffRequestTracker = new();
public NetworkRequestTracker SubscriptionRequestTracker = new();
public NetworkRequestTracker AllReducersTracker = new();
public NetworkRequestTracker ParseMessageTracker = new();
}
}
}
2 changes: 1 addition & 1 deletion src/WebSocket.cs
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ public async Task Connect(string? auth, string host, string nameOrAddress, Addre
{
var tokenBytes = Encoding.UTF8.GetBytes($"token:{auth}");
var base64 = Convert.ToBase64String(tokenBytes);
Ws.Options.SetRequestHeader("Authorization", "Basic " + base64);
Ws.Options.SetRequestHeader("Authorization", $"Basic {base64}");
}
else
{

0 comments on commit 4a61f56

Please sign in to comment.