Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server-side implementation of incremental subscription changes #2030

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
10 changes: 5 additions & 5 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use spacetimedb::execution_context::Workload;
use spacetimedb::host::module_host::DatabaseTableUpdate;
use spacetimedb::identity::AuthCtx;
use spacetimedb::messages::websocket::BsatnFormat;
use spacetimedb::subscription::query::compile_read_only_query;
use spacetimedb::subscription::query::compile_read_only_queryset;
use spacetimedb::subscription::subscription::ExecutionSet;
use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compression};
use spacetimedb_bench::database::BenchDatabase as _;
Expand Down Expand Up @@ -102,7 +102,7 @@ fn eval(c: &mut Criterion) {
let bench_eval = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Update);
let query = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
let query: ExecutionSet = query.into();

b.iter(|| {
Expand Down Expand Up @@ -141,8 +141,8 @@ fn eval(c: &mut Criterion) {
let select_lhs = "select * from footprint";
let select_rhs = "select * from location";
let tx = &raw.db.begin_tx(Workload::Update);
let query_lhs = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), tx, select_lhs).unwrap();
let query_rhs = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), tx, select_rhs).unwrap();
let query_lhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_lhs).unwrap();
let query_rhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_rhs).unwrap();
let query = ExecutionSet::from_iter(query_lhs.into_iter().chain(query_rhs));
let tx = &tx.into();

Expand All @@ -160,7 +160,7 @@ fn eval(c: &mut Criterion) {
where location.chunk_index = {chunk_index}"
);
let tx = &raw.db.begin_tx(Workload::Update);
let query = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), tx, &join).unwrap();
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, &join).unwrap();
let query: ExecutionSet = query.into();
let tx = &tx.into();

Expand Down
158 changes: 157 additions & 1 deletion crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ pub enum ClientMessage<Args> {
Subscribe(Subscribe),
/// Send a one-off SQL query without establishing a subscription.
OneOffQuery(OneOffQuery),
/// Register a SQL query to to subscribe to updates. This does not affect other subscriptions.
SubscribeSingle(SubscribeSingle),
/// Remove a subscription to a SQL query that was added with SubscribeSingle.
Unsubscribe(Unsubscribe),
}

impl<Args> ClientMessage<Args> {
Expand All @@ -106,8 +110,10 @@ impl<Args> ClientMessage<Args> {
request_id,
flags,
}),
ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
ClientMessage::OneOffQuery(x) => ClientMessage::OneOffQuery(x),
ClientMessage::SubscribeSingle(x) => ClientMessage::SubscribeSingle(x),
ClientMessage::Unsubscribe(x) => ClientMessage::Unsubscribe(x),
ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
}
}
}
Expand Down Expand Up @@ -162,6 +168,20 @@ impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? {
x => Err(D::Error::custom(format_args!("invalid call reducer flag {x}"))),
});

/// An opaque id generated by the client to refer to a subscription.
/// This is used in Unsubscribe messages and errors.
#[derive(SpacetimeType, Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[sats(crate = spacetimedb_lib)]
pub struct QueryId {
pub id: u32,
}

impl QueryId {
pub fn new(id: u32) -> Self {
Self { id }
}
}

/// Sent by client to database to register a set of queries, about which the client will
/// receive `TransactionUpdate`s.
///
Expand All @@ -184,6 +204,41 @@ pub struct Subscribe {
pub request_id: u32,
}

/// Sent by client to register a subscription to single query, for which the client should receive
/// receive relevant `TransactionUpdate`s.
///
/// After issuing a `SubscribeSingle` message, the client will receive a single
/// `SubscribeApplied` message containing every current row which matches the query. Then, any
/// time a reducer updates the query's results, the client will receive a `TransactionUpdate`
/// containing the relevant updates.
///
/// If a client subscribes to queries with overlapping results, the client will receive
/// multiple copies of rows that appear in multiple queries.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeSingle {
/// A single SQL `SELECT` query to subscribe to.
pub query: Box<str>,
/// An identifier for a client request.
pub request_id: u32,

/// An identifier for this subscription, which should not be used for any other subscriptions on the same connection.
/// This is used to refer to this subscription in Unsubscribe messages from the client and errors sent from the server.
/// These only have meaning given a ConnectionId.
pub query_id: QueryId,
}

/// Client request for removing a query from a subscription.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct Unsubscribe {
/// An identifier for a client request.
pub request_id: u32,

/// The ID used in the corresponding `SubscribeSingle` message.
pub query_id: QueryId,
}

/// A one-off query submission.
///
/// Query should be a "SELECT * FROM Table WHERE ...". Other types of queries will be rejected.
Expand Down Expand Up @@ -213,6 +268,7 @@ pub const SERVER_MSG_COMPRESSION_TAG_GZIP: u8 = 2;
#[sats(crate = spacetimedb_lib)]
pub enum ServerMessage<F: WebsocketFormat> {
/// Informs of changes to subscribed rows.
/// This will be removed when we switch to `SubscribeSingle`.
InitialSubscription(InitialSubscription<F>),
/// Upon reducer run.
TransactionUpdate(TransactionUpdate<F>),
Expand All @@ -222,6 +278,97 @@ pub enum ServerMessage<F: WebsocketFormat> {
IdentityToken(IdentityToken),
/// Return results to a one off SQL query.
OneOffQueryResponse(OneOffQueryResponse<F>),
/// Sent in response to a `SubscribeSingle` message. This contains the initial matching rows.
SubscribeApplied(SubscribeApplied<F>),
/// Sent in response to an `Unsubscribe` message. This contains the matching rows.
UnsubscribeApplied(UnsubscribeApplied<F>),
/// Communicate an error in the subscription lifecycle.
SubscriptionError(SubscriptionError),
}

/// The matching rows of a subscription query.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeRows<F: WebsocketFormat> {
/// The table ID of the query.
pub table_id: TableId,
/// The table name of the query.
pub table_name: Box<str>,
/// The BSATN row values.
pub table_rows: TableUpdate<F>,
}

/// Response to [`Subscribe`] containing the initial matching rows.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeApplied<F: WebsocketFormat> {
/// The request_id of the corresponding `SubscribeSingle` message.
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
/// An identifier for the subscribed query sent by the client.
pub query_id: QueryId,
/// The matching rows for this query.
pub rows: SubscribeRows<F>,
}

/// Server response to a client [`Unsubscribe`] request.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct UnsubscribeApplied<F: WebsocketFormat> {
/// Provided by the client via the `Subscribe` message.
/// TODO: switch to subscription id?
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
/// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
pub query_id: QueryId,
/// The matching rows for this query.
/// Note, this makes unsubscribing potentially very expensive.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make an issue to track this?

/// To remove this in the future, we would need to send query_ids with rows in transaction updates,
/// and we would need clients to track which rows exist in which queries.
pub rows: SubscribeRows<F>,
}

/// Server response to an error at any point of the subscription lifecycle.
/// If this error doesn't have a request_id, the client should drop all subscriptions.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscriptionError {
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
/// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
/// [`None`] if this occurred as the result of a [`TransactionUpdate`].
pub request_id: Option<u32>,
/// The return table of the query in question.
/// The server is not required to set this field.
/// It has been added to avoid a breaking change post 1.0.
///
/// If unset, an error results in the entire subscription being dropped.
/// Otherwise only queries of this table type must be dropped.
pub table_id: Option<TableId>,
/// An error message describing the failure.
///
/// This should reference specific fragments of the query where applicable,
/// but should not include the full text of the query,
/// as the client can retrieve that from the `request_id`.
///
/// This is intended for diagnostic purposes.
/// It need not have a predictable/parseable format.
pub error: Box<str>,
}

/// Response to [`Subscribe`] containing the initial matching rows.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscriptionUpdate<F: WebsocketFormat> {
/// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
pub database_update: DatabaseUpdate<F>,
/// An identifier sent by the client in requests.
/// The server will include the same request_id in the response.
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
}

/// Response to [`Subscribe`] containing the initial matching rows.
Expand Down Expand Up @@ -397,6 +544,15 @@ impl<F: WebsocketFormat> TableUpdate<F> {
}
}

pub fn empty(table_id: TableId, table_name: Box<str>) -> Self {
Self {
table_id,
table_name,
num_rows: 0,
updates: SmallVec::new(),
}
}

pub fn push(&mut self, (update, num_rows): (F::QueryUpdate, u64)) {
self.updates.push(update);
self.num_rows += num_rows;
Expand Down
24 changes: 22 additions & 2 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;
use derive_more::From;
use futures::prelude::*;
use spacetimedb_client_api_messages::websocket::{CallReducerFlags, Compression, FormatSwitch};
use spacetimedb_client_api_messages::websocket::{
CallReducerFlags, Compression, FormatSwitch, SubscribeSingle, Unsubscribe,
};
use spacetimedb_lib::identity::RequestId;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::AbortHandle;
Expand Down Expand Up @@ -283,12 +285,30 @@ impl ClientConnection {
.await
}

pub async fn subscribe_single(&self, subscription: SubscribeSingle, timer: Instant) -> Result<(), DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
me.module
.subscriptions()
.add_subscription(me.sender, subscription, timer, None)
})
.await
.unwrap() // TODO: is unwrapping right here?
}

pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<(), DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || me.module.subscriptions().remove_subscription(me.sender, request, timer))
.await
.unwrap() // TODO: is unwrapping right here?
}

pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<(), DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
me.module
.subscriptions()
.add_subscriber(me.sender, subscription, timer, None)
.add_legacy_subscriber(me.sender, subscription, timer, None)
})
.await
.unwrap()
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
)
})
}
ClientMessage::SubscribeSingle(subscription) => {
let res = client.subscribe_single(subscription, timer).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &address, "")
.observe(timer.elapsed().as_secs_f64());
Comment on lines +84 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit is repeated 5x modulo the workload and reducer string. I'd extract this into a closure taking the workload and the string.

res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Unsubscribe(request) => {
let res = client.unsubscribe(request, timer).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Unsubscribe, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Subscribe(subscription) => {
let res = client.subscribe(subscription, timer).await;
WORKER_METRICS
Expand Down
Loading
Loading