Skip to content

Commit

Permalink
Implement new subscription API (#219)
Browse files Browse the repository at this point in the history
## Description of Changes

As proposed. No upstream codegen changes needed :)

## API

 - [x] This is an API breaking change to the SDK

*If the API is breaking, please state below what will break*
The subscription API is slightly different.

## Requires SpacetimeDB PRs
clockworklabs/SpacetimeDB#2111

## Testsuite

SpacetimeDB branch name: jsdt/subscribe-sdk-3

## Testing
So far I have performed manual testing with the chat example. Working on
updating the unity and unit tests.

- [ ] Describe a test for this PR that you have completed
  • Loading branch information
kazimuth authored Jan 23, 2025
1 parent 887ba98 commit 2535e3f
Show file tree
Hide file tree
Showing 10 changed files with 1,219 additions and 156 deletions.
17 changes: 11 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
# Now, setup the Unity tests.

- name: Patch spacetimedb dependency in Cargo.toml
working-directory: unity-tests~/server
working-directory: unity-tests~/server-rust
run: |
sed -i "s|spacetimedb *=.*|spacetimedb = \{ path = \"../../SpacetimeDB~/crates/bindings\" \}|" Cargo.toml
cat Cargo.toml
Expand All @@ -84,21 +84,24 @@ jobs:
- name: Cache Rust dependencies
uses: Swatinem/rust-cache@v2
with:
workspaces: unity-tests~/server
workspaces: unity-tests~/server-rust
shared-key: UnityTestServer

- name: Install SpacetimeDB CLI from the local checkout
run: |
cargo install --path SpacetimeDB~/crates/cli --locked --message-format=short
cargo install --path SpacetimeDB~/crates/standalone --locked --message-format=short
# Add a handy alias using the old binary name, so that we don't have to rewrite all scripts (incl. in submodules).
rm $HOME/.cargo/bin/spacetime || echo "haven't run on this host before"
ln -s $HOME/.cargo/bin/spacetimedb-cli $HOME/.cargo/bin/spacetime
# Clear any existing information
spacetime server clear -y
env:
# Share the target directory with our local project to avoid rebuilding same SpacetimeDB crates twice.
CARGO_TARGET_DIR: unity-tests~/server/target
CARGO_TARGET_DIR: unity-tests~/server-rust/target

- name: Generate client bindings
working-directory: unity-tests~/server
working-directory: unity-tests~/server-rust
run: bash ./generate.sh -y

- name: Check for changes
Expand All @@ -121,8 +124,10 @@ jobs:
disown
- name: Publish module to SpacetimeDB
working-directory: unity-tests~/server
run: bash ./publish.sh
working-directory: unity-tests~/server-rust
run: |
spacetime logout && spacetime login --server-issued-login local
bash ./publish.sh
- uses: actions/cache@v3
with:
Expand Down
28 changes: 23 additions & 5 deletions examples~/quickstart/client/Program.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using SpacetimeDB;
using SpacetimeDB.Types;
Expand Down Expand Up @@ -135,9 +133,29 @@ void OnConnect(DbConnection conn, Identity identity, string authToken)
local_identity = identity;
AuthToken.SaveToken(authToken);

conn.SubscriptionBuilder()
.OnApplied(OnSubscriptionApplied)
.Subscribe("SELECT * FROM user", "SELECT * FROM message");
var subscriptions = 0;
SubscriptionBuilder<EventContext>.Callback waitForSubscriptions = (EventContext ctx) =>
{
// Note: callbacks are always invoked on the main thread, so you don't need to
// worry about thread synchronization or anything like that.
subscriptions += 1;

if (subscriptions == 2)
{
OnSubscriptionApplied(ctx);
}
};

var userSubscription = conn.SubscriptionBuilder()
.OnApplied(waitForSubscriptions)
.Subscribe("SELECT * FROM user");
var messageSubscription = conn.SubscriptionBuilder()
.OnApplied(waitForSubscriptions)
.Subscribe("SELECT * FROM message");

// You can also use SubscribeToAllTables, but it should be avoided if you have any large tables:
// conn.SubscriptionBuilder().OnApplied(OnSubscriptionApplied).SubscribeToAllTables();

}

void OnConnectError(Exception e)
Expand Down
151 changes: 140 additions & 11 deletions src/Event.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using SpacetimeDB.ClientApi;

namespace SpacetimeDB
{
Expand Down Expand Up @@ -62,41 +63,169 @@ public SubscriptionBuilder<EventContext> OnError(Callback callback)
return this;
}

public SubscriptionHandle<EventContext> Subscribe(params string[] querySqls) => new(conn, Applied, Error, querySqls);
public SubscriptionHandle<EventContext> Subscribe(string querySql) => new(conn, Applied, Error, querySql);

public void SubscribeToAllTables()
{
Subscribe("SELECT * FROM *");
// Make sure we use the legacy handle constructor here, even though there's only 1 query.
// We drop the error handler, since it can't be called for legacy subscriptions.
new SubscriptionHandle<EventContext>(conn, Applied, new string[] { "SELECT * FROM *" });
}
}

public interface ISubscriptionHandle
{
void OnApplied(IEventContext ctx);
void OnApplied(IEventContext ctx, SubscriptionAppliedType state);
void OnError(IEventContext ctx);
void OnEnded(IEventContext ctx);
}

/// <summary>
/// An applied subscription can either be a new-style subscription (with a query ID),
/// or a legacy subscription (no query ID).
/// </summary>
[Type]
public partial record SubscriptionAppliedType : TaggedEnum<(
QueryId Active,
Unit LegacyActive)>
{ }

/// <summary>
/// State flow chart:
/// <c>
/// |
/// v
/// Pending
/// | |
/// v v
/// Active LegacyActive
/// |
/// v
/// Ended
/// </c>
/// </summary>
[Type]
public partial record SubscriptionState : TaggedEnum<(
Unit Pending,
QueryId Active,
Unit LegacyActive,
Unit Ended)>
{ }

public class SubscriptionHandle<EventContext> : ISubscriptionHandle
where EventContext : IEventContext
{
private readonly IDbConnection conn;
private readonly SubscriptionBuilder<EventContext>.Callback? onApplied;
private readonly SubscriptionBuilder<EventContext>.Callback? onError;
private SubscriptionBuilder<EventContext>.Callback? onEnded;

void ISubscriptionHandle.OnApplied(IEventContext ctx)
private SubscriptionState state;

/// <summary>
/// Whether the subscription has ended.
/// </summary>
public bool IsEnded
{
IsActive = true;
get
{
return state is SubscriptionState.Ended;
}
}

/// <summary>
/// Whether the subscription is active.
/// </summary>
public bool IsActive
{
get
{
return state is SubscriptionState.Active || state is SubscriptionState.LegacyActive;
}
}

void ISubscriptionHandle.OnApplied(IEventContext ctx, SubscriptionAppliedType type)
{
if (type is SubscriptionAppliedType.Active active)
{
state = new SubscriptionState.Active(active.Active_);
}
else if (type is SubscriptionAppliedType.LegacyActive)
{
state = new SubscriptionState.LegacyActive(new());
}
onApplied?.Invoke((EventContext)ctx);
}

internal SubscriptionHandle(IDbConnection conn, SubscriptionBuilder<EventContext>.Callback? onApplied, SubscriptionBuilder<EventContext>.Callback? onError, string[] querySqls)
void ISubscriptionHandle.OnEnded(IEventContext ctx)
{
state = new SubscriptionState.Ended(new());
onEnded?.Invoke((EventContext)ctx);
}

void ISubscriptionHandle.OnError(IEventContext ctx)
{
state = new SubscriptionState.Ended(new());
onError?.Invoke((EventContext)ctx);
}

/// <summary>
/// TODO: remove this constructor once legacy subscriptions are removed.
/// </summary>
/// <param name="conn"></param>
/// <param name="onApplied"></param>
/// <param name="onError"></param>
/// <param name="querySqls"></param>
internal SubscriptionHandle(IDbConnection conn, SubscriptionBuilder<EventContext>.Callback? onApplied, string[] querySqls)
{
state = new SubscriptionState.Pending(new());
this.conn = conn;
this.onApplied = onApplied;
conn.Subscribe(this, querySqls);
conn.LegacySubscribe(this, querySqls);
}

public void Unsubscribe() => throw new NotImplementedException();
/// <summary>
/// Construct a subscription handle.
/// </summary>
/// <param name="conn"></param>
/// <param name="onApplied"></param>
/// <param name="onError"></param>
/// <param name="querySql"></param>
internal SubscriptionHandle(IDbConnection conn, SubscriptionBuilder<EventContext>.Callback? onApplied, SubscriptionBuilder<EventContext>.Callback? onError, string querySql)
{
state = new SubscriptionState.Pending(new());
this.onApplied = onApplied;
this.onError = onError;
this.conn = conn;
conn.Subscribe(this, querySql);
}

public void UnsuscribeThen(SubscriptionBuilder<EventContext>.Callback onEnd) => throw new NotImplementedException();
/// <summary>
/// Unsubscribe from the query controlled by this subscription handle.
///
/// Calling this more than once will result in an exception.
/// </summary>
public void Unsubscribe()
{
UnsubscribeThen(null);
}

public bool IsEnded => false;
public bool IsActive { get; private set; }
/// <summary>
/// Unsubscribe from the query controlled by this subscription handle,
/// and call onEnded when its rows are removed from the client cache.
/// </summary>
public void UnsubscribeThen(SubscriptionBuilder<EventContext>.Callback? onEnded)
{
if (state is not SubscriptionState.Active)
{
throw new Exception("Cannot unsubscribe from inactive subscription.");
}
if (onEnded != null)
{
// TODO: should we just log here instead? Do we try not to throw exceptions on the main thread?
throw new Exception("Unsubscribe already called.");
}
this.onEnded = onEnded;
}
}
}
4 changes: 4 additions & 0 deletions src/SpacetimeDB/ClientApi/SubscriptionError.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public partial class SubscriptionError
public ulong TotalHostExecutionDurationMicros;
[DataMember(Name = "request_id")]
public uint? RequestId;
[DataMember(Name = "query_id")]
public uint? QueryId;
[DataMember(Name = "table_id")]
public uint? TableId;
[DataMember(Name = "error")]
Expand All @@ -27,12 +29,14 @@ public partial class SubscriptionError
public SubscriptionError(
ulong TotalHostExecutionDurationMicros,
uint? RequestId,
uint? QueryId,
uint? TableId,
string Error
)
{
this.TotalHostExecutionDurationMicros = TotalHostExecutionDurationMicros;
this.RequestId = RequestId;
this.QueryId = QueryId;
this.TableId = TableId;
this.Error = Error;
}
Expand Down
Loading

0 comments on commit 2535e3f

Please sign in to comment.