Skip to content

Commit

Permalink
Merge pull request #68 from hozan23/add_recv_broadcast_blocking
Browse files Browse the repository at this point in the history
Add `Sender::broadcast_blocking` and `Receiver::recv_blocking`
  • Loading branch information
zeenix authored Jul 3, 2024
2 parents ef37a05 + 56406d8 commit de420a3
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
69 changes: 69 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,37 @@ impl<T: Clone> Sender<T> {

Ok(ret)
}

/// Broadcasts a message on the channel using the blocking strategy.
///
/// If the channel is full, this method will block until there is room.
///
/// If the channel is closed, this method returns an error.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`send`](Self::broadcast) method,
/// this method will block the current thread until the message is sent.
///
/// This method should not be used in an asynchronous context. It is intended
/// to be used such that a channel can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in deadlocks.
///
/// # Examples
///
/// ```
/// use async_broadcast::{broadcast, SendError};
///
/// let (s, r) = broadcast(1);
///
/// assert_eq!(s.broadcast_blocking(1), Ok(None));
/// drop(r);
/// assert_eq!(s.broadcast_blocking(2), Err(SendError(2)));
/// ```
#[cfg(not(target_family = "wasm"))]
pub fn broadcast_blocking(&self, msg: T) -> Result<Option<T>, SendError<T>> {
self.broadcast_direct(msg).wait()
}
}

impl<T> Drop for Sender<T> {
Expand Down Expand Up @@ -1228,6 +1259,44 @@ impl<T: Clone> Receiver<T> {
.map(|cow| cow.unwrap_or_else(T::clone))
}

/// Receives a message from the channel using the blocking strategy.
///
/// If the channel is empty, this method will block until there is a message.
///
/// If the channel is closed, this method receives a message or returns an error if there are
/// no more messages.
///
/// If this receiver has missed a message (only possible if overflow mode is enabled), then
/// this method returns an error and readjusts its cursor to point to the first available
/// message.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
/// this method will block the current thread until the message is sent.
///
/// This method should not be used in an asynchronous context. It is intended
/// to be used such that a channel can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in deadlocks.
///
/// # Examples
///
/// ```
/// use async_broadcast::{broadcast, RecvError};
///
/// let (s, mut r) = broadcast(1);
///
/// assert_eq!(s.broadcast_blocking(1), Ok(None));
/// drop(s);
///
/// assert_eq!(r.recv_blocking(), Ok(1));
/// assert_eq!(r.recv_blocking(), Err(RecvError::Closed));
/// ```
#[cfg(not(target_family = "wasm"))]
pub fn recv_blocking(&mut self) -> Result<T, RecvError> {
self.recv_direct().wait()
}

/// Produce a new Sender for this channel.
///
/// This will not re-open the channel if it was closed due to all senders being dropped.
Expand Down
17 changes: 17 additions & 0 deletions tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ fn basic_async() {
});
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn basic_blocking() {
let (s, mut r) = broadcast(1);

s.broadcast_blocking(7).unwrap();
assert_eq!(r.try_recv(), Ok(7));

s.broadcast_blocking(8).unwrap();
assert_eq!(block_on(r.recv()), Ok(8));

block_on(s.broadcast(9)).unwrap();
assert_eq!(r.recv_blocking(), Ok(9));

assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}

#[test]
fn parallel() {
let (s1, mut r1) = broadcast(2);
Expand Down

0 comments on commit de420a3

Please sign in to comment.