From 055d71d55ff50c4b8e23f96b17c015209d9a1637 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 15 Mar 2024 17:51:05 +0100 Subject: [PATCH 1/4] Deprecate MessageReceiver::recv_message This method was only used in a test. Also deprecate `WrappedInput` type. Which was adding little compared to Result, RuntimeRequest>. Signed-off-by: Didier Wenzek --- crates/core/tedge_actors/src/message_boxes.rs | 75 +++---------------- crates/core/tedge_actors/src/test_helpers.rs | 20 ----- crates/extensions/c8y_mapper_ext/src/tests.rs | 5 +- crates/extensions/tedge_mqtt_ext/src/lib.rs | 5 -- 4 files changed, 12 insertions(+), 93 deletions(-) diff --git a/crates/core/tedge_actors/src/message_boxes.rs b/crates/core/tedge_actors/src/message_boxes.rs index b5abbda46ba..6d20c0884ce 100644 --- a/crates/core/tedge_actors/src/message_boxes.rs +++ b/crates/core/tedge_actors/src/message_boxes.rs @@ -72,11 +72,6 @@ //! but also to add specific coordination among input and output channels, //! each [Actor](crate::Actor) is free to choose its own [message box](crate::message_boxes) implementation: //! -//! ```no_run -//! trait Actor { -//! } -//! ``` -//! //! This crates provides several built-in message box implementations: //! //! - [SimpleMessageBox] for actors that simply process messages in turn, @@ -101,34 +96,12 @@ use futures::StreamExt; use log::debug; use std::fmt::Debug; -/// Either a message or a [RuntimeRequest] -pub enum WrappedInput { - Message(Input), - RuntimeRequest(RuntimeRequest), -} - -impl Debug for WrappedInput { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Message(input) => f.debug_tuple("Message").field(input).finish(), - Self::RuntimeRequest(runtime_request) => f - .debug_tuple("RuntimeRequest") - .field(runtime_request) - .finish(), - } - } -} - #[async_trait] pub trait MessageReceiver { /// Return the next received message if any, returning [RuntimeRequest]'s as errors. /// Returning [RuntimeRequest] takes priority over messages. async fn try_recv(&mut self) -> Result, RuntimeRequest>; - /// Returns [Some] [WrappedInput] the next time a message is received. Returns [None] if - /// the underlying channels are closed. Returning [RuntimeRequest] takes priority over messages. - async fn recv_message(&mut self) -> Option>; - /// Returns [Some] message the next time a message is received. Returns [None] if /// both of the underlying channels are closed or if a [RuntimeRequest] is received. /// Handling [RuntimeRequest]'s by returning [None] takes priority over messages. @@ -195,12 +168,6 @@ impl MessageReceiver for LoggingReceiver { message } - async fn recv_message(&mut self) -> Option> { - let message = self.receiver.recv_message().await; - debug!(target: &self.name, "recv {:?}", message); - message - } - async fn recv(&mut self) -> Option { let message = self.receiver.recv().await; debug!(target: &self.name, "recv {:?}", message); @@ -269,17 +236,17 @@ impl UnboundedLoggingReceiver { } } - async fn next_message(&mut self) -> Option> { + async fn next_message(&mut self) -> Result, RuntimeRequest> { tokio::select! { biased; Some(runtime_request) = self.signal_receiver.next() => { - Some(WrappedInput::RuntimeRequest(runtime_request)) + Err(runtime_request) } Some(message) = self.input_receiver.next() => { - Some(WrappedInput::Message(message)) + Ok(Some(message)) } - else => None + else => Ok(None) } } } @@ -287,16 +254,6 @@ impl UnboundedLoggingReceiver { #[async_trait] impl MessageReceiver for UnboundedLoggingReceiver { async fn try_recv(&mut self) -> Result, RuntimeRequest> { - let message = match self.next_message().await { - Some(WrappedInput::Message(message)) => Ok(Some(message)), - Some(WrappedInput::RuntimeRequest(runtime_request)) => Err(runtime_request), - None => Ok(None), - }; - debug!(target: &self.name, "recv {:?}", message); - message - } - - async fn recv_message(&mut self) -> Option> { let message = self.next_message().await; debug!(target: &self.name, "recv {:?}", message); message @@ -304,7 +261,7 @@ impl MessageReceiver for UnboundedLoggingReceiver Option { let message = match self.next_message().await { - Some(WrappedInput::Message(message)) => Some(message), + Ok(Some(message)) => Some(message), _ => None, }; debug!(target: &self.name, "recv {:?}", message); @@ -366,10 +323,6 @@ impl MessageReceiver for SimpleMessageBo self.input_receiver.try_recv().await } - async fn recv_message(&mut self) -> Option> { - self.input_receiver.recv_message().await - } - async fn recv(&mut self) -> Option { self.input_receiver.recv().await } @@ -416,30 +369,22 @@ impl CombinedReceiver { #[async_trait] impl MessageReceiver for CombinedReceiver { async fn try_recv(&mut self) -> Result, RuntimeRequest> { - match self.recv_message().await { - Some(WrappedInput::Message(message)) => Ok(Some(message)), - Some(WrappedInput::RuntimeRequest(runtime_request)) => Err(runtime_request), - None => Ok(None), - } - } - - async fn recv_message(&mut self) -> Option> { tokio::select! { biased; Some(runtime_request) = self.signal_receiver.next() => { - Some(WrappedInput::RuntimeRequest(runtime_request)) + Err(runtime_request) } Some(message) = self.input_receiver.next() => { - Some(WrappedInput::Message(message)) + Ok(Some(message)) } - else => None + else => Ok(None) } } async fn recv(&mut self) -> Option { - match self.recv_message().await { - Some(WrappedInput::Message(message)) => Some(message), + match self.try_recv().await { + Ok(Some(message)) => Some(message), _ => None, } } diff --git a/crates/core/tedge_actors/src/test_helpers.rs b/crates/core/tedge_actors/src/test_helpers.rs index 901b2d6c854..30cfc105919 100644 --- a/crates/core/tedge_actors/src/test_helpers.rs +++ b/crates/core/tedge_actors/src/test_helpers.rs @@ -17,7 +17,6 @@ use crate::ServerMessageBoxBuilder; use crate::Service; use crate::SimpleMessageBox; use crate::SimpleMessageBoxBuilder; -use crate::WrappedInput; use async_trait::async_trait; use core::future::Future; use std::collections::VecDeque; @@ -342,12 +341,6 @@ where .unwrap_or(Ok(None)) } - async fn recv_message(&mut self) -> Option> { - tokio::time::timeout(self.timeout, self.inner.recv_message()) - .await - .unwrap_or(None) - } - async fn recv(&mut self) -> Option { tokio::time::timeout(self.timeout, self.inner.recv()) .await @@ -743,19 +736,6 @@ impl MessageReceiver } } - async fn recv_message(&mut self) -> Option> { - match self.messages.recv_message().await { - None => None, - Some(WrappedInput::Message(RequestEnvelope { request, reply_to })) => { - self.reply_to.push_back(reply_to); - Some(WrappedInput::Message(request)) - } - Some(WrappedInput::RuntimeRequest(signal)) => { - Some(WrappedInput::RuntimeRequest(signal)) - } - } - } - async fn recv(&mut self) -> Option { match self.messages.recv().await { None => None, diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index c1b9aa7f0e5..6c1d8688f29 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -30,7 +30,6 @@ use tedge_actors::NoMessage; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_actors::WrappedInput; use tedge_api::main_device_health_topic; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; @@ -1542,7 +1541,7 @@ async fn mapper_dynamically_updates_supported_operations_for_child_device() { // Assert that the creation of the operation file alone doesn't trigger the supported operations update assert!( - mqtt.recv_message().await.is_none(), + mqtt.recv().await.is_none(), "No messages expected on operation file creation event" ); @@ -2053,7 +2052,7 @@ async fn inventory_registers_unknown_entity_once() { } let mut messages = vec![]; - while let Some(WrappedInput::Message(msg)) = mqtt.recv_message().await { + while let Ok(Some(msg)) = mqtt.try_recv().await { messages.push(msg); } diff --git a/crates/extensions/tedge_mqtt_ext/src/lib.rs b/crates/extensions/tedge_mqtt_ext/src/lib.rs index 97b30e1aa8a..f0720d3976d 100644 --- a/crates/extensions/tedge_mqtt_ext/src/lib.rs +++ b/crates/extensions/tedge_mqtt_ext/src/lib.rs @@ -21,7 +21,6 @@ use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::RuntimeRequestSink; use tedge_actors::Sender; -use tedge_actors::WrappedInput; pub type MqttConfig = mqtt_channel::Config; pub type MqttMessage = mqtt_channel::Message; @@ -162,10 +161,6 @@ impl MessageReceiver for FromPeers { self.input_receiver.try_recv().await } - async fn recv_message(&mut self) -> Option> { - self.input_receiver.recv_message().await - } - async fn recv(&mut self) -> Option { self.input_receiver.recv().await } From be9a8b0cf86e6049d8ba1762898ead6921919b19 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 15 Mar 2024 18:57:40 +0100 Subject: [PATCH 2/4] Deprecate unused event_sender parameter on Runtime creation My plan was to fully deprecate the RuntimeEvent type. However, these events are actually useful to test the runtime. So let this unchanged for now. Signed-off-by: Didier Wenzek --- crates/core/tedge_actors/src/lib.rs | 3 +-- crates/core/tedge_actors/src/runtime.rs | 19 +++++++++++++------ .../tedge_actors/src/servers/message_boxes.rs | 2 +- crates/core/tedge_agent/src/agent.rs | 3 +-- crates/core/tedge_mapper/src/core/mapper.rs | 3 +-- plugins/c8y_firmware_plugin/src/lib.rs | 3 +-- 6 files changed, 18 insertions(+), 15 deletions(-) diff --git a/crates/core/tedge_actors/src/lib.rs b/crates/core/tedge_actors/src/lib.rs index 805a77cd336..3e85b48d2c6 100644 --- a/crates/core/tedge_actors/src/lib.rs +++ b/crates/core/tedge_actors/src/lib.rs @@ -281,8 +281,7 @@ //! //! # #[tokio::main] //! # async fn main() -> Result<(), RuntimeError> { -//! let runtime_events_logger = None; -//! let mut runtime = Runtime::try_new(runtime_events_logger).await?; +//! let mut runtime = Runtime::new(); //! //! let my_actor_builder = MyActorBuilder::default(); //! diff --git a/crates/core/tedge_actors/src/runtime.rs b/crates/core/tedge_actors/src/runtime.rs index de7a73a940a..d046e6e6e19 100644 --- a/crates/core/tedge_actors/src/runtime.rs +++ b/crates/core/tedge_actors/src/runtime.rs @@ -48,23 +48,30 @@ pub struct Runtime { bg_task: JoinHandle>, } +impl Default for Runtime { + fn default() -> Self { + Runtime::new() + } +} + impl Runtime { /// Launch the runtime, returning a runtime handler /// /// TODO: ensure this can only be called once - pub async fn try_new( - events_sender: Option>, - ) -> Result { + pub fn new() -> Runtime { + Self::with_events_sender(None) + } + + fn with_events_sender(events_sender: Option>) -> Runtime { let (actions_sender, actions_receiver) = mpsc::channel(16); let runtime_actor = RuntimeActor::new(actions_receiver, events_sender, Duration::from_secs(60)); let runtime_task = tokio::spawn(runtime_actor.run()); - let runtime = Runtime { + Runtime { handle: RuntimeHandle { actions_sender }, bg_task: runtime_task, - }; - Ok(runtime) + } } pub fn get_handle(&self) -> RuntimeHandle { diff --git a/crates/core/tedge_actors/src/servers/message_boxes.rs b/crates/core/tedge_actors/src/servers/message_boxes.rs index 69dc8615414..323003b5d48 100644 --- a/crates/core/tedge_actors/src/servers/message_boxes.rs +++ b/crates/core/tedge_actors/src/servers/message_boxes.rs @@ -306,7 +306,7 @@ mod tests { } } - let mut runtime = Runtime::try_new(None).await.unwrap(); + let mut runtime = Runtime::new(); let (mut server_actor_builder, mut test_rx) = TestServerBuilder::new(); let mut client_box = server_actor_builder.client_box(); diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 7e0256c42f4..5a05dec4f7d 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -226,8 +226,7 @@ impl Agent { self.init()?; // Runtime - let runtime_events_logger = None; - let mut runtime = Runtime::try_new(runtime_events_logger).await?; + let mut runtime = Runtime::new(); // Operation workflows let workflows = self.load_operation_workflows().await?; diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index f93fccd2ba8..b8d18173c37 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -15,8 +15,7 @@ pub async fn start_basic_actors( mapper_name: &str, config: &TEdgeConfig, ) -> Result<(Runtime, MqttActorBuilder), anyhow::Error> { - let runtime_events_logger = None; - let mut runtime = Runtime::try_new(runtime_events_logger).await?; + let mut runtime = Runtime::new(); let mut mqtt_actor = get_mqtt_actor(mapper_name, config).await?; diff --git a/plugins/c8y_firmware_plugin/src/lib.rs b/plugins/c8y_firmware_plugin/src/lib.rs index 0620cafada9..822f61aa318 100644 --- a/plugins/c8y_firmware_plugin/src/lib.rs +++ b/plugins/c8y_firmware_plugin/src/lib.rs @@ -76,8 +76,7 @@ pub async fn run(firmware_plugin_opt: FirmwarePluginOpt) -> Result<(), anyhow::E } async fn run_with(tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { - let runtime_events_logger = None; - let mut runtime = Runtime::try_new(runtime_events_logger).await?; + let mut runtime = Runtime::new(); // Create actor instances let mqtt_config = tedge_config.mqtt_config()?; From 85aa79c0a86b71aa94e8ebf66d2dc17a7d5b5ae8 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 15 Mar 2024 19:36:58 +0100 Subject: [PATCH 3/4] Deprecate ServiceConsumerExt and Probe test helpers I'm proud of this Probe abstraction which really demonstrate the flexibility of the actor builder traits. However, this abstraction is used by not even a single test. Aiming to reduce the number of concepts introduce by this crate, I prefer to remove it. Signed-off-by: Didier Wenzek --- .../src/examples/calculator_server.rs | 65 ----- crates/core/tedge_actors/src/test_helpers.rs | 274 ------------------ 2 files changed, 339 deletions(-) diff --git a/crates/core/tedge_actors/src/examples/calculator_server.rs b/crates/core/tedge_actors/src/examples/calculator_server.rs index 21b0a66a799..2e3e705ff01 100644 --- a/crates/core/tedge_actors/src/examples/calculator_server.rs +++ b/crates/core/tedge_actors/src/examples/calculator_server.rs @@ -67,68 +67,3 @@ impl Actor for Player { Ok(()) } } - -#[cfg(test)] -#[cfg(feature = "test-helpers")] -mod tests { - use crate::examples::calculator_server::*; - use crate::test_helpers::Probe; - use crate::test_helpers::ProbeEvent::Recv; - use crate::test_helpers::ProbeEvent::Send; - use crate::test_helpers::ServiceConsumerExt; - use crate::Actor; - use crate::Builder; - use crate::ChannelError; - use crate::ServerActor; - use crate::ServerMessageBoxBuilder; - use crate::SimpleMessageBoxBuilder; - - #[tokio::test] - async fn observing_an_actor() -> Result<(), ChannelError> { - // Build the actor message boxes - let mut service_box_builder = ServerMessageBoxBuilder::new("Calculator", 16); - let mut player_box_builder = SimpleMessageBoxBuilder::new("Player 1", 1); - - // Connect the two actor message boxes interposing a probe. - let mut probe = Probe::new(); - player_box_builder - .with_probe(&mut probe) - .connect_to_server(&mut service_box_builder); - - // Spawn the actors - tokio::spawn(async move { - ServerActor::new(Calculator::default(), service_box_builder.build()) - .run() - .await - }); - tokio::spawn(async move { - Player { - name: "Player".to_string(), - target: 42, - messages: player_box_builder.build(), - } - .run() - .await - }); - - // Observe the messages sent and received by the player. - assert_eq!(probe.observe().await, Send(Operation::Add(0))); - assert_eq!(probe.observe().await, Recv(Update { from: 0, to: 0 })); - assert_eq!(probe.observe().await, Send(Operation::Add(21))); - assert_eq!(probe.observe().await, Recv(Update { from: 0, to: 21 })); - assert_eq!(probe.observe().await, Send(Operation::Add(10))); - assert_eq!(probe.observe().await, Recv(Update { from: 21, to: 31 })); - assert_eq!(probe.observe().await, Send(Operation::Add(5))); - assert_eq!(probe.observe().await, Recv(Update { from: 31, to: 36 })); - assert_eq!(probe.observe().await, Send(Operation::Add(3))); - assert_eq!(probe.observe().await, Recv(Update { from: 36, to: 39 })); - assert_eq!(probe.observe().await, Send(Operation::Add(1))); - assert_eq!(probe.observe().await, Recv(Update { from: 39, to: 40 })); - assert_eq!(probe.observe().await, Send(Operation::Add(1))); - assert_eq!(probe.observe().await, Recv(Update { from: 40, to: 41 })); - assert_eq!(probe.observe().await, Send(Operation::Add(0))); - assert_eq!(probe.observe().await, Recv(Update { from: 41, to: 41 })); - - Ok(()) - } -} diff --git a/crates/core/tedge_actors/src/test_helpers.rs b/crates/core/tedge_actors/src/test_helpers.rs index 30cfc105919..312bfdfc667 100644 --- a/crates/core/tedge_actors/src/test_helpers.rs +++ b/crates/core/tedge_actors/src/test_helpers.rs @@ -14,7 +14,6 @@ use crate::RequestSender; use crate::RuntimeRequest; use crate::Sender; use crate::ServerMessageBoxBuilder; -use crate::Service; use crate::SimpleMessageBox; use crate::SimpleMessageBoxBuilder; use async_trait::async_trait; @@ -377,279 +376,6 @@ impl AsMut for TimedMessageBox { } } -/// A message that can be broadcast -pub trait MessagePlus: Message + Clone + Eq {} -impl MessagePlus for T {} - -use crate::mpsc; -use crate::NullSender; -use futures::stream::FusedStream; -use futures::SinkExt; -use futures::StreamExt; - -/// For testing purpose, a `Probe` can be interposed between two actors to observe their interactions. -/// -/// The two actors under test, as well as their message boxes, are built and launched as usual, -/// the only interaction being on the wire: -/// - A probe is set on one side using the [with_probe()](crate::test_helpers::ServiceConsumerExt::with_probe) -/// method added by the [ServiceConsumerExt] -/// to any actor or message box builder. -/// - The [Probe::observe()](crate::test_helpers::Probe::observe) method can then be used -/// to observe all the messages either [sent](crate::test_helpers::ProbeEvent::Send) -/// or [received](crate::test_helpers::ProbeEvent::Recv) by the actor on which the probe has been set. -/// -/// ``` -/// # use tedge_actors::{Actor, Builder, ChannelError, NoConfig, ServerActor, ServerMessageBoxBuilder, SimpleMessageBoxBuilder}; -/// -/// # use tedge_actors::test_helpers::ProbeEvent::{Recv, Send}; -/// # use crate::tedge_actors::examples::calculator_server::*; -/// # #[tokio::main] -/// # async fn main_test() -> Result<(),ChannelError> { -/// # -/// // The [ServiceConsumerExt] trait must be in scope to add the `with_probe()` method on actor builders -/// use tedge_actors::test_helpers::{ServiceConsumerExt, Probe, ProbeEvent}; -/// -/// // Build the actor message boxes -/// let mut server_box_builder = ServerMessageBoxBuilder::new("Calculator", 16); -/// let mut player_box_builder = SimpleMessageBoxBuilder::new("Player 1", 1); -/// -/// // Connect the two actor message boxes interposing a probe. -/// let mut probe = Probe::new(); -/// player_box_builder.with_probe(&mut probe).connect_to_server(&mut server_box_builder); -/// -/// // Spawn the actors -/// let calculator = Calculator::default(); -/// tokio::spawn(async move { ServerActor::new(calculator, server_box_builder.build()).run().await } ); -/// tokio::spawn(async move { Player { name: "Player".to_string(), target: 42, messages: player_box_builder.build()}.run().await } ); -/// -/// // Observe the messages sent and received by the player. -/// assert_eq!(probe.observe().await, Send(Operation::Add(0))); -/// assert_eq!(probe.observe().await, Recv(Update{from:0, to:0})); -/// assert_eq!(probe.observe().await, Send(Operation::Add(21))); -/// assert_eq!(probe.observe().await, Recv(Update{from:0, to:21})); -/// assert_eq!(probe.observe().await, Send(Operation::Add(10))); -/// assert_eq!(probe.observe().await, Recv(Update{from:21, to:31})); -/// assert_eq!(probe.observe().await, Send(Operation::Add(5))); -/// assert_eq!(probe.observe().await, Recv(Update{from:31, to:36})); -/// assert_eq!(probe.observe().await, Send(Operation::Add(3))); -/// assert_eq!(probe.observe().await, Recv(Update{from:36, to:39})); -/// assert_eq!(probe.observe().await, Send(Operation::Add(1))); -/// assert_eq!(probe.observe().await, Recv(Update{from:39, to:40})); -/// assert_eq!(probe.observe().await, Send(Operation::Add(1))); -/// assert_eq!(probe.observe().await, Recv(Update{from:40, to:41})); -/// assert_eq!(probe.observe().await, Send(Operation::Add(0))); -/// assert_eq!(probe.observe().await, Recv(Update{from:41, to:41})); -/// # -/// # Ok(()) -/// # } -/// ``` -pub struct Probe { - input_interceptor: mpsc::Sender, - input_receiver: mpsc::Receiver, - input_forwarder: DynSender, - output_interceptor: mpsc::Sender, - output_receiver: mpsc::Receiver, - output_forwarder: DynSender, -} - -/// An event observed by a [Probe] -/// -/// These events have to be interpreted from the point of view of -/// the actor on which the probe has been set. -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum ProbeEvent { - /// The observed actor received some input message - Recv(I), - - /// The observed actor sent some output message - Send(O), - - /// The input stream of the observed actor has been closed - CloseRecv, - - /// The output stream of the observed actor has been closed - CloseSend, - - /// The observed actor is fully disconnected - Closed, -} - -impl Default for Probe { - fn default() -> Self { - Self::new() - } -} - -impl Probe { - /// Create a new `Probe` ready to be interposed between two actors. - /// - /// The connection is done using the [with_probe()](crate::test_helpers::ServiceConsumerExt::with_probe) - /// method added by [ServiceConsumerExt] to any actor builder implementing [MessageSource] and [MessageSink]. - pub fn new() -> Self { - // The capacity of the interceptor channels is 1, - // so the probe will control at which pace input/output messages are sent. - let (input_interceptor, input_receiver) = mpsc::channel(1); - let (output_interceptor, output_receiver) = mpsc::channel(1); - - // Use null senders till this probe is connected to actual message boxes. - let input_forwarder = NullSender.into(); - let output_forwarder = NullSender.into(); - - Probe { - input_interceptor, - input_receiver, - input_forwarder, - output_interceptor, - output_receiver, - output_forwarder, - } - } - - /// Connect this probe to a source and a sink - pub fn connect_to_peers( - &mut self, - config: C, - source: &mut impl MessageSource, - sink: &mut impl MessageSink, - ) { - let input_interceptor: DynSender = self.input_interceptor.clone().into(); - self.output_forwarder = sink.get_sender(); - source.connect_sink(config, &input_interceptor); - } - - /// Connect this probe to a service provider - pub fn connect_to_server(&mut self, service: &mut impl Service) { - self.output_forwarder = service.connect_client(self.input_interceptor.clone().into()) - } - - /// Return the next event observed between the two connected actors. - /// - /// Note that calling `observe()` is mandatory for the actors to make progress. - /// Indeed the observed channels are blocked on each action till the event is actually observed. - /// Hence a probe also controls the pace at which input/output messages are sent - /// over the observed connection between the two actors - pub async fn observe(&mut self) -> ProbeEvent { - // Ensure that input/output can only be sent by the observed actors - let _ = self.input_interceptor.close().await; - let _ = self.output_interceptor.close().await; - - // Both input and output sender actors might have completed - if self.input_receiver.is_terminated() && self.output_receiver.is_terminated() { - return ProbeEvent::Closed; - } - - // When the input sender has completed: focus on output - if self.input_receiver.is_terminated() { - let output = self.output_receiver.next().await; - return self.notify_output(output).await; - } - - // When the output sender has completed: focus on input - if self.output_receiver.is_terminated() { - let input = self.input_receiver.next().await; - return self.notify_input(input).await; - } - - // Notify either input or output depending which is first - tokio::select! { - input = self.input_receiver.next() => { - self.notify_input(input).await - }, - output = self.output_receiver.next() => { - self.notify_output(output).await - }, - } - } - - async fn notify_input(&mut self, input: Option) -> ProbeEvent { - match input { - None => ProbeEvent::CloseRecv, - Some(input) => { - let event = input.clone(); - self.input_forwarder - .send(input) - .await - .expect("input to be forwarded"); - ProbeEvent::Recv(event) - } - } - } - - async fn notify_output(&mut self, output: Option) -> ProbeEvent { - match output { - None => ProbeEvent::CloseSend, - Some(output) => { - let event = output.clone(); - self.output_forwarder - .send(output) - .await - .expect("output to be forwarded"); - ProbeEvent::Send(event) - } - } - } -} - -/// Extend with a `with_probe` method any actor builder implementing [MessageSource] and [MessageSink]. -pub trait ServiceConsumerExt { - /// Add a probe to an actor `self` that is a [MessageSource] and [MessageSink]. - /// - /// Return a [MessageSource] and [MessageSink] - /// that can be plugged into another actor which consumes the source messages and produces messages for the sink. - /// - /// The added `Probe` is then interposed between the two actors, - /// observing all the [ProbeEvent] exchanged between them. - /// - /// ``` - /// # use tedge_actors::{NoConfig, ServerMessageBoxBuilder, SimpleMessageBoxBuilder}; - /// # use crate::tedge_actors::examples::calculator::*; - /// use tedge_actors::test_helpers::Probe; // The probe struct - /// use tedge_actors::MessageSource; // is a `MessageSource` - /// use tedge_actors::MessageSink; // is a `MessageSink` - /// use tedge_actors::test_helpers::ServiceConsumerExt; // Adds `.with_probe()` - /// - /// // Build the actor message boxes - /// let mut server_box_builder : ServerMessageBoxBuilder = ServerMessageBoxBuilder::new("Calculator", 16); - /// let mut client_box_builder : SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("Player 1", 1); - /// - /// // Connect the two actor message boxes interposing a probe. - /// let mut probe = Probe::new(); - /// client_box_builder.with_probe(&mut probe).connect_to_server(&mut server_box_builder); - /// ``` - fn with_probe<'a>( - &'a mut self, - probe: &'a mut Probe, - ) -> &'a mut Probe; -} - -impl ServiceConsumerExt for T -where - T: MessageSource, - T: MessageSink, -{ - fn with_probe<'a>( - &'a mut self, - probe: &'a mut Probe, - ) -> &'a mut Probe { - let output_interceptor: DynSender = probe.output_interceptor.clone().into(); - probe.input_forwarder = self.get_sender(); - self.connect_sink(NoConfig, &output_interceptor); - probe - } -} - -impl MessageSource for Probe { - fn connect_sink(&mut self, _config: NoConfig, peer: &impl MessageSink) { - self.output_forwarder = peer.get_sender(); - } -} - -impl MessageSink for Probe { - fn get_sender(&self) -> DynSender { - self.input_interceptor.clone().into() - } -} - pub trait ServiceProviderExt { /// Create a simple message box connected to a server box under construction. fn new_client_box(&mut self) -> SimpleMessageBox; From e508f114f1cae35fda992c47a4b1439f33265483 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Mon, 18 Mar 2024 10:13:30 +0100 Subject: [PATCH 4/4] Deprecate ServiceProviderExt and new_client_box test helpers The new_client_box method has not been fully deprecated but moved to the Service trait. This makes this trait a bit more contrieved (with 3 ways to connect clients) and will have to be fixed. However, I prefer this temporary situation because moving the method in a trait Ext was only hidding the issue. I will address that in a following step. Signed-off-by: Didier Wenzek --- crates/common/batcher/src/driver.rs | 10 +++-- crates/core/tedge_actors/src/actors.rs | 6 ++- crates/core/tedge_actors/src/lib.rs | 25 ++++++----- .../tedge_actors/src/servers/message_boxes.rs | 2 +- crates/core/tedge_actors/src/servers/mod.rs | 20 ++++++++- crates/core/tedge_actors/src/test_helpers.rs | 41 ------------------- crates/core/tedge_actors/src/tests.rs | 1 - 7 files changed, 45 insertions(+), 60 deletions(-) diff --git a/crates/common/batcher/src/driver.rs b/crates/common/batcher/src/driver.rs index c09c2c6cf23..2dfd7003cb7 100644 --- a/crates/common/batcher/src/driver.rs +++ b/crates/common/batcher/src/driver.rs @@ -176,8 +176,9 @@ mod tests { use crate::config::BatchConfigBuilder; use crate::driver::BatchDriver; use std::time::Duration; - use tedge_actors::test_helpers::ServiceProviderExt; use tedge_actors::Builder; + use tedge_actors::MessageSource; + use tedge_actors::NoConfig; use tedge_actors::SimpleMessageBoxBuilder; use tokio::time::timeout; @@ -254,8 +255,11 @@ mod tests { .message_leap_limit(0) .build(); let batcher = Batcher::new(config); - let mut box_builder = SimpleMessageBoxBuilder::new("test", 1); - let test_box = box_builder.new_client_box(); + let mut box_builder = SimpleMessageBoxBuilder::new("SUT", 1); + let mut test_box_builder = SimpleMessageBoxBuilder::new("Test", 1); + box_builder.connect_sink(NoConfig, &test_box_builder); + test_box_builder.connect_sink(NoConfig, &box_builder); + let test_box = test_box_builder.build(); let driver_box = box_builder.build(); let driver = BatchDriver::new(batcher, driver_box); diff --git a/crates/core/tedge_actors/src/actors.rs b/crates/core/tedge_actors/src/actors.rs index 30dd6555da9..f73755b0def 100644 --- a/crates/core/tedge_actors/src/actors.rs +++ b/crates/core/tedge_actors/src/actors.rs @@ -48,7 +48,6 @@ where #[cfg(test)] #[cfg(feature = "test-helpers")] pub mod tests { - use crate::test_helpers::ServiceProviderExt; use crate::*; use async_trait::async_trait; use futures::channel::mpsc; @@ -77,7 +76,10 @@ pub mod tests { #[tokio::test] async fn running_an_actor_without_a_runtime() { let mut box_builder = SimpleMessageBoxBuilder::new("test", 16); - let mut client_message_box = box_builder.new_client_box(); + let mut client_box_builder = SimpleMessageBoxBuilder::new("client", 16); + client_box_builder.connect_sink(NoConfig, &box_builder); + client_box_builder.connect_source(NoConfig, &mut box_builder); + let mut client_message_box = client_box_builder.build(); let mut runtime_box = box_builder.get_signal_sender(); let actor_message_box = box_builder.build(); let actor = Echo { diff --git a/crates/core/tedge_actors/src/lib.rs b/crates/core/tedge_actors/src/lib.rs index 3e85b48d2c6..2adead4097b 100644 --- a/crates/core/tedge_actors/src/lib.rs +++ b/crates/core/tedge_actors/src/lib.rs @@ -104,17 +104,17 @@ //! [Actor and message box builders](crate::builders) are provided to address these specificities //! with a generic approach without exposing the internal structure of the actors. //! -//! To test the `Calculator` example we need first to create its box using a [SimpleMessageBoxBuilder], -//! as this actor expects a [SimpleMessageBox]. -//! And then, to create a test box connected to the actor message box, -//! we use the [ServiceProviderExt](crate::test_helpers::ServiceProviderExt) test helper extension -//! and the [new_client_box](crate::test_helpers::ServiceProviderExt::new_client_box) method. +//! To test the `Calculator` example we create two message boxes [SimpleMessageBox]es using [SimpleMessageBoxBuilder]s. +//! The first box will given to the actor under test, +//! the second box will be used by the test to control the actor input and to observe its output. +//! For that, the two boxes are connected so the output messages of the first are sent to the second, and vice-versa. //! //! ``` //! # use crate::tedge_actors::Actor; //! # use crate::tedge_actors::Builder; //! # use crate::tedge_actors::ChannelError; //! # use crate::tedge_actors::MessageReceiver; +//! # use crate::tedge_actors::MessageSink; //! # use crate::tedge_actors::NoConfig; //! # use crate::tedge_actors::Sender; //! # use crate::tedge_actors::SimpleMessageBox; @@ -125,15 +125,18 @@ //! # #[tokio::main] //! # async fn main() { //! # -//! // Add the `new_client_box()` extension to the `SimpleMessageBoxBuilder`. -//! use tedge_actors::test_helpers::ServiceProviderExt; -//! -//! // Use a builder for the actor message box +//! // Use builders to create message boxes for the actor and test driver +//! use tedge_actors::MessageSource; //! let mut actor_box_builder = SimpleMessageBoxBuilder::new("Actor", 10); +//! let mut test_box_builder = SimpleMessageBoxBuilder::new("Test", 10); +//! +//! // Connect the two boxes under construction, so each receives the message sent by the other +//! actor_box_builder.connect_source(NoConfig, &mut test_box_builder); +//! test_box_builder.connect_source(NoConfig, &mut actor_box_builder); //! -//! // Create a test box ready then the actor box -//! let mut test_box = actor_box_builder.new_client_box(); +//! // Build the boxes //! let actor_box = actor_box_builder.build(); +//! let mut test_box = test_box_builder.build(); //! //! // The actor is then spawn in the background with its message box. //! let mut actor = Calculator::new(actor_box); diff --git a/crates/core/tedge_actors/src/servers/message_boxes.rs b/crates/core/tedge_actors/src/servers/message_boxes.rs index 323003b5d48..c570468f519 100644 --- a/crates/core/tedge_actors/src/servers/message_boxes.rs +++ b/crates/core/tedge_actors/src/servers/message_boxes.rs @@ -159,7 +159,6 @@ impl Sender for RequestSender: /// The client provides a `response_sender` on which it wants to response to be sent to. /// In exchange the client is returned a request sender on which its requests will have to be sent. fn connect_client(&mut self, response_sender: DynSender) -> DynSender; + + #[cfg(feature = "test-helpers")] + /// Create a simple message box connected to a server box under construction. + fn new_client_box(&mut self) -> SimpleMessageBox; } impl Service for T @@ -164,4 +173,13 @@ where }; request_sender.into() } + + #[cfg(feature = "test-helpers")] + fn new_client_box(&mut self) -> SimpleMessageBox { + let name = "client-box"; + let capacity = 16; + let mut client_box = SimpleMessageBoxBuilder::new(name, capacity); + client_box.connect_sink(NoConfig, &self.connect_client(client_box.get_sender())); + client_box.build() + } } diff --git a/crates/core/tedge_actors/src/test_helpers.rs b/crates/core/tedge_actors/src/test_helpers.rs index 312bfdfc667..81dcade80f9 100644 --- a/crates/core/tedge_actors/src/test_helpers.rs +++ b/crates/core/tedge_actors/src/test_helpers.rs @@ -1,19 +1,14 @@ //! Testing actors use crate::Builder; use crate::ChannelError; -use crate::CloneSender; use crate::DynSender; use crate::Message; use crate::MessageReceiver; use crate::MessageSink; -use crate::MessageSource; -use crate::NoConfig; use crate::NoMessage; use crate::RequestEnvelope; -use crate::RequestSender; use crate::RuntimeRequest; use crate::Sender; -use crate::ServerMessageBoxBuilder; use crate::SimpleMessageBox; use crate::SimpleMessageBoxBuilder; use async_trait::async_trait; @@ -376,42 +371,6 @@ impl AsMut for TimedMessageBox { } } -pub trait ServiceProviderExt { - /// Create a simple message box connected to a server box under construction. - fn new_client_box(&mut self) -> SimpleMessageBox; -} - -impl ServiceProviderExt for DynSender> { - fn new_client_box(&mut self) -> SimpleMessageBox { - let name = "client-box"; - let capacity = 16; - let mut client_box = SimpleMessageBoxBuilder::new(name, capacity); - let request_sender = Box::new(RequestSender { - sender: self.sender_clone(), - reply_to: client_box.get_sender(), - }); - client_box.connect_sink(NoConfig, &request_sender.sender_clone()); - client_box.build() - } -} - -impl ServiceProviderExt for ServerMessageBoxBuilder { - fn new_client_box(&mut self) -> SimpleMessageBox { - self.request_sender().new_client_box() - } -} - -impl ServiceProviderExt for SimpleMessageBoxBuilder { - fn new_client_box(&mut self) -> SimpleMessageBox { - let name = "client-box"; - let capacity = 16; - let mut client_box = SimpleMessageBoxBuilder::new(name, capacity); - self.connect_sink(NoConfig, &client_box); - self.connect_source(NoConfig, &mut client_box); - client_box.build() - } -} - pub trait WithTimeout where T: Future, diff --git a/crates/core/tedge_actors/src/tests.rs b/crates/core/tedge_actors/src/tests.rs index 53a226a14b4..3311e812be3 100644 --- a/crates/core/tedge_actors/src/tests.rs +++ b/crates/core/tedge_actors/src/tests.rs @@ -1,4 +1,3 @@ -use crate::test_helpers::ServiceProviderExt; use crate::*; use async_trait::async_trait; use std::time::Duration;