From 4a61f56d22de75e80570136d9ec31a4120c0363c Mon Sep 17 00:00:00 2001 From: Ingvar Stepanyan Date: Wed, 5 Jun 2024 14:58:12 +0100 Subject: [PATCH] Minor refactoring --- src/ClientCache.cs | 1 - src/SpacetimeDBClient.cs | 41 ++++++++++++++++++++-------------------- src/Stats.cs | 33 ++++++++++++++++---------------- src/WebSocket.cs | 2 +- 4 files changed, 38 insertions(+), 39 deletions(-) diff --git a/src/ClientCache.cs b/src/ClientCache.cs index 658c65d6..f4f67d36 100644 --- a/src/ClientCache.cs +++ b/src/ClientCache.cs @@ -1,6 +1,5 @@ using System; using System.Collections; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using SpacetimeDB.BSATN; diff --git a/src/SpacetimeDBClient.cs b/src/SpacetimeDBClient.cs index 6c23086f..70d75e77 100644 --- a/src/SpacetimeDBClient.cs +++ b/src/SpacetimeDBClient.cs @@ -5,7 +5,6 @@ using System.IO.Compression; using System.Linq; using System.Net.WebSockets; -using System.Reflection; using System.Threading; using System.Threading.Tasks; using ClientApi; @@ -273,8 +272,7 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) { if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null)) { - Logger.LogWarning($"Update with the same primary key was " + - $"applied multiple times! tableName={tableName}"); + Logger.LogWarning($"Update with the same primary key was applied multiple times! tableName={tableName}"); // TODO(jdetter): Is this a correctable error? This would be a major error on the // SpacetimeDB side. continue; @@ -320,7 +318,7 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) if (!waitingOneOffQueries.Remove(messageId, out var resultSource)) { - Logger.LogError("Response to unknown one-off-query: " + messageId); + Logger.LogError($"Response to unknown one-off-query: {messageId}"); break; } @@ -506,10 +504,10 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed) switch (message) { - case { TypeCase: Message.TypeOneofCase.SubscriptionUpdate }: + case { TypeCase: Message.TypeOneofCase.SubscriptionUpdate, SubscriptionUpdate: var subscriptionUpdate }: onBeforeSubscriptionApplied?.Invoke(); - stats.ParseMessageTracker.InsertRequest(DateTime.UtcNow, DateTime.UtcNow - timestamp, "type=" + message.TypeCase.ToString()); - stats.SubscriptionRequestTracker.FinishTrackingRequest(message.SubscriptionUpdate.RequestId); + stats.ParseMessageTracker.InsertRequest(timestamp, $"type={message.TypeCase}"); + stats.SubscriptionRequestTracker.FinishTrackingRequest(subscriptionUpdate.RequestId); OnMessageProcessCompleteUpdate(null, dbOps); try { @@ -520,17 +518,20 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed) Logger.LogException(e); } break; - case { TypeCase: Message.TypeOneofCase.TransactionUpdate, TransactionUpdate: { Event: var transactionEvent } }: - stats.ParseMessageTracker.InsertRequest(DateTime.UtcNow, DateTime.UtcNow - timestamp, "type=" + message.TypeCase.ToString() + ",reducer=" + message.TransactionUpdate.Event.FunctionCall.Reducer); - stats.AllReducersTracker.InsertRequest(DateTime.UtcNow, TimeSpan.FromMilliseconds(message.TransactionUpdate.Event.HostExecutionDurationMicros / 1000.0d), "reducer=" + message.TransactionUpdate.Event.FunctionCall.Reducer); - var callerIdentity = Identity.From(message.TransactionUpdate.Event.CallerIdentity.ToByteArray()); + case { TypeCase: Message.TypeOneofCase.TransactionUpdate, TransactionUpdate: var transactionUpdate }: + var transactionEvent = transactionUpdate.Event; + var reducer = transactionEvent.FunctionCall.Reducer; + stats.ParseMessageTracker.InsertRequest(timestamp, $"type={message.TypeCase},reducer={reducer}"); + var hostDuration = TimeSpan.FromMilliseconds(transactionEvent.HostExecutionDurationMicros / 1000.0d); + stats.AllReducersTracker.InsertRequest(hostDuration, $"reducer={reducer}"); + var callerIdentity = Identity.From(transactionEvent.CallerIdentity.ToByteArray()); if (callerIdentity == clientIdentity) { // This was a request that we initiated - if (!stats.ReducerRequestTracker.FinishTrackingRequest(message.TransactionUpdate.SubscriptionUpdate.RequestId)) + var requestId = transactionUpdate.SubscriptionUpdate.RequestId; + if (!stats.ReducerRequestTracker.FinishTrackingRequest(requestId)) { - Logger.LogWarning("Failed to finish tracking reducer request: " + - message.TransactionUpdate.SubscriptionUpdate.RequestId); + Logger.LogWarning($"Failed to finish tracking reducer request: {requestId}"); } } OnMessageProcessCompleteUpdate(transactionEvent, dbOps); @@ -647,7 +648,7 @@ public async Task OneOffQuery(string query) // unsanitized here, but writes will be prevented serverside. // the best they can do is send multiple selects, which will just result in them getting no data back. - string queryString = "SELECT * FROM " + type.Name + " " + query; + string queryString = $"SELECT * FROM {type.Name} ${query}"; var requestId = stats.OneOffRequestTracker.StartTrackingRequest(); webSocket.Send(new Message @@ -664,12 +665,12 @@ public async Task OneOffQuery(string query) if (!stats.OneOffRequestTracker.FinishTrackingRequest(requestId)) { - Logger.LogWarning("Failed to finish tracking one off request: " + requestId); + Logger.LogWarning($"Failed to finish tracking one off request: {requestId}"); } T[] LogAndThrow(string error) { - error = "While processing one-off-query `" + queryString + "`, ID " + messageId + ": " + error; + error = $"While processing one-off-query `{queryString}`, ID {messageId}: {error}"; Logger.LogError(error); throw new Exception(error); } @@ -677,12 +678,12 @@ T[] LogAndThrow(string error) // The server got back to us if (result.Error != null && result.Error != "") { - return LogAndThrow("Server error: " + result.Error); + return LogAndThrow($"Server error: {result.Error}"); } if (result.Tables.Count != 1) { - return LogAndThrow("Expected a single table, but got " + result.Tables.Count); + return LogAndThrow($"Expected a single table, but got {result.Tables.Count}"); } var resultTable = result.Tables[0]; @@ -690,7 +691,7 @@ T[] LogAndThrow(string error) if (cacheTable?.ClientTableType != type) { - return LogAndThrow("Mismatched result type, expected " + type + " but got " + resultTable.TableName); + return LogAndThrow($"Mismatched result type, expected {type} but got {resultTable.TableName}"); } return resultTable.Row.Select(row => BSATNHelpers.FromProtoBytes(row)).ToArray(); diff --git a/src/Stats.cs b/src/Stats.cs index 94ef97b1..42c56b6d 100644 --- a/src/Stats.cs +++ b/src/Stats.cs @@ -1,20 +1,16 @@ -using System.Diagnostics.CodeAnalysis; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Threading; -using UnityEngine; namespace SpacetimeDB { public class NetworkRequestTracker { - private readonly ConcurrentQueue<(DateTime, TimeSpan, string)> _requestDurations = - new ConcurrentQueue<(DateTime, TimeSpan, string)>(); + private readonly ConcurrentQueue<(DateTime, TimeSpan, string)> _requestDurations = new(); private uint _nextRequestId; - private Dictionary _requests = new Dictionary(); + private readonly Dictionary _requests = new(); public uint StartTrackingRequest(string metadata = "") { @@ -40,15 +36,18 @@ public bool FinishTrackingRequest(uint requestId) } // Calculate the duration and add it to the queue - var endTime = DateTime.UtcNow; - var duration = endTime - entry.Item1; - _requestDurations.Enqueue((endTime, duration, entry.Item2)); + InsertRequest(entry.Item1, entry.Item2); return true; } - public void InsertRequest(DateTime timestamp, TimeSpan duration, string metadata) + public void InsertRequest(TimeSpan duration, string metadata) { - _requestDurations.Enqueue((timestamp, duration, metadata)); + _requestDurations.Enqueue((DateTime.UtcNow, duration, metadata)); + } + + public void InsertRequest(DateTime start, string metadata) + { + InsertRequest(DateTime.UtcNow - start, metadata); } public ((TimeSpan, string), (TimeSpan, string)) GetMinMaxTimes(int lastSeconds) @@ -73,10 +72,10 @@ public void InsertRequest(DateTime timestamp, TimeSpan duration, string metadata public class Stats { - public NetworkRequestTracker ReducerRequestTracker = new NetworkRequestTracker(); - public NetworkRequestTracker OneOffRequestTracker = new NetworkRequestTracker(); - public NetworkRequestTracker SubscriptionRequestTracker = new NetworkRequestTracker(); - public NetworkRequestTracker AllReducersTracker = new NetworkRequestTracker(); - public NetworkRequestTracker ParseMessageTracker = new NetworkRequestTracker(); + public NetworkRequestTracker ReducerRequestTracker = new(); + public NetworkRequestTracker OneOffRequestTracker = new(); + public NetworkRequestTracker SubscriptionRequestTracker = new(); + public NetworkRequestTracker AllReducersTracker = new(); + public NetworkRequestTracker ParseMessageTracker = new(); } -} \ No newline at end of file +} diff --git a/src/WebSocket.cs b/src/WebSocket.cs index cbc4626c..8422b146 100644 --- a/src/WebSocket.cs +++ b/src/WebSocket.cs @@ -59,7 +59,7 @@ public async Task Connect(string? auth, string host, string nameOrAddress, Addre { var tokenBytes = Encoding.UTF8.GetBytes($"token:{auth}"); var base64 = Convert.ToBase64String(tokenBytes); - Ws.Options.SetRequestHeader("Authorization", "Basic " + base64); + Ws.Options.SetRequestHeader("Authorization", $"Basic {base64}"); } else {