Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription API #137

Merged
merged 5 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions examples~/quickstart/client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,29 @@
using System.Net.WebSockets;
using System.Threading;
using SpacetimeDB;
using SpacetimeDB.ClientApi;
using SpacetimeDB.Types;

const string HOST = "http://localhost:3000";
const string DBNAME = "chatqs";

DbConnection? conn = null;

// our local client SpacetimeDB identity
Identity? local_identity = null;
// declare a thread safe queue to store commands
var input_queue = new ConcurrentQueue<(string Command, string Args)>();
// declare a threadsafe cancel token to cancel the process loop
var cancel_token = new CancellationTokenSource();

void Main()
{
AuthToken.Init(".spacetime_csharp_quickstart");

// TODO: just do `var conn = DbConnection...` when OnConnect signature is fixed.
DbConnection? conn = null;

conn = DbConnection.Builder()
.WithUri(HOST)
.WithModuleName(DBNAME)
//.WithCredentials((null, AuthToken.Token))
.OnConnect(OnConnect)
// TODO: change this to just `(OnConnect)` when signature is fixed in #131.
.OnConnect((identity, authToken) => OnConnect(conn!, identity, authToken))
.OnConnectError(OnConnectError)
.OnDisconnect(OnDisconnect)
.Build();
Expand All @@ -41,17 +40,19 @@ void Main()
conn.RemoteReducers.OnSetName += Reducer_OnSetNameEvent;
conn.RemoteReducers.OnSendMessage += Reducer_OnSendMessageEvent;

conn.onSubscriptionApplied += OnSubscriptionApplied;
conn.onUnhandledReducerError += onUnhandledReducerError;

// declare a threadsafe cancel token to cancel the process loop
var cancellationTokenSource = new CancellationTokenSource();

// spawn a thread to call process updates and process commands
var thread = new Thread(ProcessThread);
var thread = new Thread(() => ProcessThread(conn, cancellationTokenSource.Token));
thread.Start();

InputLoop();

// this signals the ProcessThread to stop
cancel_token.Cancel();
cancellationTokenSource.Cancel();
thread.Join();
}

Expand Down Expand Up @@ -84,9 +85,9 @@ void User_OnUpdate(EventContext ctx, User oldValue, User newValue)
}
}

void PrintMessage(Message message)
void PrintMessage(RemoteTables tables, Message message)
{
var sender = conn.RemoteTables.User.FindByIdentity(message.Sender);
var sender = tables.User.FindByIdentity(message.Sender);
var senderName = "unknown";
if (sender != null)
{
Expand All @@ -100,7 +101,7 @@ void Message_OnInsert(EventContext ctx, Message insertedValue)
{
if (ctx.Reducer is not Event<Reducer>.SubscribeApplied)
{
PrintMessage(insertedValue);
PrintMessage(ctx.Db, insertedValue);
}
}

Expand Down Expand Up @@ -128,12 +129,18 @@ void Reducer_OnSendMessageEvent(EventContext ctx, string text)
}
}

void OnConnect(Identity identity, string authToken)
void OnConnect(DbConnection conn, Identity identity, string authToken)
{
local_identity = identity;
AuthToken.SaveToken(authToken);

conn!.Subscribe(new List<string> { "SELECT * FROM User", "SELECT * FROM Message" });
conn.SubscriptionBuilder()
.OnApplied(OnSubscriptionApplied)
.Subscribe("SELECT * FROM User");

conn.SubscriptionBuilder()
.OnApplied(OnSubscriptionApplied)
.Subscribe("SELECT * FROM Message");
}

void OnConnectError(WebSocketError? error, string message)
Expand All @@ -146,35 +153,35 @@ void OnDisconnect(DbConnection conn, WebSocketCloseStatus? status, WebSocketErro

}

void PrintMessagesInOrder()
void PrintMessagesInOrder(RemoteTables tables)
{
foreach (Message message in conn.RemoteTables.Message.Iter().OrderBy(item => item.Sent))
foreach (Message message in tables.Message.Iter().OrderBy(item => item.Sent))
{
PrintMessage(message);
PrintMessage(tables, message);
}
}

void OnSubscriptionApplied()
void OnSubscriptionApplied(EventContext ctx)
{
Console.WriteLine("Connected");
PrintMessagesInOrder();
PrintMessagesInOrder(ctx.Db);
}

void onUnhandledReducerError(ReducerEvent<Reducer> reducerEvent)
{
Console.WriteLine($"Unhandled reducer error in {reducerEvent.Reducer}: {reducerEvent.Status}");
}

void ProcessThread()
void ProcessThread(DbConnection conn, CancellationToken ct)
{
try
{
// loop until cancellation token
while (!cancel_token.IsCancellationRequested)
while (!ct.IsCancellationRequested)
{
conn.Update();

ProcessCommands();
ProcessCommands(conn.RemoteReducers);

Thread.Sleep(100);
}
Expand Down Expand Up @@ -207,18 +214,18 @@ void InputLoop()
}
}

void ProcessCommands()
void ProcessCommands(RemoteReducers reducers)
{
// process input queue commands
while (input_queue.TryDequeue(out var command))
{
switch (command.Command)
{
case "message":
conn.RemoteReducers.SendMessage(command.Args);
reducers.SendMessage(command.Args);
break;
case "name":
conn.RemoteReducers.SetName(command.Args);
reducers.SetName(command.Args);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,7 @@
_ => throw new ArgumentOutOfRangeException("Reducer", $"Unknown reducer {reducer}")
};
}

public SubscriptionBuilder<EventContext> SubscriptionBuilder() => new(this);

Check failure on line 162 in examples~/quickstart/client/module_bindings/_Globals/SpacetimeDBClient.cs

View workflow job for this annotation

GitHub Actions / build

'SubscriptionBuilder<EventContext>' does not contain a constructor that takes 1 arguments

Check failure on line 162 in examples~/quickstart/client/module_bindings/_Globals/SpacetimeDBClient.cs

View workflow job for this annotation

GitHub Actions / build

'SubscriptionBuilder<EventContext>' does not contain a constructor that takes 1 arguments
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done this manually for now, it will need codegen changes once the client codegen part of clockworklabs/SpacetimeDB#1707 lands.

}
}
60 changes: 59 additions & 1 deletion src/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,62 @@ public record UnsubscribeApplied : Event<R>;
public record SubscribeError(Exception Exception) : Event<R>;
public record UnknownTransaction : Event<R>;
}
}

// TODO: Move those classes into EventContext, so that we wouldn't need repetitive generics.
public sealed class SubscriptionBuilder<EventContext>
where EventContext : IEventContext
{
private readonly IDbConnection conn;
private event Action<EventContext>? Applied;
private event Action<EventContext>? Error;

internal SubscriptionBuilder(IDbConnection conn)
{
this.conn = conn;
}

public SubscriptionBuilder<EventContext> OnApplied(Action<EventContext> callback)
{
Applied += callback;
return this;
}

public SubscriptionBuilder<EventContext> OnError(Action<EventContext> callback)
{
Error += callback;
return this;
}

public SubscriptionHandle<EventContext> Subscribe(string querySql) => new(conn, Applied, Error, querySql);
}

internal interface ISubscriptionHandle
{
void OnApplied(IEventContext ctx);
}

public class SubscriptionHandle<EventContext> : ISubscriptionHandle
where EventContext : IEventContext
{
private readonly Action<EventContext>? onApplied;

void ISubscriptionHandle.OnApplied(IEventContext ctx)
{
IsActive = true;
onApplied?.Invoke((EventContext)ctx);
}

internal SubscriptionHandle(IDbConnection conn, Action<EventContext>? onApplied, Action<EventContext>? onError, string querySql)
{
this.onApplied = onApplied;
conn.Subscribe(this, querySql);
}

public void Unsubscribe() => throw new NotImplementedException();

public void UnsuscribeThen(Action<EventContext> onEnd) => throw new NotImplementedException();

public bool IsEnded => false;
public bool IsActive { get; private set; }
}
}
42 changes: 25 additions & 17 deletions src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ public DbConnectionBuilder<DbConnection, Reducer> OnDisconnect(Action<DbConnecti
}
}

public abstract class DbConnectionBase<DbConnection, Reducer>
internal interface IDbConnection
{
void Subscribe(ISubscriptionHandle handle, string query);
}

public abstract class DbConnectionBase<DbConnection, Reducer> : IDbConnection
where DbConnection : DbConnectionBase<DbConnection, Reducer>, new()
{
public static DbConnectionBuilder<DbConnection, Reducer> Builder() => new();
Expand Down Expand Up @@ -108,16 +113,13 @@ struct DbOp
/// </summary>
public event Action<Exception>? onSendError;

private readonly Dictionary<uint, ISubscriptionHandle> subscriptions = new();

/// <summary>
/// Invoked when a subscription is about to start being processed. This is called even before OnBeforeDelete.
/// </summary>
public event Action? onBeforeSubscriptionApplied;

/// <summary>
/// Invoked when the local client cache is updated as a result of changes made to the subscription queries.
/// </summary>
public event Action? onSubscriptionApplied;

/// <summary>
/// Invoked when a reducer is returned with an error and has no client-side handler.
/// </summary>
Expand Down Expand Up @@ -604,21 +606,24 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed)
switch (message)
{
case ServerMessage.InitialSubscription(var initialSubscription):
{
onBeforeSubscriptionApplied?.Invoke();
stats.ParseMessageTracker.InsertRequest(timestamp, $"type={nameof(ServerMessage.InitialSubscription)}");
stats.SubscriptionRequestTracker.FinishTrackingRequest(initialSubscription.RequestId);
OnMessageProcessCompleteUpdate(ToEventContext(new Event<Reducer>.SubscribeApplied()), dbOps);
var eventContext = ToEventContext(new Event<Reducer>.SubscribeApplied());
OnMessageProcessCompleteUpdate(eventContext, dbOps);
try
{
onSubscriptionApplied?.Invoke();
subscriptions[initialSubscription.RequestId].OnApplied(eventContext);
}
catch (Exception e)
{
Log.Exception(e);
}
break;

}
case ServerMessage.TransactionUpdate(var transactionUpdate):
{
var reducer = transactionUpdate.ReducerCall.ReducerName;
stats.ParseMessageTracker.InsertRequest(timestamp, $"type={nameof(ServerMessage.TransactionUpdate)},reducer={reducer}");
var hostDuration = TimeSpan.FromMilliseconds(transactionUpdate.HostExecutionDurationMicros / 1000.0d);
Expand Down Expand Up @@ -673,7 +678,7 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed)
}
}
break;

}
case ServerMessage.IdentityToken(var identityToken):
try
{
Expand Down Expand Up @@ -725,20 +730,23 @@ public void InternalCallReducer<T>(T args)
));
}

public void Subscribe(List<string> queries)
void IDbConnection.Subscribe(ISubscriptionHandle handle, string query)
{
if (!webSocket.IsConnected)
{
Log.Error("Cannot subscribe, not connected to server!");
return;
}

var request = new Subscribe
{
RequestId = stats.SubscriptionRequestTracker.StartTrackingRequest(),
};
request.QueryStrings.AddRange(queries);
webSocket.Send(new ClientMessage.Subscribe(request));
var id = stats.SubscriptionRequestTracker.StartTrackingRequest();
subscriptions[id] = handle;
webSocket.Send(new ClientMessage.Subscribe(
new Subscribe
{
RequestId = id,
QueryStrings = { query }
}
));
}

/// Usage: SpacetimeDBClientBase.instance.OneOffQuery<Message>("WHERE sender = \"bob\"");
Expand Down
1 change: 0 additions & 1 deletion tests~/SnapshotTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ public async Task VerifyAllTablesParsed()
});
client.onConnect += (identity, _token) =>
events.Add("OnIdentityReceived", identity);
client.onSubscriptionApplied += () => events.Add("OnSubscriptionApplied");
client.onUnhandledReducerError += (exception) =>
events.Add("OnUnhandledReducerError", exception);

Expand Down
Loading