Skip to content

Commit 860113d

Browse files
goffrieConvex, Inc.
authored andcommitted
Return the subscription's invalid_ts from wait_for_invalidation (#39457)
GitOrigin-RevId: 866df98fec22d82643a08030c6e01b0ed000277a
1 parent 70f0985 commit 860113d

File tree

4 files changed

+25
-19
lines changed

4 files changed

+25
-19
lines changed

crates/application/src/api.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -575,10 +575,10 @@ impl<RT: Runtime> SubscriptionClient for ApplicationSubscriptionClient<RT> {
575575
}
576576
}
577577

578-
pub enum ExtendValidityResult {
579-
/// The subscription's validity can be extended to the requested timestamp.
580-
Extended,
581-
/// The subscription may no longer be valid at the requested timestamp.
578+
pub enum SubscriptionValidity {
579+
/// The subscription is valid.
580+
Valid,
581+
/// The subscription may no longer be valid.
582582
/// This result can be returned spuriously even if there were no conflicting
583583
/// writes.
584584
Invalid {
@@ -590,10 +590,14 @@ pub enum ExtendValidityResult {
590590

591591
#[async_trait]
592592
pub trait SubscriptionTrait: Send + Sync {
593-
fn wait_for_invalidation(&self) -> BoxFuture<'static, anyhow::Result<()>>;
594-
595-
/// See comments on [`ExtendValidityResult`]
596-
async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<ExtendValidityResult>;
593+
/// Returns a future that completes after the subscription becomes invalid.
594+
/// The future yields the invalidation timestamp, if known; this is the same
595+
/// thing as [`SubscriptionValidity`]'s `invalid_ts`.
596+
fn wait_for_invalidation(&self) -> BoxFuture<'static, anyhow::Result<Option<Timestamp>>>;
597+
598+
/// Checks if the subscription is still valid as of `new_ts`. See comments
599+
/// on [`SubscriptionValidity`].
600+
async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<SubscriptionValidity>;
597601
}
598602

599603
struct ApplicationSubscription {
@@ -608,15 +612,15 @@ struct ApplicationSubscription {
608612

609613
#[async_trait]
610614
impl SubscriptionTrait for ApplicationSubscription {
611-
fn wait_for_invalidation(&self) -> BoxFuture<'static, anyhow::Result<()>> {
615+
fn wait_for_invalidation(&self) -> BoxFuture<'static, anyhow::Result<Option<Timestamp>>> {
612616
self.inner.wait_for_invalidation().map(Ok).boxed()
613617
}
614618

615619
#[fastrace::trace]
616-
async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<ExtendValidityResult> {
620+
async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<SubscriptionValidity> {
617621
if new_ts < self.initial_ts {
618622
// new_ts is before the initial subscription timestamp.
619-
return Ok(ExtendValidityResult::Invalid { invalid_ts: None });
623+
return Ok(SubscriptionValidity::Invalid { invalid_ts: None });
620624
}
621625

622626
// The inner subscription is periodically updated by the subscription
@@ -625,18 +629,18 @@ impl SubscriptionTrait for ApplicationSubscription {
625629
// Subscription is no longer valid. We could check validity from end_ts
626630
// to new_ts, but this is likely to fail and is potentially unbounded amount of
627631
// work, so we return false here. This is valid per the function contract.
628-
return Ok(ExtendValidityResult::Invalid {
632+
return Ok(SubscriptionValidity::Invalid {
629633
invalid_ts: self.inner.invalid_ts(),
630634
});
631635
};
632636

633637
let current_token = Token::new(self.reads.clone(), current_ts);
634638
Ok(match self.log.refresh_token(current_token, new_ts)? {
635-
Ok(_new_token) => ExtendValidityResult::Extended,
639+
Ok(_new_token) => SubscriptionValidity::Valid,
636640
Err(invalid_ts) => {
637641
// Subscription validity can't be extended. Note that returning false
638642
// here also doesn't mean there is a conflict.
639-
ExtendValidityResult::Invalid { invalid_ts }
643+
SubscriptionValidity::Invalid { invalid_ts }
640644
},
641645
})
642646
}

crates/database/src/subscription.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,13 +606,15 @@ impl Subscription {
606606
self.validity.invalid_ts()
607607
}
608608

609-
pub fn wait_for_invalidation(&self) -> impl Future<Output = ()> {
609+
pub fn wait_for_invalidation(&self) -> impl Future<Output = Option<Timestamp>> {
610610
let mut valid = self.valid.clone();
611+
let validity = self.validity.clone();
611612
let span = fastrace::Span::enter_with_local_parent("wait_for_invalidation");
612613
async move {
613614
let _: Result<_, _> = valid
614615
.wait_for(|state| matches!(state, SubscriptionState::Invalid))
615616
.await;
617+
validity.invalid_ts()
616618
}
617619
.in_span(span)
618620
}

crates/sync/src/state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ impl SyncState {
462462
.as_ref()
463463
.ok_or_else(|| anyhow::anyhow!("Missing subscription for {}", query_id))?
464464
.wait_for_invalidation()
465-
.map(move |r| r.map(move |()| query_id));
465+
.map(move |r| r.map(move |_| query_id));
466466
let (future, handle) = future::abortable(future);
467467
sq.invalidation_future = Some(handle);
468468
self.invalidation_futures.push(future.boxed());

crates/sync/src/worker.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ use application::{
1515
api::{
1616
ApplicationApi,
1717
ExecuteQueryTimestamp,
18-
ExtendValidityResult,
1918
SubscriptionClient,
2019
SubscriptionTrait,
20+
SubscriptionValidity,
2121
},
2222
redaction::{
2323
RedactedJsError,
@@ -821,8 +821,8 @@ impl<RT: Runtime> SyncWorker<RT> {
821821
let new_subscription = match current_subscription {
822822
Some(subscription) => {
823823
match subscription.extend_validity(new_ts).await? {
824-
ExtendValidityResult::Extended => Some(subscription),
825-
ExtendValidityResult::Invalid { invalid_ts } => {
824+
SubscriptionValidity::Valid => Some(subscription),
825+
SubscriptionValidity::Invalid { invalid_ts } => {
826826
metrics::log_query_invalidated(
827827
partition_id,
828828
invalid_ts,

0 commit comments

Comments
 (0)