Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to TaskCompletionSource #89

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion SpacetimeDB.ClientSDK.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<TargetFramework>netstandard2.1</TargetFramework>
<LangVersion>9</LangVersion>
<ImplicitUsings>disable</ImplicitUsings>
<Nullable>enable</Nullable>
Expand Down
28 changes: 11 additions & 17 deletions src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using ClientApi;
using Newtonsoft.Json;
using SpacetimeDB.SATS;
using Channel = System.Threading.Channels.Channel;
using Thread = System.Threading.Thread;

namespace SpacetimeDB
Expand Down Expand Up @@ -101,14 +99,11 @@ public struct DbOp
private bool connectionClosed;
public static ClientCache clientDB;

public static Dictionary<string, Func<ClientApi.Event, bool>> reducerEventCache =
new Dictionary<string, Func<ClientApi.Event, bool>>();
private static Dictionary<string, Func<ClientApi.Event, bool>> reducerEventCache = new();

public static Dictionary<string, Action<ClientApi.Event>> deserializeEventCache =
new Dictionary<string, Action<ClientApi.Event>>();
private static Dictionary<string, Action<ClientApi.Event>> deserializeEventCache = new();

private static Dictionary<Guid, Channel<OneOffQueryResponse>> waitingOneOffQueries =
new Dictionary<Guid, Channel<OneOffQueryResponse>>();
private static Dictionary<Guid, TaskCompletionSource<OneOffQueryResponse>> waitingOneOffQueries = new();

private bool isClosing;
private Thread networkMessageProcessThread;
Expand Down Expand Up @@ -459,16 +454,15 @@ HashSet<byte[]> GetInsertHashSet(string tableName, int tableSize)
case ClientApi.Message.TypeOneofCase.OneOffQueryResponse:
/// This case does NOT produce a list of DBOps, because it should not modify the client cache state!
var resp = message.OneOffQueryResponse;
Guid messageId = new Guid(resp.MessageId.Span);
var messageId = new Guid(resp.MessageId.Span);

if (!waitingOneOffQueries.ContainsKey(messageId))
if (!waitingOneOffQueries.Remove(messageId, out var resultSource))
{
Logger.LogError("Response to unknown one-off-query: " + messageId);
break;
}

waitingOneOffQueries[messageId].Writer.TryWrite(resp);
waitingOneOffQueries.Remove(messageId);
resultSource.SetResult(resp);
break;
}

Expand Down Expand Up @@ -897,10 +891,10 @@ public void Subscribe(List<string> queries)
/// Usage: SpacetimeDBClient.instance.OneOffQuery<Message>("WHERE sender = \"bob\"");
public async Task<T[]> OneOffQuery<T>(string query) where T : IDatabaseTable
{
Guid messageId = Guid.NewGuid();
Type type = typeof(T);
Channel<OneOffQueryResponse> resultChannel = Channel.CreateBounded<OneOffQueryResponse>(1);
waitingOneOffQueries[messageId] = resultChannel;
var messageId = Guid.NewGuid();
var type = typeof(T);
var resultSource = new TaskCompletionSource<OneOffQueryResponse>();
waitingOneOffQueries[messageId] = resultSource;

// unsanitized here, but writes will be prevented serverside.
// the best they can do is send multiple selects, which will just result in them getting no data back.
Expand All @@ -913,7 +907,7 @@ public async Task<T[]> OneOffQuery<T>(string query) where T : IDatabaseTable
webSocket.Send(Encoding.UTF8.GetBytes(serializedQuery));

// Suspend for an arbitrary amount of time
var result = await resultChannel.Reader.ReadAsync();
var result = await resultSource.Task;

T[] LogAndThrow(string error)
{
Expand Down
Loading