diff --git a/src/lib.rs b/src/lib.rs index 801b418..67032a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -752,6 +752,37 @@ impl Sender { Ok(ret) } + + /// Broadcasts a message on the channel using the blocking strategy. + /// + /// If the channel is full, this method will block until there is room. + /// + /// If the channel is closed, this method returns an error. + /// + /// # Blocking + /// + /// Rather than using asynchronous waiting, like the [`send`](Self::broadcast) method, + /// this method will block the current thread until the message is sent. + /// + /// This method should not be used in an asynchronous context. It is intended + /// to be used such that a channel can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in deadlocks. + /// + /// # Examples + /// + /// ``` + /// use async_broadcast::{broadcast, SendError}; + /// + /// let (s, r) = broadcast(1); + /// + /// assert_eq!(s.broadcast_blocking(1), Ok(None)); + /// drop(r); + /// assert_eq!(s.broadcast_blocking(2), Err(SendError(2))); + /// ``` + #[cfg(not(target_family = "wasm"))] + pub fn broadcast_blocking(&self, msg: T) -> Result, SendError> { + self.broadcast_direct(msg).wait() + } } impl Drop for Sender { @@ -1228,6 +1259,44 @@ impl Receiver { .map(|cow| cow.unwrap_or_else(T::clone)) } + /// Receives a message from the channel using the blocking strategy. + /// + /// If the channel is empty, this method will block until there is a message. + /// + /// If the channel is closed, this method receives a message or returns an error if there are + /// no more messages. + /// + /// If this receiver has missed a message (only possible if overflow mode is enabled), then + /// this method returns an error and readjusts its cursor to point to the first available + /// message. + /// + /// # Blocking + /// + /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method, + /// this method will block the current thread until the message is sent. + /// + /// This method should not be used in an asynchronous context. It is intended + /// to be used such that a channel can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in deadlocks. + /// + /// # Examples + /// + /// ``` + /// use async_broadcast::{broadcast, RecvError}; + /// + /// let (s, mut r) = broadcast(1); + /// + /// assert_eq!(s.broadcast_blocking(1), Ok(None)); + /// drop(s); + /// + /// assert_eq!(r.recv_blocking(), Ok(1)); + /// assert_eq!(r.recv_blocking(), Err(RecvError::Closed)); + /// ``` + #[cfg(not(target_family = "wasm"))] + pub fn recv_blocking(&mut self) -> Result { + self.recv_direct().wait() + } + /// Produce a new Sender for this channel. /// /// This will not re-open the channel if it was closed due to all senders being dropped. diff --git a/tests/test.rs b/tests/test.rs index 79718cb..7b9c13e 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -45,6 +45,23 @@ fn basic_async() { }); } +#[cfg(not(target_family = "wasm"))] +#[test] +fn basic_blocking() { + let (s, mut r) = broadcast(1); + + s.broadcast_blocking(7).unwrap(); + assert_eq!(r.try_recv(), Ok(7)); + + s.broadcast_blocking(8).unwrap(); + assert_eq!(block_on(r.recv()), Ok(8)); + + block_on(s.broadcast(9)).unwrap(); + assert_eq!(r.recv_blocking(), Ok(9)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + #[test] fn parallel() { let (s1, mut r1) = broadcast(2);