Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async: Fix hang in wait_all_exit #227

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions src/asynchronous/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,24 +144,25 @@ impl Notifier {
/// Wait for all [`Waiter`]s to drop.
pub async fn wait_all_exit(&self) -> Result<(), Elapsed> {
//debug_assert!(self.shared.is_shutdown());
if self.waiters() == 0 {
return Ok(());
}
let wait = self.wait();
if self.waiters() == 0 {
return Ok(());
}
wait.await
}

async fn wait(&self) -> Result<(), Elapsed> {
if let Some(tm) = self.wait_time {
timeout(tm, self.shared.notify_exit.notified()).await
timeout(tm, self.wait()).await
} else {
self.shared.notify_exit.notified().await;
self.wait().await;
Ok(())
}
}

async fn wait(&self) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async fn wait(&self) {
pub async fn wait_all_exit(&self) -> Result<(), Elapsed> {
while self.waiters() > 0 {
let wait = self.wait(self.shared.notify_exit.notified());
....
}
}
async fn wait<F>(&self, future: F) -> Result<F::Output, Elapsed>
where F: Future
{
if let Some(tm) = self.wait_time {
timeout(tm, future).await
} else {
Ok(future.await)
}
}

wait() is just waiting, do while self.waiters() in wait_all ?

Copy link
Contributor Author

@alex-matei alex-matei May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't fix the issue, you need to check the waiters count after the notified() object is created but before calling await on it. Let's take an example with two threads running in parallel:

Thread 1  (executing last waiter.drop())        |               Thread 2 (in wait_all_exit() call)
                                                |
self.shared.waiters = 0                         |
                                                |
self.shared.notify_exit.notify_waiters()        |               
                                                |
                                                |               future = self.shared.notify_exit.notified();
                                                |
                                                |
                                                |               future.await;

The notify_waiters() call doesn't store any permit, it only unblocks existing notified() objects: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notify_waiters .

If the notified object is created after notify_waiters() call, await will block forever. There won't be anyone left to unblock it
because notify_waiters() is called only when the last waiter is dropped (unless a subscribe() call and a drop() happens in the meantime).

The sync.Notify struct (the type of self.shared.notify_exit) uses internally seq_cst operations on an AtomicUsize state value. This means that they form a happens before relationship between notified() and notify_waiters() calls, anything that happened before the call to notify_waiters() in thread 1, will be visible for the thread 2 after the notified() call.

Hence, we can test for waiters == 0 just after the notified() call:

  1. If the value is 0 awaiting at this point would be dangerous because of the scenario described above, but we can just exit.
  2. If the value is not 0 then that means there is still a waiter that is not yet dropped and we can expect a notify_waiters() call on another thread later on. This will unblock our existing notified() object so we can safely await.

By adding a check for the waiters count right before await happens we can protect against this hang.

Copy link
Collaborator

@wllenyj wllenyj May 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, very detailed. I know how the race happens and have read the documentation.

The Notified future is guaranteed to receive wakeups from notify_waiters() as soon as it has been created, even if it has not yet been polled.

The problem is that Notified need to be created first, then it makes sense to test waiters. So we can move notified() forward.

My point is that the wait function is a helper function, and not public api, it only wrap the timeout or not. It maybe used to unit test or other intenal situation. The wait_all_exit is provided to users to ensure concurrency safety.

Copy link
Contributor Author

@alex-matei alex-matei May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two reasons for moving the bulk of the code in wait():

  1. Let's inline wait() call in the snippet you suggested:

     pub async fn wait_all_exit(&self) -> Result<(), Elapsed> {
            while self.waiters() > 0 {
                let future = self.shared.notify_exit.notified();
                if let Some(tm) = self.wait_time {
                    let wait = timeout(tm, future).await
                } else {
                    let wait = Ok(future.await)
                }
                ....
            }
        }
    

    Between the notified() call and the await there is no check for waiters count so awaiting can still block forever if waiters count is 0. The solution is to add the test for waiters() at the beginning of wait() function. This solution while it seem ok at first it doesn't work because of the second issue.

  2. The timeout, when specified, should apply to the outer while loop as well. Otherwise the code might loop multiple times, beyond the specified timeout, which is incorrect.

Copy link
Collaborator

@wllenyj wllenyj May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ...... is the code that leave it as is for yours.

Suggested change
async fn wait(&self) {
pub async fn wait_all_exit(&self) -> Result<(), Elapsed> {
while self.waiters() > 0 {
let wait = self.wait(self.shared.notify_exit.notified());
if self.waiters() == 0 {
return;
}
wait.await;
}
}

Will this work?

Copy link
Contributor Author

@alex-matei alex-matei May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work, it doesn't resolve the second issue that is related to timeout. The timeout should be applied to the whole while loop because it can iterate multiple times, in case someone calls subscribe() and the waiters() goes from 0 to >0.
For example, if you pass 5 seconds for timeout this code can behave something like this:

  1. wait for 4 seconds at first in wait.await
  2. waiters() is > 0 after that, because someone did a subscribe() after the last waiter was dropped
  3. 3 seconds in wait.await
  4. waiters() is again >0;
  5. wait.await waits for another 5 seconds.
  6. waiters() is finally 0 and the code can exit.

The code waits for a total of twelve seconds in total although the original intent was to wait only for 5 seconds at most.
The idea is to move the while block inside the future that is passed to timeout() function. By moving it there, you'll get to the same piece of code I wrote.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your comment.
I'm going to take some time to review this.

while self.waiters() > 0 {
let notified = self.shared.notify_exit.notified();
if self.waiters() == 0 {
return;
}
notified.await;
// Some waiters could have been created in the meantime
// by calling `subscribe`, loop again
}
}
}

impl Drop for Notifier {
Expand Down
Loading