diff --git a/Cargo.toml b/Cargo.toml index 83207aa..e6a2ae5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,12 +20,16 @@ harness = false name = "broadcast_bench" [features] +default = ["std"] +std = ["event-listener/std", "event-listener-strategy/std"] + [dependencies] -event-listener = "5.0.0" -event-listener-strategy = "0.5.0" -futures-core = "0.3.21" +event-listener = { version = "5.0.0", default-features = false } +event-listener-strategy = { version = "0.5.0", default-features = false } +futures-core = { version = "0.3.21", default-features = false } pin-project-lite = "0.2.13" +spin = { version = "0.9.8" } [dev-dependencies] criterion = "0.3.5" diff --git a/src/lib.rs b/src/lib.rs index 90334b1..028db7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -91,6 +91,7 @@ //! [tbc]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html //! [tom]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html#lagging //! +#![cfg_attr(not(feature = "std"), no_std)] #![forbid(unsafe_code)] #![deny(missing_debug_implementations, nonstandard_style, rust_2018_idioms)] #![warn(rustdoc::missing_doc_code_examples, unreachable_pub)] @@ -101,21 +102,32 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +mod rwlock; + #[cfg(doctest)] mod doctests { doc_comment::doctest!("../README.md"); } -use std::collections::VecDeque; -use std::convert::TryInto; +extern crate alloc; + +#[cfg(feature = "std")] +extern crate std; + +use alloc::boxed::Box; +use alloc::collections::VecDeque; +use alloc::sync::Arc; +use core::convert::TryInto; +use core::fmt; +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; + +#[cfg(feature = "std")] use std::error; -use std::fmt; -use std::future::Future; -use std::marker::PhantomPinned; -use std::pin::Pin; -use std::sync::{Arc, RwLock}; -use std::task::{Context, Poll}; +use crate::rwlock::RwLock; use event_listener::{Event, EventListener}; use event_listener_strategy::{easy_wrapper, EventListenerFuture}; use futures_core::{ready, stream::Stream}; @@ -317,7 +329,7 @@ impl Sender { /// assert_eq!(s.capacity(), 5); /// ``` pub fn capacity(&self) -> usize { - self.inner.read().unwrap().capacity + self.inner.read().capacity } /// Set the channel capacity. @@ -351,7 +363,7 @@ impl Sender { /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2))); /// ``` pub fn set_capacity(&mut self, new_cap: usize) { - self.inner.write().unwrap().set_capacity(new_cap); + self.inner.write().set_capacity(new_cap); } /// If overflow mode is enabled on this channel. @@ -365,7 +377,7 @@ impl Sender { /// assert!(!s.overflow()); /// ``` pub fn overflow(&self) -> bool { - self.inner.read().unwrap().overflow + self.inner.read().overflow } /// Set overflow mode on the channel. @@ -392,7 +404,7 @@ impl Sender { /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); /// ``` pub fn set_overflow(&mut self, overflow: bool) { - self.inner.write().unwrap().overflow = overflow; + self.inner.write().overflow = overflow; } /// If sender will wait for active receivers. @@ -409,7 +421,7 @@ impl Sender { /// assert!(s.await_active()); /// ``` pub fn await_active(&self) -> bool { - self.inner.read().unwrap().await_active + self.inner.read().await_active } /// Specify if sender will wait for active receivers. @@ -432,7 +444,7 @@ impl Sender { /// # }); /// ``` pub fn set_await_active(&mut self, await_active: bool) { - self.inner.write().unwrap().await_active = await_active; + self.inner.write().await_active = await_active; } /// Closes the channel. @@ -456,7 +468,7 @@ impl Sender { /// # }); /// ``` pub fn close(&self) -> bool { - self.inner.write().unwrap().close() + self.inner.write().close() } /// Returns `true` if the channel is closed. @@ -475,7 +487,7 @@ impl Sender { /// # }); /// ``` pub fn is_closed(&self) -> bool { - self.inner.read().unwrap().is_closed + self.inner.read().is_closed } /// Returns `true` if the channel is empty. @@ -494,7 +506,7 @@ impl Sender { /// # }); /// ``` pub fn is_empty(&self) -> bool { - self.inner.read().unwrap().queue.is_empty() + self.inner.read().queue.is_empty() } /// Returns `true` if the channel is full. @@ -513,7 +525,7 @@ impl Sender { /// # }); /// ``` pub fn is_full(&self) -> bool { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read(); inner.queue.len() == inner.capacity } @@ -535,7 +547,7 @@ impl Sender { /// # }); /// ``` pub fn len(&self) -> usize { - self.inner.read().unwrap().queue.len() + self.inner.read().queue.len() } /// Returns the number of receivers for the channel. @@ -558,7 +570,7 @@ impl Sender { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn receiver_count(&self) -> usize { - self.inner.read().unwrap().receiver_count + self.inner.read().receiver_count } /// Returns the number of inactive receivers for the channel. @@ -578,7 +590,7 @@ impl Sender { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn inactive_receiver_count(&self) -> usize { - self.inner.read().unwrap().inactive_receiver_count + self.inner.read().inactive_receiver_count } /// Returns the number of senders for the channel. @@ -597,7 +609,7 @@ impl Sender { /// # }); /// ``` pub fn sender_count(&self) -> usize { - self.inner.read().unwrap().sender_count + self.inner.read().sender_count } /// Produce a new Receiver for this channel. @@ -629,7 +641,7 @@ impl Sender { /// # }); /// ``` pub fn new_receiver(&self) -> Receiver { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); inner.receiver_count += 1; Receiver { inner: self.inner.clone(), @@ -725,7 +737,7 @@ impl Sender { /// ``` pub fn try_broadcast(&self, msg: T) -> Result, TrySendError> { let mut ret = None; - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); if inner.is_closed { return Err(TrySendError::Closed(msg)); @@ -756,7 +768,7 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); inner.sender_count -= 1; @@ -768,7 +780,7 @@ impl Drop for Sender { impl Clone for Sender { fn clone(&self) -> Self { - self.inner.write().unwrap().sender_count += 1; + self.inner.write().sender_count += 1; Sender { inner: self.inner.clone(), @@ -803,7 +815,7 @@ impl Receiver { /// assert_eq!(r.capacity(), 5); /// ``` pub fn capacity(&self) -> usize { - self.inner.read().unwrap().capacity + self.inner.read().capacity } /// Set the channel capacity. @@ -837,7 +849,7 @@ impl Receiver { /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2))); /// ``` pub fn set_capacity(&mut self, new_cap: usize) { - self.inner.write().unwrap().set_capacity(new_cap); + self.inner.write().set_capacity(new_cap); } /// If overflow mode is enabled on this channel. @@ -851,7 +863,7 @@ impl Receiver { /// assert!(!r.overflow()); /// ``` pub fn overflow(&self) -> bool { - self.inner.read().unwrap().overflow + self.inner.read().overflow } /// Set overflow mode on the channel. @@ -878,7 +890,7 @@ impl Receiver { /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); /// ``` pub fn set_overflow(&mut self, overflow: bool) { - self.inner.write().unwrap().overflow = overflow; + self.inner.write().overflow = overflow; } /// If sender will wait for active receivers. @@ -895,7 +907,7 @@ impl Receiver { /// assert!(r.await_active()); /// ``` pub fn await_active(&self) -> bool { - self.inner.read().unwrap().await_active + self.inner.read().await_active } /// Specify if sender will wait for active receivers. @@ -918,7 +930,7 @@ impl Receiver { /// # }); /// ``` pub fn set_await_active(&mut self, await_active: bool) { - self.inner.write().unwrap().await_active = await_active; + self.inner.write().await_active = await_active; } /// Closes the channel. @@ -942,7 +954,7 @@ impl Receiver { /// # }); /// ``` pub fn close(&self) -> bool { - self.inner.write().unwrap().close() + self.inner.write().close() } /// Returns `true` if the channel is closed. @@ -961,7 +973,7 @@ impl Receiver { /// # }); /// ``` pub fn is_closed(&self) -> bool { - self.inner.read().unwrap().is_closed + self.inner.read().is_closed } /// Returns `true` if the channel is empty. @@ -980,7 +992,7 @@ impl Receiver { /// # }); /// ``` pub fn is_empty(&self) -> bool { - self.inner.read().unwrap().queue.is_empty() + self.inner.read().queue.is_empty() } /// Returns `true` if the channel is full. @@ -999,7 +1011,7 @@ impl Receiver { /// # }); /// ``` pub fn is_full(&self) -> bool { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read(); inner.queue.len() == inner.capacity } @@ -1021,7 +1033,7 @@ impl Receiver { /// # }); /// ``` pub fn len(&self) -> usize { - self.inner.read().unwrap().queue.len() + self.inner.read().queue.len() } /// Returns the number of receivers for the channel. @@ -1044,7 +1056,7 @@ impl Receiver { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn receiver_count(&self) -> usize { - self.inner.read().unwrap().receiver_count + self.inner.read().receiver_count } /// Returns the number of inactive receivers for the channel. @@ -1064,7 +1076,7 @@ impl Receiver { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn inactive_receiver_count(&self) -> usize { - self.inner.read().unwrap().inactive_receiver_count + self.inner.read().inactive_receiver_count } /// Returns the number of senders for the channel. @@ -1083,7 +1095,7 @@ impl Receiver { /// # }); /// ``` pub fn sender_count(&self) -> usize { - self.inner.read().unwrap().sender_count + self.inner.read().sender_count } /// Downgrade to a [`InactiveReceiver`]. @@ -1114,7 +1126,7 @@ impl Receiver { /// ``` pub fn deactivate(self) -> InactiveReceiver { // Drop::drop impl of Receiver will take care of `receiver_count`. - self.inner.write().unwrap().inactive_receiver_count += 1; + self.inner.write().inactive_receiver_count += 1; InactiveReceiver { inner: self.inner.clone(), @@ -1223,7 +1235,6 @@ impl Receiver { pub fn try_recv(&mut self) -> Result { self.inner .write() - .unwrap() .try_recv_at(&mut self.pos) .map(|cow| cow.unwrap_or_else(T::clone)) } @@ -1254,7 +1265,7 @@ impl Receiver { /// # }); /// ``` pub fn new_sender(&self) -> Sender { - self.inner.write().unwrap().sender_count += 1; + self.inner.write().sender_count += 1; Sender { inner: self.inner.clone(), @@ -1290,7 +1301,7 @@ impl Receiver { /// # }); /// ``` pub fn new_receiver(&self) -> Self { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); inner.receiver_count += 1; Receiver { inner: self.inner.clone(), @@ -1369,7 +1380,7 @@ impl Receiver { None => { // Start listening and then try receiving again. self.listener = { - let inner = self.inner.write().unwrap(); + let inner = self.inner.write(); Some(inner.recv_ops.listen()) }; } @@ -1385,7 +1396,7 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); // Remove ourself from each item's counter loop { @@ -1426,7 +1437,7 @@ impl Clone for Receiver { /// # }); /// ``` fn clone(&self) -> Self { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); inner.receiver_count += 1; // increment the waiter count on all items not yet received by this object let n = self.pos.saturating_sub(inner.head_pos) as usize; @@ -1459,7 +1470,7 @@ impl Stream for Receiver { impl futures_core::stream::FusedStream for Receiver { fn is_terminated(&self) -> bool { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read(); inner.is_closed && inner.queue.is_empty() } @@ -1479,6 +1490,7 @@ impl SendError { } } +#[cfg(feature = "std")] impl error::Error for SendError {} impl fmt::Debug for SendError { @@ -1541,6 +1553,7 @@ impl TrySendError { } } +#[cfg(feature = "std")] impl error::Error for TrySendError {} impl fmt::Debug for TrySendError { @@ -1576,6 +1589,7 @@ pub enum RecvError { Closed, } +#[cfg(feature = "std")] impl error::Error for RecvError {} impl fmt::Display for RecvError { @@ -1630,6 +1644,7 @@ impl TryRecvError { } } +#[cfg(feature = "std")] impl error::Error for TryRecvError {} impl fmt::Display for TryRecvError { @@ -1649,6 +1664,7 @@ easy_wrapper! { #[derive(Debug)] #[must_use = "futures do nothing unless .awaited"] pub struct Send<'a, T: Clone>(SendInner<'a, T> => Result, SendError>); + #[cfg(all(feature = "std", not(target_family = "wasm")))] pub(crate) wait(); } @@ -1682,7 +1698,7 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { // Attempt to send a message. match this.sender.try_broadcast(msg) { Ok(msg) => { - let inner = inner.write().unwrap(); + let inner = inner.write(); if inner.queue.len() < inner.capacity { // Not full still, so notify the next awaiting sender. @@ -1693,9 +1709,7 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { } Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), Err(TrySendError::Full(m)) => *this.msg = Some(m), - Err(TrySendError::Inactive(m)) if inner.read().unwrap().await_active => { - *this.msg = Some(m) - } + Err(TrySendError::Inactive(m)) if inner.read().await_active => *this.msg = Some(m), Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))), } @@ -1703,7 +1717,7 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { match &this.listener { None => { // Start listening and then try sending again. - let inner = inner.write().unwrap(); + let inner = inner.write(); *this.listener = Some(inner.send_ops.listen()); } Some(_) => { @@ -1721,6 +1735,7 @@ easy_wrapper! { #[derive(Debug)] #[must_use = "futures do nothing unless .awaited"] pub struct Recv<'a, T: Clone>(RecvInner<'a, T> => Result); + #[cfg(all(feature = "std", not(target_family = "wasm")))] pub(crate) wait(); } @@ -1762,7 +1777,7 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> { None => { // Start listening and then try receiving again. *this.listener = { - let inner = this.receiver.inner.write().unwrap(); + let inner = this.receiver.inner.write(); Some(inner.recv_ops.listen()) }; } @@ -1823,7 +1838,7 @@ impl InactiveReceiver { /// assert_eq!(r.try_recv(), Ok(10)); /// ``` pub fn activate_cloned(&self) -> Receiver { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); inner.receiver_count += 1; if inner.receiver_count == 1 { @@ -1843,7 +1858,7 @@ impl InactiveReceiver { /// /// See [`Receiver::capacity`] documentation for examples. pub fn capacity(&self) -> usize { - self.inner.read().unwrap().capacity + self.inner.read().capacity } /// Set the channel capacity. @@ -1854,14 +1869,14 @@ impl InactiveReceiver { /// /// See [`Receiver::set_capacity`] documentation for examples. pub fn set_capacity(&mut self, new_cap: usize) { - self.inner.write().unwrap().set_capacity(new_cap); + self.inner.write().set_capacity(new_cap); } /// If overflow mode is enabled on this channel. /// /// See [`Receiver::overflow`] documentation for examples. pub fn overflow(&self) -> bool { - self.inner.read().unwrap().overflow + self.inner.read().overflow } /// Set overflow mode on the channel. @@ -1871,7 +1886,7 @@ impl InactiveReceiver { /// /// See [`Receiver::set_overflow`] documentation for examples. pub fn set_overflow(&mut self, overflow: bool) { - self.inner.write().unwrap().overflow = overflow; + self.inner.write().overflow = overflow; } /// If sender will wait for active receivers. @@ -1889,7 +1904,7 @@ impl InactiveReceiver { /// assert!(r.await_active()); /// ``` pub fn await_active(&self) -> bool { - self.inner.read().unwrap().await_active + self.inner.read().await_active } /// Specify if sender will wait for active receivers. @@ -1912,7 +1927,7 @@ impl InactiveReceiver { /// # }); /// ``` pub fn set_await_active(&mut self, await_active: bool) { - self.inner.write().unwrap().await_active = await_active; + self.inner.write().await_active = await_active; } /// Closes the channel. @@ -1923,28 +1938,28 @@ impl InactiveReceiver { /// /// See [`Receiver::close`] documentation for examples. pub fn close(&self) -> bool { - self.inner.write().unwrap().close() + self.inner.write().close() } /// Returns `true` if the channel is closed. /// /// See [`Receiver::is_closed`] documentation for examples. pub fn is_closed(&self) -> bool { - self.inner.read().unwrap().is_closed + self.inner.read().is_closed } /// Returns `true` if the channel is empty. /// /// See [`Receiver::is_empty`] documentation for examples. pub fn is_empty(&self) -> bool { - self.inner.read().unwrap().queue.is_empty() + self.inner.read().queue.is_empty() } /// Returns `true` if the channel is full. /// /// See [`Receiver::is_full`] documentation for examples. pub fn is_full(&self) -> bool { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read(); inner.queue.len() == inner.capacity } @@ -1953,7 +1968,7 @@ impl InactiveReceiver { /// /// See [`Receiver::len`] documentation for examples. pub fn len(&self) -> usize { - self.inner.read().unwrap().queue.len() + self.inner.read().queue.len() } /// Returns the number of receivers for the channel. @@ -1976,7 +1991,7 @@ impl InactiveReceiver { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn receiver_count(&self) -> usize { - self.inner.read().unwrap().receiver_count + self.inner.read().receiver_count } /// Returns the number of inactive receivers for the channel. @@ -1996,20 +2011,20 @@ impl InactiveReceiver { /// assert_eq!(r.inactive_receiver_count(), 1); /// ``` pub fn inactive_receiver_count(&self) -> usize { - self.inner.read().unwrap().inactive_receiver_count + self.inner.read().inactive_receiver_count } /// Returns the number of senders for the channel. /// /// See [`Receiver::sender_count`] documentation for examples. pub fn sender_count(&self) -> usize { - self.inner.read().unwrap().sender_count + self.inner.read().sender_count } } impl Clone for InactiveReceiver { fn clone(&self) -> Self { - self.inner.write().unwrap().inactive_receiver_count += 1; + self.inner.write().inactive_receiver_count += 1; InactiveReceiver { inner: self.inner.clone(), @@ -2019,7 +2034,7 @@ impl Clone for InactiveReceiver { impl Drop for InactiveReceiver { fn drop(&mut self) { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); inner.inactive_receiver_count -= 1; inner.close_channel(); diff --git a/src/rwlock.rs b/src/rwlock.rs new file mode 100644 index 0000000..8b45113 --- /dev/null +++ b/src/rwlock.rs @@ -0,0 +1,52 @@ +#[cfg(feature = "std")] +#[derive(Debug)] +pub(crate) struct RwLock { + rwlock: std::sync::RwLock, +} +#[cfg(not(feature = "std"))] +#[derive(Debug)] +pub(crate) struct RwLock { + rwlock: spin::RwLock, +} + +impl RwLock { + #[cfg(feature = "std")] + #[inline] + pub(crate) const fn new(t: T) -> RwLock { + RwLock { + rwlock: std::sync::RwLock::new(t), + } + } + + #[cfg(not(feature = "std"))] + #[inline] + pub(crate) const fn new(t: T) -> RwLock { + RwLock { + rwlock: spin::RwLock::new(t), + } + } + + #[inline] + #[cfg(feature = "std")] + pub(crate) fn read(&self) -> std::sync::RwLockReadGuard<'_, T> { + return self.rwlock.read().unwrap(); + } + + #[inline] + #[cfg(not(feature = "std"))] + pub(crate) fn read(&self) -> spin::RwLockReadGuard<'_, T> { + return self.rwlock.read(); + } + + #[inline] + #[cfg(feature = "std")] + pub(crate) fn write(&self) -> std::sync::RwLockWriteGuard<'_, T> { + return self.rwlock.write().unwrap(); + } + + #[inline] + #[cfg(not(feature = "std"))] + pub(crate) fn write(&self) -> spin::RwLockWriteGuard<'_, T> { + return self.rwlock.write(); + } +}