Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encapsulate flume::Receiver #1504

Merged
merged 4 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion plugins/zenoh-plugin-storage-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ crate-type = ["cdylib", "rlib"]
async-trait = { workspace = true }
bincode = { workspace = true }
bloomfilter = "1"
flume = { workspace = true }
futures = { workspace = true }
git-version = { workspace = true }
lazy_static = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ impl StorageService {
Some(cache_guard)
}

async fn reply_query(&self, query: Result<zenoh::query::Query, flume::RecvError>) {
async fn reply_query(&self, query: ZResult<zenoh::query::Query>) {
let q = match query {
Ok(q) => q,
Err(e) => {
Expand Down
7 changes: 4 additions & 3 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use std::{
};

use zenoh::{
handlers::FifoChannelHandler,
internal::{bail, runtime::ZRuntime, ResolveFuture, TerminatableTask},
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
pubsub::FlumeSubscriber,
pubsub::Subscriber,
query::{Query, Queryable, ZenohParameters},
sample::{Locality, Sample},
Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
Expand Down Expand Up @@ -115,8 +116,8 @@ impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> {

#[zenoh_macros::unstable]
pub struct PublicationCache {
local_sub: FlumeSubscriber,
_queryable: Queryable<flume::Receiver<Query>>,
local_sub: Subscriber<FifoChannelHandler<Sample>>,
_queryable: Queryable<FifoChannelHandler<Query>>,
task: TerminatableTask,
}

Expand Down
7 changes: 4 additions & 3 deletions zenoh-ext/src/subscriber_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
//
use std::time::Duration;

use flume::r#async::RecvStream;
use futures::stream::{Forward, Map};
use zenoh::{
handlers::{fifo, FifoChannelHandler},
liveliness::LivelinessSubscriberBuilder,
pubsub::{Subscriber, SubscriberBuilder},
query::{QueryConsolidation, QueryTarget, ReplyKeyExpr},
Expand All @@ -33,11 +33,12 @@ pub trait SubscriberForward<'a, S> {
type Output;
fn forward(&'a mut self, sink: S) -> Self::Output;
}
impl<'a, S> SubscriberForward<'a, S> for Subscriber<flume::Receiver<Sample>>
impl<'a, S> SubscriberForward<'a, S> for Subscriber<FifoChannelHandler<Sample>>
where
S: futures::sink::Sink<Sample>,
{
type Output = Forward<Map<RecvStream<'a, Sample>, fn(Sample) -> Result<Sample, S::Error>>, S>;
type Output =
Forward<Map<fifo::RecvStream<'a, Sample>, fn(Sample) -> Result<Sample, S::Error>>, S>;
fn forward(&'a mut self, sink: S) -> Self::Output {
futures::StreamExt::forward(futures::StreamExt::map(self.stream(), Ok), sink)
}
Expand Down
285 changes: 282 additions & 3 deletions zenoh/src/api/handlers/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@

//! Callback handler trait.

use std::sync::Arc;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};

use zenoh_result::ZResult;

use crate::api::handlers::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};

Expand All @@ -41,11 +49,282 @@ impl Default for FifoChannel {
}
}

/// [`FifoChannel`] handler.
#[derive(Debug, Clone)]
pub struct FifoChannelHandler<T>(flume::Receiver<T>);

impl<T: Send + 'static> IntoHandler<T> for FifoChannel {
type Handler = flume::Receiver<T>;
type Handler = FifoChannelHandler<T>;

fn into_handler(self) -> (Callback<T>, Self::Handler) {
flume::bounded(self.capacity).into_handler()
let (sender, receiver) = flume::bounded(self.capacity);
(
Callback::new(Arc::new(move |t| {
if let Err(error) = sender.send(t) {
tracing::error!(%error)
}
})),
FifoChannelHandler(receiver),
)
}
}

impl<T> FifoChannelHandler<T> {
OlivierHecart marked this conversation as resolved.
Show resolved Hide resolved
/// Attempt to fetch an incoming value from the channel associated with this receiver, returning
/// an error if the channel is empty or if all senders have been dropped.
pub fn try_recv(&self) -> ZResult<T> {
self.0.try_recv().map_err(Into::into)
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped.
pub fn recv(&self) -> ZResult<T> {
self.0.recv().map_err(Into::into)
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped or the deadline has passed.
pub fn recv_deadline(&self, deadline: Instant) -> ZResult<T> {
self.0.recv_deadline(deadline).map_err(Into::into)
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped or the timeout has expired.
pub fn recv_timeout(&self, duration: Duration) -> ZResult<T> {
self.0.recv_timeout(duration).map_err(Into::into)
}

/// Create a blocking iterator over the values received on the channel that finishes iteration
/// when all senders have been dropped.
pub fn iter(&self) -> Iter<'_, T> {
Iter(self.0.iter())
}

/// A non-blocking iterator over the values received on the channel that finishes iteration when
/// all senders have been dropped or the channel is empty.
pub fn try_iter(&self) -> TryIter<'_, T> {
TryIter(self.0.try_iter())
}

/// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
/// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
/// the function has been called.
pub fn drain(&self) -> Drain<'_, T> {
Drain(self.0.drain())
}

/// Returns true if all senders for this channel have been dropped.
pub fn is_disconnected(&self) -> bool {
self.0.is_disconnected()
}

/// Returns true if the channel is empty.
/// Note: Zero-capacity channels are always empty.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// Returns true if the channel is full.
/// Note: Zero-capacity channels are always full.
pub fn is_full(&self) -> bool {
self.0.is_full()
}

/// Returns the number of messages in the channel.
pub fn len(&self) -> usize {
self.0.len()
}

/// If the channel is bounded, returns its capacity.
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}

/// Get the number of senders that currently exist.
pub fn sender_count(&self) -> usize {
self.0.sender_count()
}

/// Get the number of receivers that currently exist, including this one.
pub fn receiver_count(&self) -> usize {
self.0.receiver_count()
}

/// Returns whether the receivers are belong to the same channel.
pub fn same_channel(&self, other: &Self) -> bool {
self.0.same_channel(&other.0)
}
}

/// This exists as a shorthand for [`FifoChannelHandler::iter`].
impl<'a, T> IntoIterator for &'a FifoChannelHandler<T> {
type Item = T;
type IntoIter = Iter<'a, T>;

fn into_iter(self) -> Self::IntoIter {
Iter(self.0.iter())
}
}

impl<T> IntoIterator for FifoChannelHandler<T> {
type Item = T;
type IntoIter = IntoIter<T>;

/// Creates a self-owned but semantically equivalent alternative to [`FifoChannelHandler::iter`].
fn into_iter(self) -> Self::IntoIter {
IntoIter(self.0.into_iter())
}
}

/// An iterator over the msgs received from a channel.
pub struct Iter<'a, T>(flume::Iter<'a, T>);

impl<'a, T> Iterator for Iter<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

/// An non-blocking iterator over the msgs received from a channel.
pub struct TryIter<'a, T>(flume::TryIter<'a, T>);

impl<'a, T> Iterator for TryIter<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

/// An fixed-sized iterator over the msgs drained from a channel.
#[derive(Debug)]
pub struct Drain<'a, T>(flume::Drain<'a, T>);

impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl<'a, T> ExactSizeIterator for Drain<'a, T> {
fn len(&self) -> usize {
self.0.len()
}
}

/// An owned iterator over the msgs received from a channel.
pub struct IntoIter<T>(flume::IntoIter<T>);

impl<T> Iterator for IntoIter<T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl<T> FifoChannelHandler<T> {
/// Asynchronously receive a value from the channel, returning an error if all senders have been
/// dropped. If the channel is empty, the returned future will yield to the async runtime.
pub fn recv_async(&self) -> RecvFut<'_, T> {
RecvFut(self.0.recv_async())
}

/// Convert this receiver into a future that asynchronously receives a single message from the
/// channel, returning an error if all senders have been dropped. If the channel is empty, this
/// future will yield to the async runtime.
pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> {
RecvFut(self.0.into_recv_async())
}
}

/// A future which allows asynchronously receiving a message.
///
/// Can be created via [`FifoChannelHandler::recv_async`] or [`FifoChannelHandler::into_recv_async`].
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
pub struct RecvFut<'a, T>(flume::r#async::RecvFut<'a, T>);

impl<'a, T> Future for RecvFut<'a, T> {
type Output = ZResult<T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Future::poll(Pin::new(&mut self.0), cx).map_err(Into::into)
}
}

impl<'a, T> futures::future::FusedFuture for RecvFut<'a, T> {
fn is_terminated(&self) -> bool {
futures::future::FusedFuture::is_terminated(&self.0)
}
}

impl<T> FifoChannelHandler<T> {
/// Create an asynchronous stream that uses this receiver to asynchronously receive messages
/// from the channel. The receiver will continue to be usable after the stream has been dropped.
pub fn stream(&self) -> RecvStream<'_, T> {
RecvStream(self.0.stream())
}

/// Convert this receiver into a stream that allows asynchronously receiving messages from the
/// channel.
pub fn into_stream<'a>(self) -> RecvStream<'a, T> {
RecvStream(self.0.into_stream())
}
}

/// A stream which allows asynchronously receiving messages.
///
/// Can be created via [`FifoChannelHandler::stream`] or [`FifoChannelHandler::into_stream`].
#[derive(Clone)]
pub struct RecvStream<'a, T>(flume::r#async::RecvStream<'a, T>);

impl<'a, T> RecvStream<'a, T> {
/// See [`FifoChannelHandler::is_disconnected`].
pub fn is_disconnected(&self) -> bool {
self.0.is_disconnected()
}

/// See [`FifoChannelHandler::is_empty`].
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// See [`FifoChannelHandler::is_full`].
pub fn is_full(&self) -> bool {
self.0.is_full()
}

/// See [`FifoChannelHandler::len`].
pub fn len(&self) -> usize {
self.0.len()
}

/// See [`FifoChannelHandler::capacity`].
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}

/// Returns whether the SendSinks are belong to the same channel.
pub fn same_channel(&self, other: &Self) -> bool {
self.0.same_channel(&other.0)
}
}

impl<'a, T> futures::stream::Stream for RecvStream<'a, T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
futures::stream::Stream::poll_next(Pin::new(&mut self.0), cx)
}
}

impl<'a, T> futures::stream::FusedStream for RecvStream<'a, T> {
fn is_terminated(&self) -> bool {
futures::stream::FusedStream::is_terminated(&self.0)
}
}

Expand Down
3 changes: 0 additions & 3 deletions zenoh/src/api/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,6 @@ impl<Handler> DerefMut for Subscriber<Handler> {
}
}

/// A [`Subscriber`] that provides data through a `flume` channel.
pub type FlumeSubscriber = Subscriber<flume::Receiver<Sample>>;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SubscriberKind {
Subscriber,
Expand Down
Loading