Skip to content

Commit

Permalink
Switch from Channel to TaskCompletionSource
Browse files Browse the repository at this point in the history
There is only one place where we use Channels, and it's to create and await a channel with one element - which is functionally the same as a more precise and low-weight TaskCompletionSource.

Switching also makes the SDK compatible with the widely supported .NET Standard 2.1 subset, which is supported natively in Unity and allows to remove a custom System.Threading.Channels package from Unity SDK dependencies.
  • Loading branch information
RReverser committed May 13, 2024
1 parent e79dcea commit f7a7fe2
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 18 deletions.
2 changes: 1 addition & 1 deletion SpacetimeDB.ClientSDK.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<TargetFramework>netstandard2.1</TargetFramework>
<LangVersion>9</LangVersion>
<ImplicitUsings>disable</ImplicitUsings>
<Nullable>enable</Nullable>
Expand Down
28 changes: 11 additions & 17 deletions src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using ClientApi;
using Newtonsoft.Json;
using SpacetimeDB.SATS;
using Channel = System.Threading.Channels.Channel;
using Thread = System.Threading.Thread;

namespace SpacetimeDB
Expand Down Expand Up @@ -101,14 +99,11 @@ public struct DbOp
private bool connectionClosed;
public static ClientCache clientDB;

public static Dictionary<string, Func<ClientApi.Event, bool>> reducerEventCache =
new Dictionary<string, Func<ClientApi.Event, bool>>();
private static Dictionary<string, Func<ClientApi.Event, bool>> reducerEventCache = new();

public static Dictionary<string, Action<ClientApi.Event>> deserializeEventCache =
new Dictionary<string, Action<ClientApi.Event>>();
private static Dictionary<string, Action<ClientApi.Event>> deserializeEventCache = new();

private static Dictionary<Guid, Channel<OneOffQueryResponse>> waitingOneOffQueries =
new Dictionary<Guid, Channel<OneOffQueryResponse>>();
private static Dictionary<Guid, TaskCompletionSource<OneOffQueryResponse>> waitingOneOffQueries = new();

private bool isClosing;
private Thread networkMessageProcessThread;
Expand Down Expand Up @@ -459,16 +454,15 @@ HashSet<byte[]> GetInsertHashSet(string tableName, int tableSize)
case ClientApi.Message.TypeOneofCase.OneOffQueryResponse:
/// This case does NOT produce a list of DBOps, because it should not modify the client cache state!
var resp = message.OneOffQueryResponse;
Guid messageId = new Guid(resp.MessageId.Span);
var messageId = new Guid(resp.MessageId.Span);

if (!waitingOneOffQueries.ContainsKey(messageId))
if (!waitingOneOffQueries.Remove(messageId, out var resultSource))
{
Logger.LogError("Response to unknown one-off-query: " + messageId);
break;
}

waitingOneOffQueries[messageId].Writer.TryWrite(resp);
waitingOneOffQueries.Remove(messageId);
resultSource.SetResult(resp);
break;
}

Expand Down Expand Up @@ -897,10 +891,10 @@ public void Subscribe(List<string> queries)
/// Usage: SpacetimeDBClient.instance.OneOffQuery<Message>("WHERE sender = \"bob\"");
public async Task<T[]> OneOffQuery<T>(string query) where T : IDatabaseTable
{
Guid messageId = Guid.NewGuid();
Type type = typeof(T);
Channel<OneOffQueryResponse> resultChannel = Channel.CreateBounded<OneOffQueryResponse>(1);
waitingOneOffQueries[messageId] = resultChannel;
var messageId = Guid.NewGuid();
var type = typeof(T);
var resultSource = new TaskCompletionSource<OneOffQueryResponse>();
waitingOneOffQueries[messageId] = resultSource;

// unsanitized here, but writes will be prevented serverside.
// the best they can do is send multiple selects, which will just result in them getting no data back.
Expand All @@ -913,7 +907,7 @@ public async Task<T[]> OneOffQuery<T>(string query) where T : IDatabaseTable
webSocket.Send(Encoding.UTF8.GetBytes(serializedQuery));

// Suspend for an arbitrary amount of time
var result = await resultChannel.Reader.ReadAsync();
var result = await resultSource.Task;

T[] LogAndThrow(string error)
{
Expand Down

0 comments on commit f7a7fe2

Please sign in to comment.