Skip to content

Commit

Permalink
Add disconnected flavor which immediately fails
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Dec 8, 2023
1 parent a57e655 commit 53cfb8b
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 1 deletion.
30 changes: 30 additions & 0 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ pub fn never<T>() -> Receiver<T> {
}
}

/// Creates a receiver that is disconnected always.
pub fn disconnected<T>() -> Receiver<T> {
Receiver {
flavor: ReceiverFlavor::Disconnected(flavors::disconnected::Channel::new()),
}
}

/// Creates a receiver that delivers messages periodically.
///
/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
Expand Down Expand Up @@ -731,6 +738,9 @@ enum ReceiverFlavor<T> {

/// The never flavor.
Never(flavors::never::Channel<T>),

/// The disconnected flavor.
Disconnected(flavors::disconnected::Channel<T>),
}

unsafe impl<T: Send> Send for Receiver<T> {}
Expand Down Expand Up @@ -784,6 +794,7 @@ impl<T> Receiver<T> {
}
}
ReceiverFlavor::Never(chan) => chan.try_recv(),
ReceiverFlavor::Disconnected(chan) => chan.try_recv(),
}
}

Expand Down Expand Up @@ -839,6 +850,7 @@ impl<T> Receiver<T> {
}
}
ReceiverFlavor::Never(chan) => chan.recv(None),
ReceiverFlavor::Disconnected(chan) => chan.recv(None),
}
.map_err(|_| RecvError)
}
Expand Down Expand Up @@ -950,6 +962,7 @@ impl<T> Receiver<T> {
}
}
ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
ReceiverFlavor::Disconnected(chan) => chan.recv(Some(deadline)),
}
}

Expand All @@ -976,6 +989,7 @@ impl<T> Receiver<T> {
ReceiverFlavor::At(chan) => chan.is_empty(),
ReceiverFlavor::Tick(chan) => chan.is_empty(),
ReceiverFlavor::Never(chan) => chan.is_empty(),
ReceiverFlavor::Disconnected(chan) => chan.is_empty(),
}
}

Expand All @@ -1002,6 +1016,7 @@ impl<T> Receiver<T> {
ReceiverFlavor::At(chan) => chan.is_full(),
ReceiverFlavor::Tick(chan) => chan.is_full(),
ReceiverFlavor::Never(chan) => chan.is_full(),
ReceiverFlavor::Disconnected(chan) => chan.is_full(),
}
}

Expand All @@ -1027,6 +1042,7 @@ impl<T> Receiver<T> {
ReceiverFlavor::At(chan) => chan.len(),
ReceiverFlavor::Tick(chan) => chan.len(),
ReceiverFlavor::Never(chan) => chan.len(),
ReceiverFlavor::Disconnected(chan) => chan.len(),
}
}

Expand Down Expand Up @@ -1054,6 +1070,7 @@ impl<T> Receiver<T> {
ReceiverFlavor::At(chan) => chan.capacity(),
ReceiverFlavor::Tick(chan) => chan.capacity(),
ReceiverFlavor::Never(chan) => chan.capacity(),
ReceiverFlavor::Disconnected(chan) => chan.capacity(),
}
}

Expand Down Expand Up @@ -1165,6 +1182,7 @@ impl<T> Drop for Receiver<T> {
ReceiverFlavor::At(_) => {}
ReceiverFlavor::Tick(_) => {}
ReceiverFlavor::Never(_) => {}
ReceiverFlavor::Disconnected(_) => {}
}
}
}
Expand All @@ -1179,6 +1197,9 @@ impl<T> Clone for Receiver<T> {
ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
ReceiverFlavor::Disconnected(_) => {
ReceiverFlavor::Disconnected(flavors::disconnected::Channel::new())
}
};

Self { flavor }
Expand Down Expand Up @@ -1428,6 +1449,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.try_select(token),
ReceiverFlavor::Tick(chan) => chan.try_select(token),
ReceiverFlavor::Never(chan) => chan.try_select(token),
ReceiverFlavor::Disconnected(chan) => chan.try_select(token),
}
}

Expand All @@ -1439,6 +1461,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.deadline(),
ReceiverFlavor::Tick(chan) => chan.deadline(),
ReceiverFlavor::Never(chan) => chan.deadline(),
ReceiverFlavor::Disconnected(chan) => chan.deadline(),
}
}

Expand All @@ -1450,6 +1473,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.register(oper, cx),
ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
ReceiverFlavor::Never(chan) => chan.register(oper, cx),
ReceiverFlavor::Disconnected(chan) => chan.register(oper, cx),
}
}

Expand All @@ -1461,6 +1485,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.unregister(oper),
ReceiverFlavor::Tick(chan) => chan.unregister(oper),
ReceiverFlavor::Never(chan) => chan.unregister(oper),
ReceiverFlavor::Disconnected(chan) => chan.unregister(oper),
}
}

Expand All @@ -1472,6 +1497,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.accept(token, cx),
ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
ReceiverFlavor::Never(chan) => chan.accept(token, cx),
ReceiverFlavor::Disconnected(chan) => chan.accept(token, cx),
}
}

Expand All @@ -1483,6 +1509,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.is_ready(),
ReceiverFlavor::Tick(chan) => chan.is_ready(),
ReceiverFlavor::Never(chan) => chan.is_ready(),
ReceiverFlavor::Disconnected(chan) => chan.is_ready(),
}
}

Expand All @@ -1494,6 +1521,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.watch(oper, cx),
ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
ReceiverFlavor::Disconnected(chan) => chan.watch(oper, cx),
}
}

Expand All @@ -1505,6 +1533,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.unwatch(oper),
ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
ReceiverFlavor::Never(chan) => chan.unwatch(oper),
ReceiverFlavor::Disconnected(chan) => chan.unwatch(oper),
}
}
}
Expand All @@ -1531,5 +1560,6 @@ pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()
mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
}
ReceiverFlavor::Never(chan) => chan.read(token),
ReceiverFlavor::Disconnected(chan) => chan.read(token),
}
}
108 changes: 108 additions & 0 deletions crossbeam-channel/src/flavors/disconnected.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//! Channel that is always disconnected.
//!
//! Messages cannot be sent into this kind of channel.
use std::marker::PhantomData;
use std::time::Instant;

use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};

/// This flavor doesn't need a token.
pub(crate) type DisconnectedToken = ();

/// Channel that always delivers messages.
pub(crate) struct Channel<T> {
_marker: PhantomData<T>,
}

impl<T> Channel<T> {
/// Creates a channel that always delivers messages.
#[inline]
pub(crate) fn new() -> Self {
Channel {
_marker: PhantomData,
}
}

/// Attempts to receive a message without blocking.
#[inline]
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
Err(TryRecvError::Disconnected)
}

/// Receives a message from the channel.
#[inline]
pub(crate) fn recv(&self, _deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
Err(RecvTimeoutError::Disconnected)
}

/// Reads a message from the channel.
#[inline]
pub(crate) unsafe fn read(&self, _token: &mut Token) -> Result<T, ()> {
Err(())
}

/// Returns `true` if the channel is empty.
#[inline]
pub(crate) fn is_empty(&self) -> bool {
true
}

/// Returns `true` if the channel is full.
#[inline]
pub(crate) fn is_full(&self) -> bool {
true
}

/// Returns the number of messages in the channel.
#[inline]
pub(crate) fn len(&self) -> usize {
0
}

/// Returns the capacity of the channel.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
}

impl<T> SelectHandle for Channel<T> {
#[inline]
fn try_select(&self, _token: &mut Token) -> bool {
true
}

#[inline]
fn deadline(&self) -> Option<Instant> {
None
}

#[inline]
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}

#[inline]
fn unregister(&self, _oper: Operation) {}

#[inline]
fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
self.try_select(token)
}

#[inline]
fn is_ready(&self) -> bool {
true
}

#[inline]
fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}

#[inline]
fn unwatch(&self, _oper: Operation) {}
}
2 changes: 2 additions & 0 deletions crossbeam-channel/src/flavors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
//! 2. `array` - Bounded channel based on a preallocated array.
//! 3. `list` - Unbounded channel implemented as a linked list.
//! 4. `never` - Channel that never delivers messages.
//! 5. `disconnected` - Channel that is always disconnected.
//! 5. `tick` - Channel that delivers messages periodically.
//! 6. `zero` - Zero-capacity channel.
pub(crate) mod array;
pub(crate) mod at;
pub(crate) mod disconnected;
pub(crate) mod list;
pub(crate) mod never;
pub(crate) mod tick;
Expand Down
2 changes: 1 addition & 1 deletion crossbeam-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ cfg_if! {
pub use crate::select::{select, select_timeout, try_select};
}

pub use crate::channel::{after, at, never, tick};
pub use crate::channel::{after, at, never, disconnected, tick};
pub use crate::channel::{bounded, unbounded};
pub use crate::channel::{IntoIter, Iter, TryIter};
pub use crate::channel::{Receiver, Sender};
Expand Down
2 changes: 2 additions & 0 deletions crossbeam-channel/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct Token {
pub(crate) list: flavors::list::ListToken,
#[allow(dead_code)]
pub(crate) never: flavors::never::NeverToken,
#[allow(dead_code)]
pub(crate) disconnected: flavors::disconnected::DisconnectedToken,
pub(crate) tick: flavors::tick::TickToken,
pub(crate) zero: flavors::zero::ZeroToken,
}
Expand Down

0 comments on commit 53cfb8b

Please sign in to comment.