Skip to content

Commit

Permalink
Merge pull request #3291 from jarhodes314/bug/tedge-mapper-shutdown
Browse files Browse the repository at this point in the history
fix: ensure concurrent server always responds to shutdown signal
  • Loading branch information
jarhodes314 authored Dec 13, 2024
2 parents 7c17eb0 + 8d1b650 commit 44a56e3
Showing 1 changed file with 66 additions and 3 deletions.
69 changes: 66 additions & 3 deletions crates/core/tedge_actors/src/servers/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ impl<Request: Message, Response: Message> ConcurrentServerMessageBox<Request, Re

loop {
tokio::select! {
Some(request) = self.requests.recv() => {
return Some(request);
received = self.requests.recv() => {
return received
}
Some(result) = self.running_request_handlers.next() => {
if let Err(err) = result {
Expand Down Expand Up @@ -156,7 +156,7 @@ impl<Request: Message, Response: Message> Sender<Request> for RequestSender<Requ

#[cfg(test)]
#[cfg(feature = "test-helpers")]
mod tests {
mod tests_using_helpers {
use super::*;

use crate::Builder;
Expand Down Expand Up @@ -340,3 +340,66 @@ mod tests {
);
}
}

#[cfg(test)]
mod tests {
use crate::Builder;
use crate::ConcurrentServerMessageBox;
use crate::RuntimeRequest;
use crate::RuntimeRequestSink;
use crate::ServerMessageBox;
use crate::ServerMessageBoxBuilder;
use tokio::time::timeout;
use tokio::time::Duration;

#[tokio::test]
async fn does_not_block_runtime_exit_with_non_finishing_requests() {
let box_builder = ServerMessageBoxBuilder::new("ConcurrentServerMessageBoxTest", 16);

let mut sig = box_builder.get_signal_sender();
let message_box: ServerMessageBox<i32, i32> = box_builder.build();
let mut message_box = ConcurrentServerMessageBox::new(4, message_box);
message_box
.running_request_handlers
.push(tokio::spawn(async { std::future::pending().await }));

sig.send(RuntimeRequest::Shutdown).await.unwrap();

loop {
let res = timeout(Duration::from_millis(500), message_box.next_request()).await;

match res {
Err(_elapsed) => panic!("Timeout elapsed: actor ignored shutdown request"),
Ok(None) => break,
Ok(_) => {}
}
}
}

#[tokio::test]
async fn does_not_block_runtime_exit_with_well_behaved_requests() {
let box_builder = ServerMessageBoxBuilder::new("ConcurrentServerMessageBoxTest", 16);

let mut sig = box_builder.get_signal_sender();
let message_box: ServerMessageBox<i32, i32> = box_builder.build();
let mut message_box = ConcurrentServerMessageBox::new(4, message_box);
message_box
.running_request_handlers
.push(tokio::spawn(async {}));
message_box
.running_request_handlers
.push(tokio::spawn(async {}));

sig.send(RuntimeRequest::Shutdown).await.unwrap();

loop {
let res = timeout(Duration::from_millis(500), message_box.next_request()).await;

match res {
Err(_elapsed) => panic!("Timeout elapsed: actor ignored shutdown request"),
Ok(None) => break,
Ok(_) => {}
}
}
}
}

0 comments on commit 44a56e3

Please sign in to comment.