-
Notifications
You must be signed in to change notification settings - Fork 109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Server-side implementation of incremental subscription changes #2030
base: master
Are you sure you want to change the base?
Conversation
/// The ID included in the `SubscribeApplied` and `Unsubscribe` messages. | ||
pub query_id: QueryId, | ||
/// The matching rows for this query. | ||
/// Note, this makes unsubscribing potentially very expensive. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you make an issue to track this?
WORKER_METRICS | ||
.request_round_trip | ||
.with_label_values(&WorkloadType::Subscribe, &address, "") | ||
.observe(timer.elapsed().as_secs_f64()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This bit is repeated 5x modulo the workload and reducer string. I'd extract this into a closure taking the workload and the string.
pub struct SubscriptionRows { | ||
pub table_id: TableId, | ||
pub table_name: Box<str>, | ||
pub table_rows: FormatSwitch<ws::TableUpdate<BsatnFormat>, ws::TableUpdate<JsonFormat>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sad we cannot do type Switched<T> = FormatSwitch<T<BsatnFormat>, T<JsonFormat>>;
:( My kingdom for higher kinded types...
ws::UnsubscribeApplied { | ||
total_host_execution_duration_micros, | ||
request_id, | ||
query_id, | ||
rows: ws::SubscribeRows { | ||
table_id: result.table_id, | ||
table_name: result.table_name, | ||
table_rows, | ||
}, | ||
} | ||
.into(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And my other kingdom for generic closures...
let query = match subscriptions.remove_subscription((sender.id.identity, sender.id.address), request.query_id) { | ||
Ok(query) => query, | ||
Err(error) => { | ||
// Apparently we ignore errors sending messages. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you do something differently?
let client_info = self.clients.get(client); | ||
if client_info.is_none() { | ||
return; | ||
} | ||
let client_info = client_info.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could use let Some(...) = ... else { ... }
?
if query_state.is_none() { | ||
tracing::warn!("Query state not found for query hash: {:?}", query_hash); | ||
return; | ||
} | ||
}; | ||
|
||
self.clients.remove(client); | ||
self.subscribers.retain(|hash, ids| { | ||
ids.remove(client); | ||
if ids.is_empty() { | ||
if let Some(query) = self.queries.remove(hash) { | ||
remove_table_query(query.return_table(), hash); | ||
remove_table_query(query.filter_table(), hash); | ||
} | ||
let query_state = query_state.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here too... using let...else ? or are there some borrowing issues?
SubscriptionManager::remove_table_query(&mut self.tables, query_state.query.return_table(), query_hash); | ||
SubscriptionManager::remove_table_query(&mut self.tables, query_state.query.filter_table(), query_hash); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make into a helper taking &mut self.tables, &query_state, query_hash
? Seems to occur 3x.
let db = TestDB::durable()?; | ||
|
||
create_table(&db, "T")?; | ||
let sql = "select * from T"; | ||
let plan = compile_plan(&db, sql)?; | ||
|
||
let client = Arc::new(client(0)); | ||
|
||
let query_id: ClientQueryId = QueryId::new(1); | ||
let mut subscriptions = SubscriptionManager::default(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems copy-pasted a few times -- for the sake of readability, I'd like to see these moved to a helper / higher order function.
} | ||
|
||
#[test] | ||
fn test_unsubscribe_from_the_only_subscription() -> ResultTest<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to see a test where we add two different queries for the same client and remove one of them and assert that only one of them remain.
Description of Changes
This adds the new message types for being able to subscribe and unsubscribe to individual queries without affecting other subscriptions. Much of this was taken from #1997, but this version does not remove the existing way of subscribing to a set of queries. This renames that to
legacy_
in many places, and we eventually want to get rid of it, but by making this an additive change we don't need to update all the clients at the same time.This biggest changes are in the
SubscriptionManager
code. Previously we just stored two mappings:Now we also store a mapping from client to the set of subscriptions for that client (which is useful for unsubscribing), and we keep track of both legacy subscriptions (which have a set of subscriptions per client), and the new subscriptions (which are identified by a
(ClientId, RequestId)
pair).API and ABI breaking changes
This is additive, and only adds new enum types to existing messages, so this is not breaking. In the future we will want to make a breaking change to remove the legacy versions.
Expected complexity level and risk
Testing
This passes existing tests (which exercise the legacy versions), and there are a few basic tests for the new version.
More unit test coverage for incremental subscription changes would be good, but I will probably add that when I start adding client support for it.
Follow-up work
We still need: