Skip to content

Commit

Permalink
Update client example
Browse files Browse the repository at this point in the history
  • Loading branch information
RReverser committed Oct 1, 2024
1 parent c4cd459 commit 9b3b21f
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions examples~/quickstart/client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,27 @@
using System.Net.WebSockets;
using System.Threading;
using SpacetimeDB;
using SpacetimeDB.ClientApi;
using SpacetimeDB.Types;

const string HOST = "http://localhost:3000";
const string DBNAME = "chatqs";

DbConnection? conn = null;

// our local client SpacetimeDB identity
Identity? local_identity = null;
// declare a thread safe queue to store commands
var input_queue = new ConcurrentQueue<(string Command, string Args)>();
// declare a threadsafe cancel token to cancel the process loop
var cancel_token = new CancellationTokenSource();

void Main()
{
AuthToken.Init(".spacetime_csharp_quickstart");

DbConnection? conn = null;

conn = DbConnection.Builder()
.WithUri(HOST)
.WithModuleName(DBNAME)
//.WithCredentials((null, AuthToken.Token))
.OnConnect(OnConnect)
.OnConnect((identity, authToken) => OnConnect(conn!, identity, authToken))
.OnConnectError(OnConnectError)
.OnDisconnect(OnDisconnect)
.Build();
Expand All @@ -41,17 +38,19 @@ void Main()
conn.RemoteReducers.OnSetName += Reducer_OnSetNameEvent;
conn.RemoteReducers.OnSendMessage += Reducer_OnSendMessageEvent;

conn.onSubscriptionApplied += OnSubscriptionApplied;
conn.onUnhandledReducerError += onUnhandledReducerError;

// declare a threadsafe cancel token to cancel the process loop
var cancellationTokenSource = new CancellationTokenSource();

// spawn a thread to call process updates and process commands
var thread = new Thread(ProcessThread);
var thread = new Thread(() => ProcessThread(conn, cancellationTokenSource.Token));
thread.Start();

InputLoop();

// this signals the ProcessThread to stop
cancel_token.Cancel();
cancellationTokenSource.Cancel();
thread.Join();
}

Expand Down Expand Up @@ -84,9 +83,9 @@ void User_OnUpdate(EventContext ctx, User oldValue, User newValue)
}
}

void PrintMessage(Message message)
void PrintMessage(RemoteTables tables, Message message)
{
var sender = conn.RemoteTables.User.FindByIdentity(message.Sender);
var sender = tables.User.FindByIdentity(message.Sender);
var senderName = "unknown";
if (sender != null)
{
Expand All @@ -100,7 +99,7 @@ void Message_OnInsert(EventContext ctx, Message insertedValue)
{
if (ctx.Reducer is not Event<Reducer>.SubscribeApplied)
{
PrintMessage(insertedValue);
PrintMessage(ctx.Db, insertedValue);
}
}

Expand Down Expand Up @@ -128,12 +127,18 @@ void Reducer_OnSendMessageEvent(EventContext ctx, string text)
}
}

void OnConnect(Identity identity, string authToken)
void OnConnect(DbConnection conn, Identity identity, string authToken)
{
local_identity = identity;
AuthToken.SaveToken(authToken);

conn!.Subscribe(new List<string> { "SELECT * FROM User", "SELECT * FROM Message" });
conn.SubscriptionBuilder()
.OnApplied(OnSubscriptionApplied)
.Subscribe("SELECT * FROM User");

conn.SubscriptionBuilder()
.OnApplied(OnSubscriptionApplied)
.Subscribe("SELECT * FROM Message");
}

void OnConnectError(WebSocketError? error, string message)
Expand All @@ -146,35 +151,35 @@ void OnDisconnect(DbConnection conn, WebSocketCloseStatus? status, WebSocketErro

}

void PrintMessagesInOrder()
void PrintMessagesInOrder(RemoteTables tables)
{
foreach (Message message in conn.RemoteTables.Message.Iter().OrderBy(item => item.Sent))
foreach (Message message in tables.Message.Iter().OrderBy(item => item.Sent))
{
PrintMessage(message);
PrintMessage(tables, message);
}
}

void OnSubscriptionApplied()
void OnSubscriptionApplied(EventContext ctx)
{
Console.WriteLine("Connected");
PrintMessagesInOrder();
PrintMessagesInOrder(ctx.Db);
}

void onUnhandledReducerError(ReducerEvent<Reducer> reducerEvent)
{
Console.WriteLine($"Unhandled reducer error in {reducerEvent.Reducer}: {reducerEvent.Status}");
}

void ProcessThread()
void ProcessThread(DbConnection conn, CancellationToken ct)
{
try
{
// loop until cancellation token
while (!cancel_token.IsCancellationRequested)
while (!ct.IsCancellationRequested)
{
conn.Update();

ProcessCommands();
ProcessCommands(conn.RemoteReducers);

Thread.Sleep(100);
}
Expand Down Expand Up @@ -207,18 +212,18 @@ void InputLoop()
}
}

void ProcessCommands()
void ProcessCommands(RemoteReducers reducers)
{
// process input queue commands
while (input_queue.TryDequeue(out var command))
{
switch (command.Command)
{
case "message":
conn.RemoteReducers.SendMessage(command.Args);
reducers.SendMessage(command.Args);
break;
case "name":
conn.RemoteReducers.SetName(command.Args);
reducers.SetName(command.Args);
break;
}
}
Expand Down

0 comments on commit 9b3b21f

Please sign in to comment.