From fd4a6523f64d722da7326a9fc2dc92cbfb404474 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Thu, 3 Oct 2024 13:26:26 +0000 Subject: [PATCH 1/4] Encapsulate `flume::Receiver` --- .../src/storages_mgt/service.rs | 2 +- zenoh-ext/src/publication_cache.rs | 7 +- zenoh-ext/src/subscriber_ext.rs | 7 +- zenoh/src/api/handlers/fifo.rs | 287 +++++++++++++++++- zenoh/src/lib.rs | 11 +- zenoh/tests/matching.rs | 29 +- 6 files changed, 322 insertions(+), 21 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs index 8ae3d77634..391ee7a380 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs @@ -523,7 +523,7 @@ impl StorageService { Some(cache_guard) } - async fn reply_query(&self, query: Result) { + async fn reply_query(&self, query: ZResult) { let q = match query { Ok(q) => q, Err(e) => { diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index a2cfe3490a..3d3e9e6107 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -19,9 +19,10 @@ use std::{ }; use zenoh::{ + handlers::FifoChannelHandler, internal::{bail, runtime::ZRuntime, ResolveFuture, TerminatableTask}, key_expr::{keyexpr, KeyExpr, OwnedKeyExpr}, - pubsub::FlumeSubscriber, + pubsub::Subscriber, query::{Query, Queryable, ZenohParameters}, sample::{Locality, Sample}, Error, Resolvable, Resolve, Result as ZResult, Session, Wait, @@ -115,8 +116,8 @@ impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> { #[zenoh_macros::unstable] pub struct PublicationCache { - local_sub: FlumeSubscriber, - _queryable: Queryable>, + local_sub: Subscriber>, + _queryable: Queryable>, task: TerminatableTask, } diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index 26ebbf60c0..d8d64c6760 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -13,9 +13,9 @@ // use std::time::Duration; -use flume::r#async::RecvStream; use futures::stream::{Forward, Map}; use zenoh::{ + handlers::{fifo, FifoChannelHandler}, liveliness::LivelinessSubscriberBuilder, pubsub::{Subscriber, SubscriberBuilder}, query::{QueryConsolidation, QueryTarget, ReplyKeyExpr}, @@ -33,11 +33,12 @@ pub trait SubscriberForward<'a, S> { type Output; fn forward(&'a mut self, sink: S) -> Self::Output; } -impl<'a, S> SubscriberForward<'a, S> for Subscriber> +impl<'a, S> SubscriberForward<'a, S> for Subscriber> where S: futures::sink::Sink, { - type Output = Forward, fn(Sample) -> Result>, S>; + type Output = + Forward, fn(Sample) -> Result>, S>; fn forward(&'a mut self, sink: S) -> Self::Output { futures::StreamExt::forward(futures::StreamExt::map(self.stream(), Ok), sink) } diff --git a/zenoh/src/api/handlers/fifo.rs b/zenoh/src/api/handlers/fifo.rs index 44a542e538..64f7b7a5e2 100644 --- a/zenoh/src/api/handlers/fifo.rs +++ b/zenoh/src/api/handlers/fifo.rs @@ -14,7 +14,15 @@ //! Callback handler trait. -use std::sync::Arc; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use zenoh_result::ZResult; use crate::api::handlers::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE}; @@ -41,11 +49,284 @@ impl Default for FifoChannel { } } +/// [`FifoChannel`] handler. +#[derive(Debug, Clone)] +pub struct FifoChannelHandler(flume::Receiver); + impl IntoHandler for FifoChannel { - type Handler = flume::Receiver; + type Handler = FifoChannelHandler; fn into_handler(self) -> (Callback, Self::Handler) { - flume::bounded(self.capacity).into_handler() + let (sender, receiver) = flume::bounded(self.capacity); + ( + Callback::new(Arc::new(move |t| { + if let Err(error) = sender.send(t) { + tracing::error!(%error) + } + })), + FifoChannelHandler(receiver), + ) + } +} + +impl FifoChannelHandler { + /// Attempt to fetch an incoming value from the channel associated with this receiver, returning + /// an error if the channel is empty or if all senders have been dropped. + pub fn try_recv(&self) -> ZResult { + self.0.try_recv().map_err(Into::into) + } + + /// Wait for an incoming value from the channel associated with this receiver, returning an + /// error if all senders have been dropped. + pub fn recv(&self) -> ZResult { + self.0.recv().map_err(Into::into) + } + + /// Wait for an incoming value from the channel associated with this receiver, returning an + /// error if all senders have been dropped or the deadline has passed. + pub fn recv_deadline(&self, deadline: Instant) -> ZResult { + self.0.recv_deadline(deadline).map_err(Into::into) + } + + /// Wait for an incoming value from the channel associated with this receiver, returning an + /// error if all senders have been dropped or the timeout has expired. + pub fn recv_timeout(&self, dur: Duration) -> ZResult { + self.0 + .recv_deadline(Instant::now().checked_add(dur).unwrap()) + .map_err(Into::into) + } + + /// Create a blocking iterator over the values received on the channel that finishes iteration + /// when all senders have been dropped. + pub fn iter(&self) -> Iter<'_, T> { + Iter(self.0.iter()) + } + + /// A non-blocking iterator over the values received on the channel that finishes iteration when + /// all senders have been dropped or the channel is empty. + pub fn try_iter(&self) -> TryIter<'_, T> { + TryIter(self.0.try_iter()) + } + + /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike + /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once + /// the function has been called. + pub fn drain(&self) -> Drain<'_, T> { + Drain(self.0.drain()) + } + + /// Returns true if all senders for this channel have been dropped. + pub fn is_disconnected(&self) -> bool { + self.0.is_disconnected() + } + + /// Returns true if the channel is empty. + /// Note: Zero-capacity channels are always empty. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Returns true if the channel is full. + /// Note: Zero-capacity channels are always full. + pub fn is_full(&self) -> bool { + self.0.is_full() + } + + /// Returns the number of messages in the channel. + pub fn len(&self) -> usize { + self.0.len() + } + + /// If the channel is bounded, returns its capacity. + pub fn capacity(&self) -> Option { + self.0.capacity() + } + + /// Get the number of senders that currently exist. + pub fn sender_count(&self) -> usize { + self.0.sender_count() + } + + /// Get the number of receivers that currently exist, including this one. + pub fn receiver_count(&self) -> usize { + self.0.receiver_count() + } + + /// Returns whether the receivers are belong to the same channel. + pub fn same_channel(&self, other: &Self) -> bool { + self.0.same_channel(&other.0) + } +} + +/// This exists as a shorthand for [`FifoChannelHandler::iter`]. +impl<'a, T> IntoIterator for &'a FifoChannelHandler { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + Iter(self.0.iter()) + } +} + +impl IntoIterator for FifoChannelHandler { + type Item = T; + type IntoIter = IntoIter; + + /// Creates a self-owned but semantically equivalent alternative to [`FifoChannelHandler::iter`]. + fn into_iter(self) -> Self::IntoIter { + IntoIter(self.0.into_iter()) + } +} + +/// An iterator over the msgs received from a channel. +pub struct Iter<'a, T>(flume::Iter<'a, T>); + +impl<'a, T> Iterator for Iter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.0.next() + } +} + +/// An non-blocking iterator over the msgs received from a channel. +pub struct TryIter<'a, T>(flume::TryIter<'a, T>); + +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.0.next() + } +} + +/// An fixed-sized iterator over the msgs drained from a channel. +#[derive(Debug)] +pub struct Drain<'a, T>(flume::Drain<'a, T>); + +impl<'a, T> Iterator for Drain<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.0.next() + } +} + +impl<'a, T> ExactSizeIterator for Drain<'a, T> { + fn len(&self) -> usize { + self.0.len() + } +} + +/// An owned iterator over the msgs received from a channel. +pub struct IntoIter(flume::IntoIter); + +impl Iterator for IntoIter { + type Item = T; + + fn next(&mut self) -> Option { + self.0.next() + } +} + +impl FifoChannelHandler { + /// Asynchronously receive a value from the channel, returning an error if all senders have been + /// dropped. If the channel is empty, the returned future will yield to the async runtime. + pub fn recv_async(&self) -> RecvFut<'_, T> { + RecvFut(self.0.recv_async()) + } + + /// Convert this receiver into a future that asynchronously receives a single message from the + /// channel, returning an error if all senders have been dropped. If the channel is empty, this + /// future will yield to the async runtime. + pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> { + RecvFut(self.0.into_recv_async()) + } +} + +/// A future which allows asynchronously receiving a message. +/// +/// Can be created via [`FifoChannelHandler::recv_async`] or [`FifoChannelHandler::into_recv_async`]. +#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] +pub struct RecvFut<'a, T>(flume::r#async::RecvFut<'a, T>); + +impl<'a, T> Future for RecvFut<'a, T> { + type Output = ZResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Future::poll(Pin::new(&mut self.0), cx).map_err(Into::into) + } +} + +impl<'a, T> futures::future::FusedFuture for RecvFut<'a, T> { + fn is_terminated(&self) -> bool { + futures::future::FusedFuture::is_terminated(&self.0) + } +} + +impl FifoChannelHandler { + /// Create an asynchronous stream that uses this receiver to asynchronously receive messages + /// from the channel. The receiver will continue to be usable after the stream has been dropped. + pub fn stream(&self) -> RecvStream<'_, T> { + RecvStream(self.0.stream()) + } + + /// Convert this receiver into a stream that allows asynchronously receiving messages from the + /// channel. + pub fn into_stream<'a>(self) -> RecvStream<'a, T> { + RecvStream(self.0.into_stream()) + } +} + +/// A stream which allows asynchronously receiving messages. +/// +/// Can be created via [`FifoChannelHandler::stream`] or [`FifoChannelHandler::into_stream`]. +#[derive(Clone)] +pub struct RecvStream<'a, T>(flume::r#async::RecvStream<'a, T>); + +impl<'a, T> RecvStream<'a, T> { + /// See [`FifoChannelHandler::is_disconnected`]. + pub fn is_disconnected(&self) -> bool { + self.0.is_disconnected() + } + + /// See [`FifoChannelHandler::is_empty`]. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// See [`FifoChannelHandler::is_full`]. + pub fn is_full(&self) -> bool { + self.0.is_full() + } + + /// See [`FifoChannelHandler::len`]. + pub fn len(&self) -> usize { + self.0.len() + } + + /// See [`FifoChannelHandler::capacity`]. + pub fn capacity(&self) -> Option { + self.0.capacity() + } + + /// Returns whether the SendSinks are belong to the same channel. + pub fn same_channel(&self, other: &Self) -> bool { + self.0.same_channel(&other.0) + } +} + +impl<'a, T> futures::stream::Stream for RecvStream<'a, T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + futures::stream::Stream::poll_next(Pin::new(&mut self.0), cx) + } +} + +impl<'a, T> futures::stream::FusedStream for RecvStream<'a, T> { + fn is_terminated(&self) -> bool { + futures::stream::FusedStream::is_terminated(&self.0) } } diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 0d5f0c32af..737b5587b4 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -231,7 +231,7 @@ pub mod pubsub { PublisherDeleteBuilder, PublisherPutBuilder, }, publisher::{Publisher, PublisherUndeclaration}, - subscriber::{FlumeSubscriber, Subscriber, SubscriberBuilder}, + subscriber::{Subscriber, SubscriberBuilder}, }; } @@ -260,9 +260,14 @@ pub mod handlers { #[zenoh_macros::internal] pub use crate::api::handlers::locked; pub use crate::api::handlers::{ - Callback, CallbackDrop, DefaultHandler, FifoChannel, IntoHandler, RingChannel, - RingChannelHandler, + Callback, CallbackDrop, DefaultHandler, FifoChannel, FifoChannelHandler, IntoHandler, + RingChannel, RingChannelHandler, }; + pub mod fifo { + pub use crate::api::handlers::{ + Drain, FifoChannel, FifoChannelHandler, IntoIter, Iter, RecvFut, RecvStream, TryIter, + }; + } } /// Quality of service primitives diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index efa377863d..16f1376507 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -15,7 +15,6 @@ use std::time::Duration; -use flume::RecvTimeoutError; use zenoh::{sample::Locality, Result as ZResult, Session}; use zenoh_config::{ModeDependentValue, WhatAmI}; use zenoh_core::ztimeout; @@ -59,7 +58,9 @@ async fn zenoh_matching_status_any() -> ZResult<()> { let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + assert!( + received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) + ); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -113,7 +114,9 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + assert!( + received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) + ); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -121,7 +124,9 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_remote_test")).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + assert!( + received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) + ); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -129,7 +134,9 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { ztimeout!(sub.undeclare()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + assert!( + received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) + ); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -168,7 +175,9 @@ async fn zenoh_matching_status_local() -> ZResult<()> { let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + assert!( + received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) + ); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -192,7 +201,9 @@ async fn zenoh_matching_status_local() -> ZResult<()> { let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_local_test")).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + assert!( + received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) + ); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -200,7 +211,9 @@ async fn zenoh_matching_status_local() -> ZResult<()> { ztimeout!(sub.undeclare()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + assert!( + received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) + ); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); From fbc11453a23570b0a4eb8b81684005b71b7d4a84 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Thu, 3 Oct 2024 14:19:47 +0000 Subject: [PATCH 2/4] Remove `FlumeSubscriber` This was marked as unused after fd4a652 --- zenoh/src/api/subscriber.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index ba03546b08..f6d6a1f08b 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -470,9 +470,6 @@ impl DerefMut for Subscriber { } } -/// A [`Subscriber`] that provides data through a `flume` channel. -pub type FlumeSubscriber = Subscriber>; - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum SubscriberKind { Subscriber, From 3cc03147c52cddc88052350edcccfe1f5cfb3472 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Thu, 3 Oct 2024 14:28:42 +0000 Subject: [PATCH 3/4] Remove unused `flume` dependency in storage-manager plugin --- Cargo.lock | 1 - plugins/zenoh-plugin-storage-manager/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4a14b05c9..2387bd28d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5934,7 +5934,6 @@ dependencies = [ "async-trait", "bincode", "bloomfilter", - "flume", "futures", "git-version", "jsonschema", diff --git a/plugins/zenoh-plugin-storage-manager/Cargo.toml b/plugins/zenoh-plugin-storage-manager/Cargo.toml index 42d1cbd359..25685d9388 100644 --- a/plugins/zenoh-plugin-storage-manager/Cargo.toml +++ b/plugins/zenoh-plugin-storage-manager/Cargo.toml @@ -35,7 +35,6 @@ crate-type = ["cdylib", "rlib"] async-trait = { workspace = true } bincode = { workspace = true } bloomfilter = "1" -flume = { workspace = true } futures = { workspace = true } git-version = { workspace = true } lazy_static = { workspace = true } From 4fbc88b0795ad8f61f4b496b9529977d7a7e27d2 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Mon, 7 Oct 2024 12:28:31 +0000 Subject: [PATCH 4/4] Fix `FifoChannelHandler::recv_timeout` impl --- zenoh/src/api/handlers/fifo.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/zenoh/src/api/handlers/fifo.rs b/zenoh/src/api/handlers/fifo.rs index 64f7b7a5e2..47cdf7b5b6 100644 --- a/zenoh/src/api/handlers/fifo.rs +++ b/zenoh/src/api/handlers/fifo.rs @@ -90,10 +90,8 @@ impl FifoChannelHandler { /// Wait for an incoming value from the channel associated with this receiver, returning an /// error if all senders have been dropped or the timeout has expired. - pub fn recv_timeout(&self, dur: Duration) -> ZResult { - self.0 - .recv_deadline(Instant::now().checked_add(dur).unwrap()) - .map_err(Into::into) + pub fn recv_timeout(&self, duration: Duration) -> ZResult { + self.0.recv_timeout(duration).map_err(Into::into) } /// Create a blocking iterator over the values received on the channel that finishes iteration