fix: shutdown ordering tasks with sigint (#629)
* fix: pass shutdown signal to startup ordering tasks

Without it, processing could take hours and wouldn't shut down on interrupt. If you killed the process, you might leave the lock on the SQLite DB and need `fuser` to find and kill the PID to release it. (A sketch of wiring the signal follows below.)

* chore: clippy lint
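
A minimal sketch of how a caller might construct the boxed shutdown future this change threads through, assuming tokio with the `signal` feature enabled. The daemon's real call site is not part of this diff, so the wiring and the `event_access`, `max_iterations`, and `batch_size` arguments below are illustrative placeholders:

```rust
use std::future::Future;

/// Resolves once the process receives Ctrl-C / SIGINT.
fn ctrl_c_signal() -> Box<dyn Future<Output = ()>> {
    Box::new(async {
        // If signal registration fails, never resolve so the work continues.
        if tokio::signal::ctrl_c().await.is_err() {
            std::future::pending::<()>().await;
        }
    })
}

// Hypothetical call site:
// let processed = OrderingTask::process_all_undelivered_events(
//     event_access,
//     max_iterations,
//     batch_size,
//     ctrl_c_signal(),
// )
// .await?;
```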
dav1do authored Dec 18, 2024
1 parent e95f3df commit 560dfa3
Showing 9 changed files with 234 additions and 87 deletions.
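
The core of the ordering_task.rs change below is racing the ordering work against the boxed shutdown future. Because the inner `dyn Future` is `!Unpin`, a plain `Box<dyn Future>` does not itself implement `Future`; the diff converts it with `Box::into_pin` so `tokio::select!` can poll it. A minimal, self-contained sketch of that pattern (not the crate's exact code):

```rust
use std::future::Future;

// Race `work` against a boxed shutdown future, mirroring the shape of the
// change below. Returns None when the shutdown signal fires first.
async fn run_until_shutdown<T>(
    work: impl Future<Output = T>,
    shutdown_signal: Box<dyn Future<Output = ()>>,
) -> Option<T> {
    // `Box<dyn Future>` cannot be polled directly; `Box::into_pin` yields a
    // `Pin<Box<dyn Future<Output = ()>>>`, which implements Future.
    let shutdown_signal = Box::into_pin(shutdown_signal);
    tokio::select! {
        out = work => Some(out),
        _ = shutdown_signal => None, // interrupted: abandon the remaining work
    }
}
```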
2 changes: 1 addition & 1 deletion event-svc/src/event/mod.rs
@@ -6,5 +6,5 @@ mod service;
mod store;
mod validator;

pub use service::{BlockStore, DeliverableRequirement, EventService};
pub use service::{BlockStore, DeliverableRequirement, EventService, UndeliveredEventReview};
pub use validator::ChainInclusionProvider;
239 changes: 170 additions & 69 deletions event-svc/src/event/ordering_task.rs
@@ -1,4 +1,5 @@
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::sync::Arc;

use anyhow::anyhow;
@@ -53,12 +54,14 @@ impl OrderingTask {
event_access: Arc<EventAccess>,
max_iterations: usize,
batch_size: u32,
shutdown_signal: Box<dyn Future<Output = ()>>,
) -> Result<usize> {
Self::process_all_undelivered_events_with_tasks(
event_access,
max_iterations,
batch_size,
UNDELIVERED_EVENTS_STARTUP_TASKS,
shutdown_signal,
)
.await
}
@@ -69,6 +72,7 @@ impl OrderingTask {
max_iterations: usize,
batch_size: u32,
num_tasks: u32,
shutdown_signal: Box<dyn Future<Output = ()>>,
) -> Result<usize> {
let (tx, rx_inserted) =
tokio::sync::mpsc::channel::<DiscoveredEvent>(PENDING_EVENTS_CHANNEL_DEPTH);
@@ -83,6 +87,7 @@
batch_size,
tx,
num_tasks,
Box::into_pin(shutdown_signal),
)
.await
{
@@ -653,11 +658,78 @@ impl OrderingState {
batch_size: u32,
tx: Sender<DiscoveredEvent>,
partition_size: u32,
shutdown_signal: std::pin::Pin<Box<dyn Future<Output = ()>>>,
) -> Result<usize> {
info!("Attempting to process all undelivered events. This could take some time.");
let (rx, tasks) = Self::spawn_tasks_for_undelivered_event_processing(
event_access,
max_iterations,
batch_size,
tx,
partition_size,
);

tokio::select! {
event_cnt = Self::collect_ordering_task_output(rx, tasks) => {
Ok(event_cnt)
}
_ = shutdown_signal => {
Err(Error::new_app(anyhow!("Ordering tasks aborted due to shutdown signal")))
}
}
}

let mut tasks: tokio::task::JoinSet<Result<()>> = tokio::task::JoinSet::new();
let (cnt_tx, mut rx) = tokio::sync::mpsc::channel(8);
async fn collect_ordering_task_output(
mut rx: tokio::sync::mpsc::Receiver<OrderingTaskStatus>,
mut tasks: JoinSet<Result<()>>,
) -> usize {
let mut event_cnt = 0;
while let Some(OrderingTaskStatus {
number_discovered,
new_highwater,
task_id,
}) = rx.recv().await
{
event_cnt += number_discovered;
if event_cnt % LOG_EVERY_N_ENTRIES < number_discovered {
// these values are useful but can be slightly misleading. the highwater mark will move forward/backward
// based on the task reporting, and we're counting the number discovered and sent by the task, even though
// the task doing the ordering may discover and order additional events while reviewing the events sent
info!(count=%event_cnt, highwater=%new_highwater, %task_id, "Processed undelivered events");
}
}
while let Some(res) = tasks.join_next().await {
match res {
Ok(v) => match v {
Ok(_) => {
// task finished so nothing to do
}
Err(error) => {
warn!(?error, "event ordering task failed while processing");
}
},
Err(error) => {
warn!(?error, "event ordering task failed with JoinError");
}
}
}
event_cnt
}

/// Process all undelivered events in the database. This is a blocking operation that could take a long time.
/// It is intended to be run at startup but could be used on an interval or after some errors to recover.
fn spawn_tasks_for_undelivered_event_processing(
event_access: Arc<EventAccess>,
max_iterations: usize,
batch_size: u32,
tx: Sender<DiscoveredEvent>,
partition_size: u32,
) -> (
tokio::sync::mpsc::Receiver<OrderingTaskStatus>,
JoinSet<Result<()>>,
) {
let mut tasks: JoinSet<Result<()>> = JoinSet::new();
let (cnt_tx, rx) = tokio::sync::mpsc::channel(8);
for task_id in 0..partition_size {
debug!("starting task {task_id} of {partition_size} to process undelivered events");
let tx = tx.clone();
@@ -668,10 +740,10 @@
let mut highwater = 0;
while iter_cnt < max_iterations {
iter_cnt += 1;
let (undelivered, new_hw) = event_access
let (undelivered, new_highwater) = event_access
.undelivered_with_values(highwater, batch_size.into(), partition_size, task_id)
.await?;
highwater = new_hw;
highwater = new_highwater;
let found_something = !undelivered.is_empty();
let found_everything = undelivered.len() < batch_size as usize;
if found_something {
@@ -680,13 +752,13 @@
// at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory.
// In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them
// or otherwise mark them ignored somehow. When this function ends, we do drop everything so for now it's probably okay.
let number_processed = OrderingState::process_undelivered_events_batch(
let number_discovered = OrderingState::process_undelivered_events_batch(
&event_access,
undelivered,
&tx,
)
.await?;
if cnt_tx.send((number_processed, new_hw, task_id)).await.is_err() {
if cnt_tx.send(OrderingTaskStatus {number_discovered, new_highwater, task_id}).await.is_err() {
warn!("undelivered task manager not available... exiting task_id={task_id}");
return Err(crate::Error::new_fatal(anyhow!("undelivered task manager not available... exiting task_id={task_id}")));
}
@@ -702,36 +774,7 @@
Ok(())
});
}
// drop our senders so the background tasks exit without waiting on us
drop(cnt_tx);
drop(tx);

let mut event_cnt = 0;
while let Some((number_processed, new_hw, task_id)) = rx.recv().await {
event_cnt += number_processed;
if event_cnt % LOG_EVERY_N_ENTRIES < number_processed {
// these values are useful but can be slightly misleading. the highwater mark will move forward/backward
// based on the task reporting, and we're counting the number discovered and sent by the task, even though
// the task doing the ordering may discover and order additional events while reviewing the events sent
info!(count=%event_cnt, highwater=%new_hw, %task_id, "Processed undelivered events");
}
}
while let Some(res) = tasks.join_next().await {
match res {
Ok(v) => match v {
Ok(_) => {
// task finished so nothing to do
}
Err(error) => {
warn!(?error, "event ordering task failed while processing");
}
},
Err(error) => {
warn!(?error, "event ordering task failed with JoinError");
}
}
}
Ok(event_cnt)
(rx, tasks)
}

async fn process_undelivered_events_batch(
@@ -828,6 +871,13 @@ impl OrderingState {
}
}

#[derive(Debug)]
struct OrderingTaskStatus {
task_id: u32,
new_highwater: i64,
number_discovered: usize,
}

#[cfg(test)]
mod test {
use crate::store::EventInsertable;
@@ -841,6 +891,13 @@ mod test {
};

use super::*;
fn fake_shutdown_signal() -> Box<dyn Future<Output = ()>> {
Box::new(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await
}
})
}

async fn get_n_insertable_events(n: usize) -> Vec<EventInsertable> {
let mut res = Vec::with_capacity(n);
@@ -861,9 +918,14 @@
async fn test_undelivered_batch_empty() {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
let processed = OrderingTask::process_all_undelivered_events(event_access, 1, 5)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events(
event_access,
1,
5,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(0, processed);
}

@@ -905,24 +967,39 @@ mod test {
assert_eq!(1, events.len());

// we make sure to use 1 task in this test as we want to measure the progress of each iteration
let processed =
OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events_with_tasks(
event_access.clone(),
1,
5,
1,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(5, processed);
let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
assert_eq!(6, events.len());
let processed =
OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events_with_tasks(
event_access.clone(),
1,
5,
1,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(4, processed);
let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
assert_eq!(10, events.len());
let processed =
OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events_with_tasks(
event_access.clone(),
1,
5,
1,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(0, processed);
}

@@ -940,10 +1017,15 @@
let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(5, event.len());
// we specify 1 task so we can easily expect how far it gets each run, rather than doing math against the number of spawned tasks
let processed =
OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 4, 10, 1)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events_with_tasks(
event_access.clone(),
4,
10,
1,
fake_shutdown_signal(),
)
.await
.unwrap();

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(45, event.len());
@@ -960,9 +1042,14 @@
let _new = event_access.insert_many(insertable.iter()).await.unwrap();
let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(0, event.len());
let res = OrderingTask::process_all_undelivered_events(Arc::clone(&event_access), 4, 3)
.await
.unwrap();
let res = OrderingTask::process_all_undelivered_events(
Arc::clone(&event_access),
4,
3,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(res, 9);

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
@@ -983,10 +1070,14 @@
let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(5, event.len());

let processed =
OrderingTask::process_all_undelivered_events(event_access.clone(), 100_000_000, 5)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events(
event_access.clone(),
100_000_000,
5,
fake_shutdown_signal(),
)
.await
.unwrap();

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(50, event.len());
Expand All @@ -1007,10 +1098,14 @@ mod test {
.await
.unwrap();
assert_eq!(1000, event.len());
let _res =
OrderingTask::process_all_undelivered_events(Arc::clone(&event_access), 100_000_000, 5)
.await
.unwrap();
let _res = OrderingTask::process_all_undelivered_events(
Arc::clone(&event_access),
100_000_000,
5,
fake_shutdown_signal(),
)
.await
.unwrap();

let (_hw, event) = event_access
.new_events_since_value(0, 100_000)
@@ -1058,6 +1153,7 @@
Arc::clone(&event_access),
100_000_000,
100,
fake_shutdown_signal(),
)
.await
.unwrap();
@@ -1106,9 +1202,14 @@

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(5, event.len());
let processed = OrderingTask::process_all_undelivered_events(event_access.clone(), 1, 100)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events(
event_access.clone(),
1,
100,
fake_shutdown_signal(),
)
.await
.unwrap();

let (_hw, cids) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(50, cids.len());