Skip to content

Commit

Permalink
Store all ledger task handles onto a buffer
Browse files Browse the repository at this point in the history
The abortable spawner then proceeds to await on all of these handles.
There is a special pinned handle, that allows resuming unwinding upon
encountering a panic on that task.
  • Loading branch information
sug0 committed Sep 2, 2024
1 parent 5d9ca5a commit f212986
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 93 deletions.
93 changes: 86 additions & 7 deletions crates/node/src/abortable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub struct AbortableSpawner {
abort_send: UnboundedSender<AbortingTask>,
abort_recv: UnboundedReceiver<AbortingTask>,
cleanup_jobs: Vec<Pin<Box<dyn Future<Output = ()>>>>,
pinned: Option<(AbortingTask, JoinHandle<ShellResult<()>>)>,
batch: Vec<(AbortingTask, JoinHandle<ShellResult<()>>)>,
}

/// Contains the state of an on-going [`AbortableSpawner`] task spawn.
Expand All @@ -28,6 +30,7 @@ pub struct AbortableTaskBuilder<'a, A> {
abortable: A,
spawner: &'a mut AbortableSpawner,
cleanup: Option<Pin<Box<dyn Future<Output = ()>>>>,
pin: bool,
}

impl Default for AbortableSpawner {
Expand All @@ -47,6 +50,8 @@ impl AbortableSpawner {
abort_recv,
shutdown_recv,
cleanup_jobs: Vec::new(),
batch: Vec::new(),
pinned: None,
}
}

Expand All @@ -63,6 +68,7 @@ impl AbortableSpawner {
/// println!("I have signaled a control task that I am no longer running!");
/// })
/// .spawn();
/// spawner.run_to_completion().await;
/// ```
///
/// The return type of this method is [`AbortableTaskBuilder`], such that a
Expand All @@ -79,17 +85,22 @@ impl AbortableSpawner {
abortable,
spawner: self,
cleanup: None,
pin: false,
}
}

/// This future will resolve when:
/// Wait for any of the spawned tasks to abort.
///
/// ## Resolving this future
///
/// This future runs to completion if:
///
/// 1. A user sends a shutdown signal (e.g. SIGINT), or...
/// 2. One of the child processes of the ledger terminates, which
/// generates a notification upon dropping an [`Aborter`].
///
/// These two scenarios are represented by the [`AborterStatus`] enum.
pub async fn wait_for_abort(mut self) -> AborterStatus {
async fn wait_for_abort(mut self) -> AborterStatus {
let status = tokio::select! {
_ = self.shutdown_recv.wait_for_shutdown() => AborterStatus::UserShutdownLedger,
msg = self.abort_recv.recv() => {
Expand All @@ -109,6 +120,49 @@ impl AbortableSpawner {
status
}

/// Run all the spawned tasks to completion.
pub async fn run_to_completion(mut self) {
let pinned_task = self.pinned.take();

let batch = std::mem::take(&mut self.batch);
let (task_ids, task_handles): (Vec<_>, Vec<_>) =
batch.into_iter().unzip();

// Wait for interrupt signal or abort message
let aborted = self.wait_for_abort().await.child_terminated();

// Wait for all managed tasks to finish
match futures::future::try_join_all(task_handles).await {
Ok(results) => {
for (i, res) in results.into_iter().enumerate() {
match res {
Err(err) if aborted => {
let who = task_ids[i];
tracing::error!("{who} error: {err}");
}
_ => {}
}
}
}
Err(err) => {
// Ignore cancellation errors
if !err.is_cancelled() {
tracing::error!("Abortable spawner error: {err}");
}
}
}

if let Some((who, pinned_task)) = pinned_task {
match pinned_task.await {
Err(err) if err.is_panic() => {
std::panic::resume_unwind(err.into_panic())
}
Err(err) => tracing::error!("{who} error: {err}"),
_ => {}
}
}
}

fn spawn_abortable_task<A, F>(
&self,
who: AbortingTask,
Expand Down Expand Up @@ -144,28 +198,45 @@ impl AbortableSpawner {
impl<'a, A> AbortableTaskBuilder<'a, A> {
/// Spawn the built abortable task into the runtime.
#[inline]
pub fn spawn<F>(self) -> JoinHandle<ShellResult<()>>
pub fn spawn<F>(self)
where
A: FnOnce(Aborter) -> F,
F: Future<Output = ShellResult<()>> + Send + 'static,
{
if let Some(cleanup) = self.cleanup {
self.spawner.cleanup_jobs.push(cleanup);
}
self.spawner.spawn_abortable_task(self.who, self.abortable)
let task = self.spawner.spawn_abortable_task(self.who, self.abortable);
if self.pin {
if let Some(pinned_task) = self.spawner.pinned.take() {
self.spawner.batch.push(pinned_task);
}
self.spawner.pinned = Some((self.who, task));
} else {
self.spawner.batch.push((self.who, task));
}
}

/// Spawn the built abortable (blocking) task into the runtime.
#[inline]
pub fn spawn_blocking(self) -> JoinHandle<ShellResult<()>>
pub fn spawn_blocking(self)
where
A: FnOnce(Aborter) -> ShellResult<()> + Send + 'static,
{
if let Some(cleanup) = self.cleanup {
self.spawner.cleanup_jobs.push(cleanup);
}
self.spawner
.spawn_abortable_task_blocking(self.who, self.abortable)
let task = self
.spawner
.spawn_abortable_task_blocking(self.who, self.abortable);
if self.pin {
if let Some(pinned_task) = self.spawner.pinned.take() {
self.spawner.batch.push(pinned_task);
}
self.spawner.pinned = Some((self.who, task));
} else {
self.spawner.batch.push((self.who, task));
}
}

/// A cleanup routine `cleanup` will be executed for the associated task.
Expand All @@ -178,6 +249,14 @@ impl<'a, A> AbortableTaskBuilder<'a, A> {
self.cleanup = Some(Box::pin(cleanup));
self
}

/// Pin the task to spawn. The main purpose behind this operation
/// is to resume unwinding the stack if the pinned task panics.
#[inline]
pub fn pin(mut self) -> Self {
self.pin = true;
self
}
}

/// A panic-proof handle for aborting a future. Will abort during stack
Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/ethereum_oracle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ pub fn run_oracle<C: RpcClient>(
control: control::Receiver,
last_processed_block: last_processed_block::Sender,
spawner: &mut AbortableSpawner,
) -> tokio::task::JoinHandle<Result<(), crate::shell::Error>> {
) {
let url = url.as_ref().to_owned();
spawner
.abortable("Ethereum Oracle", move |aborter| {
Expand Down Expand Up @@ -317,7 +317,7 @@ pub fn run_oracle<C: RpcClient>(
drop(aborter);
Ok(())
})
.spawn_blocking()
.spawn_blocking();
}

/// Determine what action to take after attempting to
Expand Down
Loading

0 comments on commit f212986

Please sign in to comment.