Skip to content

Commit

Permalink
Merge pull request #2784 from didier-wenzek/refactor/deprecate-unused…
Browse files Browse the repository at this point in the history
…-actor-methods

refactor: Deprecate unused actor methods and types
  • Loading branch information
didier-wenzek authored Mar 21, 2024
2 parents 90fbf7f + e508f11 commit fd7a532
Show file tree
Hide file tree
Showing 15 changed files with 75 additions and 507 deletions.
10 changes: 7 additions & 3 deletions crates/common/batcher/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ mod tests {
use crate::config::BatchConfigBuilder;
use crate::driver::BatchDriver;
use std::time::Duration;
use tedge_actors::test_helpers::ServiceProviderExt;
use tedge_actors::Builder;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::SimpleMessageBoxBuilder;
use tokio::time::timeout;

Expand Down Expand Up @@ -254,8 +255,11 @@ mod tests {
.message_leap_limit(0)
.build();
let batcher = Batcher::new(config);
let mut box_builder = SimpleMessageBoxBuilder::new("test", 1);
let test_box = box_builder.new_client_box();
let mut box_builder = SimpleMessageBoxBuilder::new("SUT", 1);
let mut test_box_builder = SimpleMessageBoxBuilder::new("Test", 1);
box_builder.connect_sink(NoConfig, &test_box_builder);
test_box_builder.connect_sink(NoConfig, &box_builder);
let test_box = test_box_builder.build();
let driver_box = box_builder.build();

let driver = BatchDriver::new(batcher, driver_box);
Expand Down
6 changes: 4 additions & 2 deletions crates/core/tedge_actors/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ where
#[cfg(test)]
#[cfg(feature = "test-helpers")]
pub mod tests {
use crate::test_helpers::ServiceProviderExt;
use crate::*;
use async_trait::async_trait;
use futures::channel::mpsc;
Expand Down Expand Up @@ -77,7 +76,10 @@ pub mod tests {
#[tokio::test]
async fn running_an_actor_without_a_runtime() {
let mut box_builder = SimpleMessageBoxBuilder::new("test", 16);
let mut client_message_box = box_builder.new_client_box();
let mut client_box_builder = SimpleMessageBoxBuilder::new("client", 16);
client_box_builder.connect_sink(NoConfig, &box_builder);
client_box_builder.connect_source(NoConfig, &mut box_builder);
let mut client_message_box = client_box_builder.build();
let mut runtime_box = box_builder.get_signal_sender();
let actor_message_box = box_builder.build();
let actor = Echo {
Expand Down
65 changes: 0 additions & 65 deletions crates/core/tedge_actors/src/examples/calculator_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,68 +67,3 @@ impl Actor for Player {
Ok(())
}
}

#[cfg(test)]
#[cfg(feature = "test-helpers")]
mod tests {
use crate::examples::calculator_server::*;
use crate::test_helpers::Probe;
use crate::test_helpers::ProbeEvent::Recv;
use crate::test_helpers::ProbeEvent::Send;
use crate::test_helpers::ServiceConsumerExt;
use crate::Actor;
use crate::Builder;
use crate::ChannelError;
use crate::ServerActor;
use crate::ServerMessageBoxBuilder;
use crate::SimpleMessageBoxBuilder;

#[tokio::test]
async fn observing_an_actor() -> Result<(), ChannelError> {
// Build the actor message boxes
let mut service_box_builder = ServerMessageBoxBuilder::new("Calculator", 16);
let mut player_box_builder = SimpleMessageBoxBuilder::new("Player 1", 1);

// Connect the two actor message boxes interposing a probe.
let mut probe = Probe::new();
player_box_builder
.with_probe(&mut probe)
.connect_to_server(&mut service_box_builder);

// Spawn the actors
tokio::spawn(async move {
ServerActor::new(Calculator::default(), service_box_builder.build())
.run()
.await
});
tokio::spawn(async move {
Player {
name: "Player".to_string(),
target: 42,
messages: player_box_builder.build(),
}
.run()
.await
});

// Observe the messages sent and received by the player.
assert_eq!(probe.observe().await, Send(Operation::Add(0)));
assert_eq!(probe.observe().await, Recv(Update { from: 0, to: 0 }));
assert_eq!(probe.observe().await, Send(Operation::Add(21)));
assert_eq!(probe.observe().await, Recv(Update { from: 0, to: 21 }));
assert_eq!(probe.observe().await, Send(Operation::Add(10)));
assert_eq!(probe.observe().await, Recv(Update { from: 21, to: 31 }));
assert_eq!(probe.observe().await, Send(Operation::Add(5)));
assert_eq!(probe.observe().await, Recv(Update { from: 31, to: 36 }));
assert_eq!(probe.observe().await, Send(Operation::Add(3)));
assert_eq!(probe.observe().await, Recv(Update { from: 36, to: 39 }));
assert_eq!(probe.observe().await, Send(Operation::Add(1)));
assert_eq!(probe.observe().await, Recv(Update { from: 39, to: 40 }));
assert_eq!(probe.observe().await, Send(Operation::Add(1)));
assert_eq!(probe.observe().await, Recv(Update { from: 40, to: 41 }));
assert_eq!(probe.observe().await, Send(Operation::Add(0)));
assert_eq!(probe.observe().await, Recv(Update { from: 41, to: 41 }));

Ok(())
}
}
28 changes: 15 additions & 13 deletions crates/core/tedge_actors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,17 @@
//! [Actor and message box builders](crate::builders) are provided to address these specificities
//! with a generic approach without exposing the internal structure of the actors.
//!
//! To test the `Calculator` example we need first to create its box using a [SimpleMessageBoxBuilder],
//! as this actor expects a [SimpleMessageBox].
//! And then, to create a test box connected to the actor message box,
//! we use the [ServiceProviderExt](crate::test_helpers::ServiceProviderExt) test helper extension
//! and the [new_client_box](crate::test_helpers::ServiceProviderExt::new_client_box) method.
//! To test the `Calculator` example we create two message boxes [SimpleMessageBox]es using [SimpleMessageBoxBuilder]s.
//! The first box will given to the actor under test,
//! the second box will be used by the test to control the actor input and to observe its output.
//! For that, the two boxes are connected so the output messages of the first are sent to the second, and vice-versa.
//!
//! ```
//! # use crate::tedge_actors::Actor;
//! # use crate::tedge_actors::Builder;
//! # use crate::tedge_actors::ChannelError;
//! # use crate::tedge_actors::MessageReceiver;
//! # use crate::tedge_actors::MessageSink;
//! # use crate::tedge_actors::NoConfig;
//! # use crate::tedge_actors::Sender;
//! # use crate::tedge_actors::SimpleMessageBox;
Expand All @@ -125,15 +125,18 @@
//! # #[tokio::main]
//! # async fn main() {
//! #
//! // Add the `new_client_box()` extension to the `SimpleMessageBoxBuilder`.
//! use tedge_actors::test_helpers::ServiceProviderExt;
//!
//! // Use a builder for the actor message box
//! // Use builders to create message boxes for the actor and test driver
//! use tedge_actors::MessageSource;
//! let mut actor_box_builder = SimpleMessageBoxBuilder::new("Actor", 10);
//! let mut test_box_builder = SimpleMessageBoxBuilder::new("Test", 10);
//!
//! // Connect the two boxes under construction, so each receives the message sent by the other
//! actor_box_builder.connect_source(NoConfig, &mut test_box_builder);
//! test_box_builder.connect_source(NoConfig, &mut actor_box_builder);
//!
//! // Create a test box ready then the actor box
//! let mut test_box = actor_box_builder.new_client_box();
//! // Build the boxes
//! let actor_box = actor_box_builder.build();
//! let mut test_box = test_box_builder.build();
//!
//! // The actor is then spawn in the background with its message box.
//! let mut actor = Calculator::new(actor_box);
Expand Down Expand Up @@ -281,8 +284,7 @@
//!
//! # #[tokio::main]
//! # async fn main() -> Result<(), RuntimeError> {
//! let runtime_events_logger = None;
//! let mut runtime = Runtime::try_new(runtime_events_logger).await?;
//! let mut runtime = Runtime::new();
//!
//! let my_actor_builder = MyActorBuilder::default();
//!
Expand Down
75 changes: 10 additions & 65 deletions crates/core/tedge_actors/src/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@
//! but also to add specific coordination among input and output channels,
//! each [Actor](crate::Actor) is free to choose its own [message box](crate::message_boxes) implementation:
//!
//! ```no_run
//! trait Actor {
//! }
//! ```
//!
//! This crates provides several built-in message box implementations:
//!
//! - [SimpleMessageBox] for actors that simply process messages in turn,
Expand All @@ -101,34 +96,12 @@ use futures::StreamExt;
use log::debug;
use std::fmt::Debug;

/// Either a message or a [RuntimeRequest]
pub enum WrappedInput<Input> {
Message(Input),
RuntimeRequest(RuntimeRequest),
}

impl<Input: Debug> Debug for WrappedInput<Input> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Message(input) => f.debug_tuple("Message").field(input).finish(),
Self::RuntimeRequest(runtime_request) => f
.debug_tuple("RuntimeRequest")
.field(runtime_request)
.finish(),
}
}
}

#[async_trait]
pub trait MessageReceiver<Input> {
/// Return the next received message if any, returning [RuntimeRequest]'s as errors.
/// Returning [RuntimeRequest] takes priority over messages.
async fn try_recv(&mut self) -> Result<Option<Input>, RuntimeRequest>;

/// Returns [Some] [WrappedInput] the next time a message is received. Returns [None] if
/// the underlying channels are closed. Returning [RuntimeRequest] takes priority over messages.
async fn recv_message(&mut self) -> Option<WrappedInput<Input>>;

/// Returns [Some] message the next time a message is received. Returns [None] if
/// both of the underlying channels are closed or if a [RuntimeRequest] is received.
/// Handling [RuntimeRequest]'s by returning [None] takes priority over messages.
Expand Down Expand Up @@ -195,12 +168,6 @@ impl<Input: Send + Debug> MessageReceiver<Input> for LoggingReceiver<Input> {
message
}

async fn recv_message(&mut self) -> Option<WrappedInput<Input>> {
let message = self.receiver.recv_message().await;
debug!(target: &self.name, "recv {:?}", message);
message
}

async fn recv(&mut self) -> Option<Input> {
let message = self.receiver.recv().await;
debug!(target: &self.name, "recv {:?}", message);
Expand Down Expand Up @@ -269,42 +236,32 @@ impl<Input: Debug> UnboundedLoggingReceiver<Input> {
}
}

async fn next_message(&mut self) -> Option<WrappedInput<Input>> {
async fn next_message(&mut self) -> Result<Option<Input>, RuntimeRequest> {
tokio::select! {
biased;

Some(runtime_request) = self.signal_receiver.next() => {
Some(WrappedInput::RuntimeRequest(runtime_request))
Err(runtime_request)
}
Some(message) = self.input_receiver.next() => {
Some(WrappedInput::Message(message))
Ok(Some(message))
}
else => None
else => Ok(None)
}
}
}

#[async_trait]
impl<Input: Send + Debug> MessageReceiver<Input> for UnboundedLoggingReceiver<Input> {
async fn try_recv(&mut self) -> Result<Option<Input>, RuntimeRequest> {
let message = match self.next_message().await {
Some(WrappedInput::Message(message)) => Ok(Some(message)),
Some(WrappedInput::RuntimeRequest(runtime_request)) => Err(runtime_request),
None => Ok(None),
};
debug!(target: &self.name, "recv {:?}", message);
message
}

async fn recv_message(&mut self) -> Option<WrappedInput<Input>> {
let message = self.next_message().await;
debug!(target: &self.name, "recv {:?}", message);
message
}

async fn recv(&mut self) -> Option<Input> {
let message = match self.next_message().await {
Some(WrappedInput::Message(message)) => Some(message),
Ok(Some(message)) => Some(message),
_ => None,
};
debug!(target: &self.name, "recv {:?}", message);
Expand Down Expand Up @@ -366,10 +323,6 @@ impl<Input: Message, Output: Message> MessageReceiver<Input> for SimpleMessageBo
self.input_receiver.try_recv().await
}

async fn recv_message(&mut self) -> Option<WrappedInput<Input>> {
self.input_receiver.recv_message().await
}

async fn recv(&mut self) -> Option<Input> {
self.input_receiver.recv().await
}
Expand Down Expand Up @@ -416,30 +369,22 @@ impl<Input> CombinedReceiver<Input> {
#[async_trait]
impl<Input: Send> MessageReceiver<Input> for CombinedReceiver<Input> {
async fn try_recv(&mut self) -> Result<Option<Input>, RuntimeRequest> {
match self.recv_message().await {
Some(WrappedInput::Message(message)) => Ok(Some(message)),
Some(WrappedInput::RuntimeRequest(runtime_request)) => Err(runtime_request),
None => Ok(None),
}
}

async fn recv_message(&mut self) -> Option<WrappedInput<Input>> {
tokio::select! {
biased;

Some(runtime_request) = self.signal_receiver.next() => {
Some(WrappedInput::RuntimeRequest(runtime_request))
Err(runtime_request)
}
Some(message) = self.input_receiver.next() => {
Some(WrappedInput::Message(message))
Ok(Some(message))
}
else => None
else => Ok(None)
}
}

async fn recv(&mut self) -> Option<Input> {
match self.recv_message().await {
Some(WrappedInput::Message(message)) => Some(message),
match self.try_recv().await {
Ok(Some(message)) => Some(message),
_ => None,
}
}
Expand Down
19 changes: 13 additions & 6 deletions crates/core/tedge_actors/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,30 @@ pub struct Runtime {
bg_task: JoinHandle<Result<(), RuntimeError>>,
}

impl Default for Runtime {
fn default() -> Self {
Runtime::new()
}
}

impl Runtime {
/// Launch the runtime, returning a runtime handler
///
/// TODO: ensure this can only be called once
pub async fn try_new(
events_sender: Option<DynSender<RuntimeEvent>>,
) -> Result<Runtime, RuntimeError> {
pub fn new() -> Runtime {
Self::with_events_sender(None)
}

fn with_events_sender(events_sender: Option<DynSender<RuntimeEvent>>) -> Runtime {
let (actions_sender, actions_receiver) = mpsc::channel(16);
let runtime_actor =
RuntimeActor::new(actions_receiver, events_sender, Duration::from_secs(60));

let runtime_task = tokio::spawn(runtime_actor.run());
let runtime = Runtime {
Runtime {
handle: RuntimeHandle { actions_sender },
bg_task: runtime_task,
};
Ok(runtime)
}
}

pub fn get_handle(&self) -> RuntimeHandle {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/servers/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ impl<Request: Message, Response: Message> Sender<Request> for RequestSender<Requ
mod tests {
use super::*;

use crate::test_helpers::ServiceProviderExt;
use crate::Builder;
use crate::ConcurrentServerActor;
use crate::DynSender;
Expand All @@ -168,6 +167,7 @@ mod tests {
use crate::RuntimeRequestSink;
use crate::Server;
use crate::ServerMessageBoxBuilder;
use crate::Service;
use crate::SimpleMessageBox;
use async_trait::async_trait;
use std::time::Duration;
Expand Down Expand Up @@ -306,7 +306,7 @@ mod tests {
}
}

let mut runtime = Runtime::try_new(None).await.unwrap();
let mut runtime = Runtime::new();

let (mut server_actor_builder, mut test_rx) = TestServerBuilder::new();
let mut client_box = server_actor_builder.client_box();
Expand Down
Loading

0 comments on commit fd7a532

Please sign in to comment.