Skip to content

Commit

Permalink
Modify Actor trait to take self instead of &mut self (#2329)
Browse files Browse the repository at this point in the history
Signed-off-by: James Rhodes <[email protected]>
  • Loading branch information
jarhodes314 authored Oct 18, 2023
1 parent d39c1c2 commit ca4b74e
Show file tree
Hide file tree
Showing 44 changed files with 107 additions and 114 deletions.
4 changes: 2 additions & 2 deletions crates/common/batcher/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<B: Batchable> Actor for BatchDriver<B> {
}

/// Start the batching - runs until receiving a Flush message
async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
loop {
let message = match self.time_to_next_timer() {
TimeTo::Unbounded => self.recv(None),
Expand Down Expand Up @@ -259,7 +259,7 @@ mod tests {
let test_box = box_builder.new_client_box(NoConfig);
let driver_box = box_builder.build();

let mut driver = BatchDriver::new(batcher, driver_box);
let driver = BatchDriver::new(batcher, driver_box);
tokio::spawn(async move { driver.run().await });

test_box
Expand Down
29 changes: 23 additions & 6 deletions crates/core/tedge_actors/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_trait::async_trait;
/// - sending responses for requests, possibly deferring some responses,
/// - acting as a source of messages ...
#[async_trait]
pub trait Actor: 'static + Send + Sync {
pub trait Actor: 'static + Send + Sync + ActorBoxed {
/// Return the actor instance name
fn name(&self) -> &str;

Expand All @@ -25,7 +25,24 @@ pub trait Actor: 'static + Send + Sync {
/// Processing input messages,
/// updating internal state,
/// and sending messages to peers.
async fn run(&mut self) -> Result<(), RuntimeError>;
async fn run(self) -> Result<(), RuntimeError>;
}

// The following madness comes from
// https://users.rust-lang.org/t/call-consuming-method-for-dyn-trait-object/69596/8
#[async_trait]
pub trait ActorBoxed {
async fn run_boxed(self: Box<Self>) -> Result<(), RuntimeError>;
}

#[async_trait]
impl<A> ActorBoxed for A
where
A: Actor,
{
async fn run_boxed(self: Box<Self>) -> Result<(), RuntimeError> {
(*self).run().await
}
}

#[cfg(test)]
Expand All @@ -48,7 +65,7 @@ pub mod tests {
"Echo"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
while let Some(message) = self.messages.recv().await {
self.messages.send(message).await?
}
Expand All @@ -62,7 +79,7 @@ pub mod tests {
let mut box_builder = SimpleMessageBoxBuilder::new("test", 16);
let mut client_message_box = box_builder.new_client_box(NoConfig);
let actor_message_box = box_builder.build();
let mut actor = Echo {
let actor = Echo {
messages: actor_message_box,
};
let actor_task = spawn(async move { actor.run().await });
Expand Down Expand Up @@ -93,7 +110,7 @@ pub mod tests {
let (output_sender, mut output_receiver) = mpsc::channel(10);

let (input_sender, message_box) = SpecificMessageBox::new_box(10, output_sender.into());
let mut actor = ActorWithSpecificMessageBox {
let actor = ActorWithSpecificMessageBox {
messages: message_box,
};
let actor_task = spawn(async move { actor.run().await });
Expand Down Expand Up @@ -139,7 +156,7 @@ pub mod tests {
"ActorWithSpecificMessageBox"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
while let Some(message) = self.messages.next().await {
if message.contains("this") {
self.messages.do_this(message.to_string()).await?
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ pub trait ServiceConsumer<Request: Message, Response: Message, Config> {
/// "My Actor"
/// }
///
/// async fn run(&mut self) -> Result<(), RuntimeError> {
/// async fn run(mut self) -> Result<(), RuntimeError> {
/// while let Some(input) = self.messages.recv().await {
/// let output = input * 2;
/// self.messages.send(output).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<C: Converter> Actor for ConvertingActor<C> {
&self.name
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
let init_messages = self.init_messages()?;
self.send(init_messages).await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/examples/calculator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Actor for Calculator {
"Calculator"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
while let Some(op) = self.messages.recv().await {
// Process in turn each input message
let from = self.state;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/examples/calculator_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Actor for Player {
&self.name
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
// Send a first identity `Operation` to see where we are.
self.messages.send(Operation::Add(0)).await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/examples/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//! &self.name
//! }
//!
//! async fn run(&mut self) -> Result<(), RuntimeError> {
//! async fn run(mut self) -> Result<(), RuntimeError> {
//! // Send a first identity `Operation` to see where we are.
//! self.messages.send(Operation::Add(0)).await?;
//!
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
//! /// However, there are no constraints on the behavior of an actor.
//! /// A more sophisticated actor might send output messages independently of any request
//! /// or process concurrently several requests.
//! async fn run(&mut self)-> Result<(), RuntimeError> {
//! async fn run(mut self)-> Result<(), RuntimeError> {
//! while let Some(op) = self.messages.recv().await {
//! // Process in turn each input message
//! let from = self.state;
Expand Down Expand Up @@ -286,7 +286,7 @@
//! todo!()
//! }
//!
//! async fn run(&mut self) -> Result<(), RuntimeError> {
//! async fn run(mut self) -> Result<(), RuntimeError> {
//! todo!()
//! }
//! }
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/run_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ impl RunActor {
self.actor.name()
}

pub async fn run(mut self) -> Result<(), RuntimeError> {
self.actor.run().await
pub async fn run(self) -> Result<(), RuntimeError> {
self.actor.run_boxed().await
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/core/tedge_actors/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ mod tests {
"Echo"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
while let Some(message) = self.messages.recv().await {
match message {
EchoMessage::String(message) => {
Expand Down Expand Up @@ -338,7 +338,7 @@ mod tests {
"Ending"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(self) -> Result<(), RuntimeError> {
Ok(())
}
}
Expand All @@ -357,7 +357,7 @@ mod tests {
"Panic"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(self) -> Result<(), RuntimeError> {
panic!("Oh dear");
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/servers/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<S: Server> Actor for ServerActor<S> {
self.server.name()
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
let server = &mut self.server;
while let Some((client_id, request)) = self.messages.recv().await {
tokio::select! {
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<S: Server + Clone> Actor for ConcurrentServerActor<S> {
self.server.name()
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
while let Some((client_id, request)) = self.messages.recv().await {
// Spawn the request
let mut server = self.server.clone();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/servers/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl<S: Server, K> ServerActorBuilder<S, K> {
impl<S: Server> ServerActorBuilder<S, Sequential> {
pub async fn run(self) -> Result<(), RuntimeError> {
let messages = self.box_builder.build();
let mut actor = ServerActor::new(self.server, messages);
let actor = ServerActor::new(self.server, messages);

actor.run().await
}
Expand All @@ -162,7 +162,7 @@ impl<S: Server> ServerActorBuilder<S, Sequential> {
impl<S: Server + Clone> ServerActorBuilder<S, Concurrent> {
pub async fn run(self) -> Result<(), RuntimeError> {
let messages = self.box_builder.build();
let mut actor = ConcurrentServerActor::new(self.server, messages);
let actor = ConcurrentServerActor::new(self.server, messages);

actor.run().await
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn spawn_sleep_service() -> SimpleMessageBox<(ClientId, u64), (ClientId, u
let mut box_builder = SimpleMessageBoxBuilder::new("test", 16);
let handle = box_builder.new_client_box(NoConfig);
let messages = box_builder.build();
let mut actor = ServerActor::new(service, messages);
let actor = ServerActor::new(service, messages);

tokio::spawn(async move { actor.run().await });

Expand All @@ -49,7 +49,7 @@ async fn spawn_concurrent_sleep_service(

let handle = handle_builder.build();
let messages = ConcurrentServerMessageBox::new(max_concurrency, box_builder.build());
let mut actor = ConcurrentServerActor::new(service, messages);
let actor = ConcurrentServerActor::new(service, messages);

tokio::spawn(async move { actor.run().await });

Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_agent/src/file_transfer_server/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl Actor for FileTransferServerActor {
"HttpFileTransferServer"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
let http_config = self.config.clone();
let server = http_file_transfer_server(&http_config)?;

Expand Down Expand Up @@ -100,7 +100,7 @@ mod tests {

// Spawn HTTP file transfer server
let builder = FileTransferServerBuilder::new(http_config);
let mut actor = builder.build();
let actor = builder.build();
tokio::spawn(async move { actor.run().await });

// Create PUT request to file transfer service
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/restart_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Actor for RestartManagerActor {
"RestartManagerActor"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
if let Some(response) = self.process_pending_restart_operation().await {
self.message_box.send(response).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/restart_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn spawn_restart_manager(

let converter_box = converter_builder.build().with_timeout(TEST_TIMEOUT_MS);

let mut restart_actor = restart_actor_builder.build();
let restart_actor = restart_actor_builder.build();
tokio::spawn(async move { restart_actor.run().await });

Ok(converter_box)
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/software_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Actor for SoftwareManagerActor {
"SoftwareManagerActor"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
let operation_logs = OperationLogs::try_new(self.config.log_dir.clone().into())
.map_err(SoftwareManagerError::FromOperationsLogs)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/software_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ async fn spawn_software_manager(

let converter_box = converter_builder.build().with_timeout(TEST_TIMEOUT_MS);

let mut software_actor = software_actor_builder.build();
let software_actor = software_actor_builder.build();
tokio::spawn(async move { software_actor.run().await });

Ok(converter_box)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Actor for TedgeOperationConverterActor {
"TedgeOperationConverter"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
self.publish_operation_capabilities().await?;

while let Some(input) = self.input_receiver.recv().await {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ async fn spawn_mqtt_operation_converter(
let restart_box = restart_builder.build().with_timeout(TEST_TIMEOUT_MS);
let mqtt_message_box = mqtt_builder.build().with_timeout(TEST_TIMEOUT_MS);

let mut converter_actor = converter_actor_builder.build();
let converter_actor = converter_actor_builder.build();
tokio::spawn(async move { converter_actor.run().await });

Ok((software_box, restart_box, mqtt_message_box))
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_auth_proxy/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Actor for C8yAuthProxy {
"C8yAuthProxy"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
let server = Server::try_init(self.app_state.clone(), self.bind_address, self.bind_port)
.map_err(BoxError::from)?
.wait();
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_config_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ impl Actor for ConfigManagerActor {
"ConfigManager"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
self.publish_supported_config_types().await?;
self.get_pending_operations_from_cloud().await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_config_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ async fn spawn_config_manager(
let c8y_proxy_message_box = c8y_proxy_builder.build();
let timer_message_box = timer_builder.build();

let mut actor = config_manager_builder.build();
let actor = config_manager_builder.build();
let _join_handle = tokio::spawn(async move { actor.run().await });

Ok((mqtt_message_box, c8y_proxy_message_box, timer_message_box))
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_firmware_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Actor for FirmwareManagerActor {
// 2. Operation timeouts from the TimerActor for requests for which the child devices don't respond within the timeout window
// 3. Download results from the DownloaderActor for firmware download requests

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
self.resend_operations_to_child_device().await?;
// TODO: We need a dedicated actor to publish 500 later.
self.get_pending_operations_from_cloud().await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_firmware_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ async fn spawn_firmware_manager(
let timer_message_box = timer_builder.build();
let downloader_message_box = downloader_builder.build().with_timeout(TEST_TIMEOUT_MS);

let mut firmware_manager_actor = firmware_manager_builder.build();
let firmware_manager_actor = firmware_manager_builder.build();
let handle = tokio::spawn(async move { firmware_manager_actor.run().await });

Ok((
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_http_proxy/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Actor for C8YHttpProxyActor {
"C8Y-REST"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
async fn run(mut self) -> Result<(), RuntimeError> {
self.init().await.map_err(Box::new)?;

while let Some((client_id, request)) = self.peers.clients.recv().await {
Expand Down
Loading

1 comment on commit ca4b74e

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
318 1 3 319 99.69 57m27.015999999s

Failed Tests

Name Message ⏱️ Duration Suite
Publish measurements varying period benchmark.py run --count 1000 ... returned an unexpected exit code stdout: { "ok": false, "iterations": 5, "passed": 4, "failed": 1, "results": [ { "worker": 0, "messages": 1000, "dropped_percent": 24.7, "dropped_messages": 247, "total": 0.724058, "total_non_idle": 0.72406, "idle": 0.0, "parameters": { "count": 1000, "beats": 100, "beats_delay": 0, [ Message content over the limit has been removed. ] 2023-10-18 08:59:28,747 - root - INFO - Starting benchmark: count=1000, beats=100, beats_delay=0ms, period=50ms 2023-10-18 08:59:28,778 - root - INFO - Subscribing to cloud topic 2023-10-18 08:59:29,291 - root - INFO - Waiting for last message to be published 2023-10-18 08:59:29,902 - root - INFO - Stopping mqtt client 2023-10-18 08:59:30,908 - root - INFO - Starting benchmark: count=1000, beats=100, beats_delay=0ms, period=75ms 2023-10-18 08:59:30,919 - root - INFO - Subscribing to cloud topic 2023-10-18 08:59:31,779 - root - INFO - Waiting for last message to be published 2023-10-18 08:59:31,880 - root - INFO - Stopping mqtt client 2023-10-18 08:59:32,885 - root - INFO - Starting benchmark: count=1000, beats=100, beats_delay=0ms, period=100ms 2023-10-18 08:59:32,925 - root - INFO - Subscribing to cloud topic 2023-10-18 08:59:34,041 - root - INFO - Waiting for last message to be published 2023-10-18 08:59:34,241 - root - INFO - Stopping mqtt client 2023-10-18 08:59:35,250 - root - INFO - Finished benchmark Benchmark failed 47.617 s Benchmarks

Please sign in to comment.