Skip to content

Commit

Permalink
Merge pull request #3741 from anoma/tiago/run-aux-arbitrary-tasks
Browse files Browse the repository at this point in the history
Dynamic ledger managed tasks
  • Loading branch information
mergify[bot] authored Sep 2, 2024
2 parents 9139dbe + 233378b commit 5f20b23
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 151 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Support dynamically joining ledger managed tasks. With this change, adding
or removing managed tasks from the ledger should be a far easier process to
contend with. ([\#3741](https://github.com/anoma/namada/pull/3741))
286 changes: 258 additions & 28 deletions crates/node/src/abortable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use namada_sdk::control_flow::{
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;

use crate::shell::ShellResult;

/// Serves to identify an aborting async task, which is spawned
/// with an [`AbortableSpawner`].
pub type AbortingTask = &'static str;
Expand All @@ -18,13 +20,17 @@ pub struct AbortableSpawner {
abort_send: UnboundedSender<AbortingTask>,
abort_recv: UnboundedReceiver<AbortingTask>,
cleanup_jobs: Vec<Pin<Box<dyn Future<Output = ()>>>>,
pinned: Option<(AbortingTask, JoinHandle<ShellResult<()>>)>,
batch: Vec<(AbortingTask, JoinHandle<ShellResult<()>>)>,
}

/// Contains the state of an on-going [`AbortableSpawner`] task spawn.
pub struct WithCleanup<'a, A> {
pub struct AbortableTaskBuilder<'a, A> {
who: AbortingTask,
abortable: A,
spawner: &'a mut AbortableSpawner,
cleanup: Option<Pin<Box<dyn Future<Output = ()>>>>,
pin: bool,
}

impl Default for AbortableSpawner {
Expand All @@ -44,6 +50,8 @@ impl AbortableSpawner {
abort_recv,
shutdown_recv,
cleanup_jobs: Vec::new(),
batch: Vec::new(),
pinned: None,
}
}

Expand All @@ -55,35 +63,44 @@ impl AbortableSpawner {
/// ```ignore
/// let mut spawner = AbortableSpawner::new();
/// spawner
/// .spawn_abortable("ExampleTask", |aborter| async {
/// .abortable("ExampleTask", |aborter| async {
/// drop(aborter);
/// println!("I have signaled a control task that I am no longer running!");
/// })
/// .with_no_cleanup();
/// .spawn();
/// spawner.run_to_completion().await;
/// ```
///
/// The return type of this method is [`WithCleanup`], such that a cleanup
/// routine, after the abort is received, can be configured to execute.
pub fn spawn_abortable<A>(
/// The return type of this method is [`AbortableTaskBuilder`], such that a
/// cleanup routine, after the abort is received, can be configured to
/// execute.
#[inline]
pub fn abortable<A>(
&mut self,
who: AbortingTask,
abortable: A,
) -> WithCleanup<'_, A> {
WithCleanup {
) -> AbortableTaskBuilder<'_, A> {
AbortableTaskBuilder {
who,
abortable,
spawner: self,
cleanup: None,
pin: false,
}
}

/// This future will resolve when:
/// Wait for any of the spawned tasks to abort.
///
/// ## Resolving this future
///
/// This future runs to completion if:
///
/// 1. A user sends a shutdown signal (e.g. SIGINT), or...
/// 2. One of the child processes of the ledger terminates, which
/// generates a notification upon dropping an [`Aborter`].
///
/// These two scenarios are represented by the [`AborterStatus`] enum.
pub async fn wait_for_abort(mut self) -> AborterStatus {
async fn wait_for_abort(mut self) -> AborterStatus {
let status = tokio::select! {
_ = self.shutdown_recv.wait_for_shutdown() => AborterStatus::UserShutdownLedger,
msg = self.abort_recv.recv() => {
Expand All @@ -103,49 +120,142 @@ impl AbortableSpawner {
status
}

/// This method is responsible for actually spawning the async task into the
/// runtime.
fn spawn_abortable_task<A, F, R>(
/// Run all the spawned tasks to completion.
pub async fn run_to_completion(mut self) {
let pinned_task = self.pinned.take();

let batch = std::mem::take(&mut self.batch);
let (task_ids, task_handles): (Vec<_>, Vec<_>) =
batch.into_iter().unzip();

// Wait for interrupt signal or abort message
let aborted = self.wait_for_abort().await.child_terminated();

// Wait for all managed tasks to finish
match futures::future::try_join_all(task_handles).await {
Ok(results) => {
for (i, res) in results.into_iter().enumerate() {
match res {
Err(err) if aborted => {
let who = task_ids[i];
tracing::error!("{who} error: {err}");
}
_ => {}
}
}
}
Err(err) => {
// Ignore cancellation errors
if !err.is_cancelled() {
tracing::error!("Abortable spawner error: {err}");
}
}
}

if let Some((who, pinned_task)) = pinned_task {
match pinned_task.await {
Err(err) if err.is_panic() => {
std::panic::resume_unwind(err.into_panic())
}
Err(err) => tracing::error!("{who} error: {err}"),
_ => {}
}
}
}

fn spawn_abortable_task<A, F>(
&self,
who: AbortingTask,
abortable: A,
) -> JoinHandle<R>
) -> JoinHandle<ShellResult<()>>
where
A: FnOnce(Aborter) -> F,
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
F: Future<Output = ShellResult<()>> + Send + 'static,
{
let abort = Aborter {
who,
sender: self.abort_send.clone(),
};
tokio::spawn(abortable(abort))
}

fn spawn_abortable_task_blocking<A>(
&self,
who: AbortingTask,
abortable: A,
) -> JoinHandle<ShellResult<()>>
where
A: FnOnce(Aborter) -> ShellResult<()> + Send + 'static,
{
let abort = Aborter {
who,
sender: self.abort_send.clone(),
};
tokio::task::spawn_blocking(move || abortable(abort))
}
}

impl<'a, A> WithCleanup<'a, A> {
/// No cleanup routine will be executed for the associated task.
impl<'a, A> AbortableTaskBuilder<'a, A> {
/// Spawn the built abortable task into the runtime.
#[inline]
pub fn with_no_cleanup<F, R>(self) -> JoinHandle<R>
pub fn spawn<F>(self)
where
A: FnOnce(Aborter) -> F,
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
F: Future<Output = ShellResult<()>> + Send + 'static,
{
if let Some(cleanup) = self.cleanup {
self.spawner.cleanup_jobs.push(cleanup);
}
let task = self.spawner.spawn_abortable_task(self.who, self.abortable);
if self.pin {
if let Some(pinned_task) = self.spawner.pinned.take() {
self.spawner.batch.push(pinned_task);
}
self.spawner.pinned = Some((self.who, task));
} else {
self.spawner.batch.push((self.who, task));
}
}

/// Spawn the built abortable (blocking) task into the runtime.
#[inline]
pub fn spawn_blocking(self)
where
A: FnOnce(Aborter) -> ShellResult<()> + Send + 'static,
{
self.spawner.spawn_abortable_task(self.who, self.abortable)
if let Some(cleanup) = self.cleanup {
self.spawner.cleanup_jobs.push(cleanup);
}
let task = self
.spawner
.spawn_abortable_task_blocking(self.who, self.abortable);
if self.pin {
if let Some(pinned_task) = self.spawner.pinned.take() {
self.spawner.batch.push(pinned_task);
}
self.spawner.pinned = Some((self.who, task));
} else {
self.spawner.batch.push((self.who, task));
}
}

/// A cleanup routine `cleanup` will be executed for the associated task.
/// This method replaces the previous cleanup routine, if any.
#[inline]
pub fn with_cleanup<F, R, C>(self, cleanup: C) -> JoinHandle<R>
pub fn with_cleanup<C>(mut self, cleanup: C) -> Self
where
A: FnOnce(Aborter) -> F,
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
C: Future<Output = ()> + Send + 'static,
{
self.spawner.cleanup_jobs.push(Box::pin(cleanup));
self.with_no_cleanup()
self.cleanup = Some(Box::pin(cleanup));
self
}

/// Pin the task to spawn. The main purpose behind this operation
/// is to resume unwinding the stack if the pinned task panics.
#[inline]
pub fn pin(mut self) -> Self {
self.pin = true;
self
}
}

Expand Down Expand Up @@ -180,3 +290,123 @@ impl AborterStatus {
matches!(self, AborterStatus::ChildProcessTerminated)
}
}

#[cfg(test)]
mod abortale_spawner_tests {
use std::sync::{Arc, Mutex};

use super::*;

/// Test panicking a non-pinned task shouldn't cause the entire spawner to
/// come crashing down.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_abortable_spawner_panic_non_pinned_task() {
let mut spawner = AbortableSpawner::new();

spawner
.abortable("TestTask", |_aborter| async {
panic!();
})
.spawn();

spawner.run_to_completion().await;
}

/// Test panicking a pinned task must cause the entire spawner to come
/// crashing down.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[should_panic = "AbortableSpawnerPanic"]
async fn test_abortable_spawner_panic_pinned_task() {
let mut spawner = AbortableSpawner::new();

spawner
.abortable("TestTask", |_aborter| async {
panic!("AbortableSpawnerPanic");
})
.pin()
.spawn();

spawner.run_to_completion().await;
}

/// Test that cleanup jobs get triggered.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_cleanup_job() {
let mut spawner = AbortableSpawner::new();

struct Slot {
task_data: [String; 3],
}

let slot = Arc::new(Mutex::new(Slot {
task_data: [String::new(), String::new(), String::new()],
}));

let task_ids = ["TestTask#1", "TestTask#2", "TestTask#3"];

for (task_no, &id) in task_ids.iter().enumerate() {
let slot = Arc::clone(&slot);

spawner
.abortable(id, |aborter| async move {
drop(aborter);
Ok(())
})
.with_cleanup(async move {
slot.lock().unwrap().task_data[task_no] = id.into();
})
.spawn();
}

spawner.run_to_completion().await;

let slot_handle = slot.lock().unwrap();
assert_eq!(slot_handle.task_data[0].as_str(), task_ids[0]);
assert_eq!(slot_handle.task_data[1].as_str(), task_ids[1]);
assert_eq!(slot_handle.task_data[2].as_str(), task_ids[2]);
}

/// Test blocking jobs.
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_blocking_spawn() {
let (bing_tx, bing_rx) = tokio::sync::oneshot::channel();
let (bong_tx, bong_rx) = tokio::sync::oneshot::channel();

let mut spawner = AbortableSpawner::new();
spawner
.abortable("Bing", move |aborter| {
bing_rx.blocking_recv().unwrap();
drop(aborter);
Ok(())
})
.spawn_blocking();
spawner
.abortable("Bong", move |aborter| {
bong_rx.blocking_recv().unwrap();
drop(aborter);
Ok(())
})
.spawn_blocking();

let spawner_run_fut = Box::pin(spawner.run_to_completion());
let select_result =
futures::future::select(spawner_run_fut, std::future::ready(()))
.await;
let spawner_run_fut = match select_result {
futures::future::Either::Left(_) => unreachable!("Test failed"),
futures::future::Either::Right(((), fut)) => fut,
};

bing_tx.send(()).unwrap();
let select_result =
futures::future::select(spawner_run_fut, std::future::ready(()))
.await;
let spawner_run_fut = match select_result {
futures::future::Either::Left(_) => unreachable!("Test failed"),
futures::future::Either::Right(((), fut)) => fut,
};

bong_tx.send(()).unwrap();
spawner_run_fut.await;
}
}
Loading

0 comments on commit 5f20b23

Please sign in to comment.