Skip to content

Commit

Permalink
fix(spsc_fold): potentially missing wakeup when send()ing in state `S…
Browse files Browse the repository at this point in the history
…enderWaitsForReceiverToConsume` (#10318)

# Problem

Before this PR, there were cases where send() in state
SenderWaitsForReceiverToConsume would never be woken up
by the receiver, because it never registered with `wake_sender`.

Example Scenario 1: we stop polling a send() future A that was waiting
for the receiver to consume. We drop A and create a new send() future B.
B would return Poll::Pending and never regsister a waker.

Example Scenario 2: a send() future A transitions from HasData
to SenderWaitsForReceiverToConsume. This registers the context X
with `wake_sender`. But before the Receiver consumes the data,
we poll A from a different context Y.
The state is still SenderWaitsForReceiverToConsume, but we wouldn't
register the new context with `wake_sender`.
When the Receiver comes around to consume and `wake_sender.notify()`s,
it wakes the old context X instead of Y.

# Fix

Register the waker in the case where we're polled in
state `SenderWaitsForReceiverToConsume`.

# Relation to #10309

I found this bug while investigating #10309.
There was never proof that this bug here is the root cause for #10309.
In the meantime we found a more probably hypothesis
for the root cause than what is being fixed here.
Regardless, let's walk through my thought process about
how it might have been relevant:

There (in page_service), Scenario 1 does not apply because
we poll the send() future to completion.

Scenario 2 (`tokio::join!`) also does not apply with the
current `tokio::join!()` impl, because it will just poll each
future every time, each with the same context.
Although if we ever used something like a FuturesUnordered anywhere,
that will be using a different context, so, in that case,
the bug might materialize.

Regarding tokio & spurious poll in general:
@conradludgate is not aware of any spurious wakeup cases in current
tokio,
but within a `tokio::join!()`, any wake meant for one future will poll
all
the futures, so that can appear as a spurious wake up to the N-1 futures
of the `tokio::join!()`.
  • Loading branch information
problame authored Jan 10, 2025
1 parent 735c66d commit db00eb4
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion libs/utils/src/sync/spsc_fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ impl<T: Send> Sender<T> {
}
}
State::SenderWaitsForReceiverToConsume(_data) => {
// Really, we shouldn't be polled until receiver has consumed and wakes us.
// SAFETY: send is single threaded due to `&mut self` requirement,
// therefore register is not concurrent.
unsafe {
self.state.wake_sender.register(cx.waker());
}
Poll::Pending
}
State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),
Expand Down

1 comment on commit db00eb4

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7293 tests run: 6928 passed, 1 failed, 364 skipped (full report)


Failures on Postgres 14

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_parallel_copy[release-pg14]"

Test coverage report is not available

The comment gets automatically updated with the latest test results
db00eb4 at 2025-01-10T11:58:38.448Z :recycle:

Please sign in to comment.