Skip to content

Commit

Permalink
Encapsulate flume::Receiver (#1504)
Browse files Browse the repository at this point in the history
* Encapsulate `flume::Receiver`

* Remove `FlumeSubscriber`

This was marked as unused after fd4a652

* Remove unused `flume` dependency in storage-manager plugin

* Fix `FifoChannelHandler::recv_timeout` impl
  • Loading branch information
fuzzypixelz authored Oct 7, 2024
1 parent 0985806 commit 3740564
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 26 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion plugins/zenoh-plugin-storage-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ crate-type = ["cdylib", "rlib"]
async-trait = { workspace = true }
bincode = { workspace = true }
bloomfilter = "1"
flume = { workspace = true }
futures = { workspace = true }
git-version = { workspace = true }
lazy_static = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ impl StorageService {
Some(cache_guard)
}

async fn reply_query(&self, query: Result<zenoh::query::Query, flume::RecvError>) {
async fn reply_query(&self, query: ZResult<zenoh::query::Query>) {
let q = match query {
Ok(q) => q,
Err(e) => {
Expand Down
7 changes: 4 additions & 3 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use std::{
};

use zenoh::{
handlers::FifoChannelHandler,
internal::{bail, runtime::ZRuntime, ResolveFuture, TerminatableTask},
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
pubsub::FlumeSubscriber,
pubsub::Subscriber,
query::{Query, Queryable, ZenohParameters},
sample::{Locality, Sample},
Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
Expand Down Expand Up @@ -115,8 +116,8 @@ impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> {

#[zenoh_macros::unstable]
pub struct PublicationCache {
local_sub: FlumeSubscriber,
_queryable: Queryable<flume::Receiver<Query>>,
local_sub: Subscriber<FifoChannelHandler<Sample>>,
_queryable: Queryable<FifoChannelHandler<Query>>,
task: TerminatableTask,
}

Expand Down
7 changes: 4 additions & 3 deletions zenoh-ext/src/subscriber_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
//
use std::time::Duration;

use flume::r#async::RecvStream;
use futures::stream::{Forward, Map};
use zenoh::{
handlers::{fifo, FifoChannelHandler},
liveliness::LivelinessSubscriberBuilder,
pubsub::{Subscriber, SubscriberBuilder},
query::{QueryConsolidation, QueryTarget, ReplyKeyExpr},
Expand All @@ -33,11 +33,12 @@ pub trait SubscriberForward<'a, S> {
type Output;
fn forward(&'a mut self, sink: S) -> Self::Output;
}
impl<'a, S> SubscriberForward<'a, S> for Subscriber<flume::Receiver<Sample>>
impl<'a, S> SubscriberForward<'a, S> for Subscriber<FifoChannelHandler<Sample>>
where
S: futures::sink::Sink<Sample>,
{
type Output = Forward<Map<RecvStream<'a, Sample>, fn(Sample) -> Result<Sample, S::Error>>, S>;
type Output =
Forward<Map<fifo::RecvStream<'a, Sample>, fn(Sample) -> Result<Sample, S::Error>>, S>;
fn forward(&'a mut self, sink: S) -> Self::Output {
futures::StreamExt::forward(futures::StreamExt::map(self.stream(), Ok), sink)
}
Expand Down
285 changes: 282 additions & 3 deletions zenoh/src/api/handlers/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@

//! Callback handler trait.

use std::sync::Arc;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};

use zenoh_result::ZResult;

use crate::api::handlers::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};

Expand All @@ -41,11 +49,282 @@ impl Default for FifoChannel {
}
}

/// [`FifoChannel`] handler.
#[derive(Debug, Clone)]
pub struct FifoChannelHandler<T>(flume::Receiver<T>);

impl<T: Send + 'static> IntoHandler<T> for FifoChannel {
type Handler = flume::Receiver<T>;
type Handler = FifoChannelHandler<T>;

fn into_handler(self) -> (Callback<T>, Self::Handler) {
flume::bounded(self.capacity).into_handler()
let (sender, receiver) = flume::bounded(self.capacity);
(
Callback::new(Arc::new(move |t| {
if let Err(error) = sender.send(t) {
tracing::error!(%error)
}
})),
FifoChannelHandler(receiver),
)
}
}

impl<T> FifoChannelHandler<T> {
/// Attempt to fetch an incoming value from the channel associated with this receiver, returning
/// an error if the channel is empty or if all senders have been dropped.
pub fn try_recv(&self) -> ZResult<T> {
self.0.try_recv().map_err(Into::into)
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped.
pub fn recv(&self) -> ZResult<T> {
self.0.recv().map_err(Into::into)
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped or the deadline has passed.
pub fn recv_deadline(&self, deadline: Instant) -> ZResult<T> {
self.0.recv_deadline(deadline).map_err(Into::into)
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped or the timeout has expired.
pub fn recv_timeout(&self, duration: Duration) -> ZResult<T> {
self.0.recv_timeout(duration).map_err(Into::into)
}

/// Create a blocking iterator over the values received on the channel that finishes iteration
/// when all senders have been dropped.
pub fn iter(&self) -> Iter<'_, T> {
Iter(self.0.iter())
}

/// A non-blocking iterator over the values received on the channel that finishes iteration when
/// all senders have been dropped or the channel is empty.
pub fn try_iter(&self) -> TryIter<'_, T> {
TryIter(self.0.try_iter())
}

/// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
/// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
/// the function has been called.
pub fn drain(&self) -> Drain<'_, T> {
Drain(self.0.drain())
}

/// Returns true if all senders for this channel have been dropped.
pub fn is_disconnected(&self) -> bool {
self.0.is_disconnected()
}

/// Returns true if the channel is empty.
/// Note: Zero-capacity channels are always empty.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// Returns true if the channel is full.
/// Note: Zero-capacity channels are always full.
pub fn is_full(&self) -> bool {
self.0.is_full()
}

/// Returns the number of messages in the channel.
pub fn len(&self) -> usize {
self.0.len()
}

/// If the channel is bounded, returns its capacity.
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}

/// Get the number of senders that currently exist.
pub fn sender_count(&self) -> usize {
self.0.sender_count()
}

/// Get the number of receivers that currently exist, including this one.
pub fn receiver_count(&self) -> usize {
self.0.receiver_count()
}

/// Returns whether the receivers are belong to the same channel.
pub fn same_channel(&self, other: &Self) -> bool {
self.0.same_channel(&other.0)
}
}

/// This exists as a shorthand for [`FifoChannelHandler::iter`].
impl<'a, T> IntoIterator for &'a FifoChannelHandler<T> {
type Item = T;
type IntoIter = Iter<'a, T>;

fn into_iter(self) -> Self::IntoIter {
Iter(self.0.iter())
}
}

impl<T> IntoIterator for FifoChannelHandler<T> {
type Item = T;
type IntoIter = IntoIter<T>;

/// Creates a self-owned but semantically equivalent alternative to [`FifoChannelHandler::iter`].
fn into_iter(self) -> Self::IntoIter {
IntoIter(self.0.into_iter())
}
}

/// An iterator over the msgs received from a channel.
pub struct Iter<'a, T>(flume::Iter<'a, T>);

impl<'a, T> Iterator for Iter<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

/// An non-blocking iterator over the msgs received from a channel.
pub struct TryIter<'a, T>(flume::TryIter<'a, T>);

impl<'a, T> Iterator for TryIter<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

/// An fixed-sized iterator over the msgs drained from a channel.
#[derive(Debug)]
pub struct Drain<'a, T>(flume::Drain<'a, T>);

impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl<'a, T> ExactSizeIterator for Drain<'a, T> {
fn len(&self) -> usize {
self.0.len()
}
}

/// An owned iterator over the msgs received from a channel.
pub struct IntoIter<T>(flume::IntoIter<T>);

impl<T> Iterator for IntoIter<T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl<T> FifoChannelHandler<T> {
/// Asynchronously receive a value from the channel, returning an error if all senders have been
/// dropped. If the channel is empty, the returned future will yield to the async runtime.
pub fn recv_async(&self) -> RecvFut<'_, T> {
RecvFut(self.0.recv_async())
}

/// Convert this receiver into a future that asynchronously receives a single message from the
/// channel, returning an error if all senders have been dropped. If the channel is empty, this
/// future will yield to the async runtime.
pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> {
RecvFut(self.0.into_recv_async())
}
}

/// A future which allows asynchronously receiving a message.
///
/// Can be created via [`FifoChannelHandler::recv_async`] or [`FifoChannelHandler::into_recv_async`].
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
pub struct RecvFut<'a, T>(flume::r#async::RecvFut<'a, T>);

impl<'a, T> Future for RecvFut<'a, T> {
type Output = ZResult<T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Future::poll(Pin::new(&mut self.0), cx).map_err(Into::into)
}
}

impl<'a, T> futures::future::FusedFuture for RecvFut<'a, T> {
fn is_terminated(&self) -> bool {
futures::future::FusedFuture::is_terminated(&self.0)
}
}

impl<T> FifoChannelHandler<T> {
/// Create an asynchronous stream that uses this receiver to asynchronously receive messages
/// from the channel. The receiver will continue to be usable after the stream has been dropped.
pub fn stream(&self) -> RecvStream<'_, T> {
RecvStream(self.0.stream())
}

/// Convert this receiver into a stream that allows asynchronously receiving messages from the
/// channel.
pub fn into_stream<'a>(self) -> RecvStream<'a, T> {
RecvStream(self.0.into_stream())
}
}

/// A stream which allows asynchronously receiving messages.
///
/// Can be created via [`FifoChannelHandler::stream`] or [`FifoChannelHandler::into_stream`].
#[derive(Clone)]
pub struct RecvStream<'a, T>(flume::r#async::RecvStream<'a, T>);

impl<'a, T> RecvStream<'a, T> {
/// See [`FifoChannelHandler::is_disconnected`].
pub fn is_disconnected(&self) -> bool {
self.0.is_disconnected()
}

/// See [`FifoChannelHandler::is_empty`].
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// See [`FifoChannelHandler::is_full`].
pub fn is_full(&self) -> bool {
self.0.is_full()
}

/// See [`FifoChannelHandler::len`].
pub fn len(&self) -> usize {
self.0.len()
}

/// See [`FifoChannelHandler::capacity`].
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}

/// Returns whether the SendSinks are belong to the same channel.
pub fn same_channel(&self, other: &Self) -> bool {
self.0.same_channel(&other.0)
}
}

impl<'a, T> futures::stream::Stream for RecvStream<'a, T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
futures::stream::Stream::poll_next(Pin::new(&mut self.0), cx)
}
}

impl<'a, T> futures::stream::FusedStream for RecvStream<'a, T> {
fn is_terminated(&self) -> bool {
futures::stream::FusedStream::is_terminated(&self.0)
}
}

Expand Down
3 changes: 0 additions & 3 deletions zenoh/src/api/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,6 @@ impl<Handler> DerefMut for Subscriber<Handler> {
}
}

/// A [`Subscriber`] that provides data through a `flume` channel.
pub type FlumeSubscriber = Subscriber<flume::Receiver<Sample>>;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SubscriberKind {
Subscriber,
Expand Down
Loading

0 comments on commit 3740564

Please sign in to comment.