fix: shutdown ordering tasks with sigint #629

Merged: 2 commits, Dec 18, 2024
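This PR threads a `shutdown_signal` future through `OrderingTask::process_all_undelivered_events` so the startup ordering work stops when a SIGINT arrives instead of running to completion. As a point of reference, here is a minimal sketch of how a caller might wire that up; the `tokio::signal` usage and the surrounding function are assumptions for illustration and are not part of this diff:

```rust
use std::{future::Future, sync::Arc};

// Hypothetical call site; only the Box<dyn Future<Output = ()>> parameter
// shape comes from this PR. The limits below are stand-in values.
async fn order_events_at_startup(event_access: Arc<EventAccess>) -> Result<usize> {
    let shutdown_signal: Box<dyn Future<Output = ()>> = Box::new(async {
        // Resolves once SIGINT (Ctrl-C) is delivered to the process.
        let _ = tokio::signal::ctrl_c().await;
    });
    OrderingTask::process_all_undelivered_events(
        event_access,
        100_000_000, // max_iterations (stand-in)
        1_000,       // batch_size (stand-in)
        shutdown_signal,
    )
    .await
}
```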
event-svc/src/event/mod.rs (2 changes: 1 addition & 1 deletion)
@@ -6,5 +6,5 @@ mod service;
mod store;
mod validator;

pub use service::{BlockStore, DeliverableRequirement, EventService};
pub use service::{BlockStore, DeliverableRequirement, EventService, UndeliveredEventReview};
pub use validator::ChainInclusionProvider;
event-svc/src/event/ordering_task.rs (239 changes: 170 additions & 69 deletions)
@@ -1,4 +1,5 @@
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::sync::Arc;

use anyhow::anyhow;
@@ -53,12 +54,14 @@ impl OrderingTask {
event_access: Arc<EventAccess>,
max_iterations: usize,
batch_size: u32,
shutdown_signal: Box<dyn Future<Output = ()>>,
) -> Result<usize> {
Self::process_all_undelivered_events_with_tasks(
event_access,
max_iterations,
batch_size,
UNDELIVERED_EVENTS_STARTUP_TASKS,
shutdown_signal,
)
.await
}
@@ -69,6 +72,7 @@ impl OrderingTask {
max_iterations: usize,
batch_size: u32,
num_tasks: u32,
shutdown_signal: Box<dyn Future<Output = ()>>,
) -> Result<usize> {
let (tx, rx_inserted) =
tokio::sync::mpsc::channel::<DiscoveredEvent>(PENDING_EVENTS_CHANNEL_DEPTH);
@@ -83,6 +87,7 @@ impl OrderingTask {
batch_size,
tx,
num_tasks,
Box::into_pin(shutdown_signal),
)
.await
{
@@ -653,11 +658,78 @@ impl OrderingState {
batch_size: u32,
tx: Sender<DiscoveredEvent>,
partition_size: u32,
shutdown_signal: std::pin::Pin<Box<dyn Future<Output = ()>>>,
) -> Result<usize> {
info!("Attempting to process all undelivered events. This could take some time.");
let (rx, tasks) = Self::spawn_tasks_for_undelivered_event_processing(
event_access,
max_iterations,
batch_size,
tx,
partition_size,
);

tokio::select! {
event_cnt = Self::collect_ordering_task_output(rx, tasks) => {
Ok(event_cnt)
}
_ = shutdown_signal => {
Err(Error::new_app(anyhow!("Ordering tasks aborted due to shutdown signal")))
}
}
}
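The `tokio::select!` above is the heart of the fix: it races the collector against the shutdown future, and the losing branch is dropped. Since dropping a `JoinSet` aborts every task spawned on it, a shutdown signal tears the workers down as a side effect of the drop. A self-contained sketch of the same pattern, with all names illustrative rather than taken from the repo:

```rust
use std::time::Duration;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut tasks: JoinSet<()> = JoinSet::new();
    for task_id in 0..4 {
        tasks.spawn(async move {
            // Stand-in for an ordering worker that outlives the shutdown.
            tokio::time::sleep(Duration::from_secs(60)).await;
            println!("task {task_id} finished");
        });
    }
    // Stand-in for the SIGINT future; resolves after 100 ms here.
    let shutdown_signal = tokio::time::sleep(Duration::from_millis(100));

    tokio::select! {
        _ = async { while tasks.join_next().await.is_some() {} } => {
            println!("all workers finished");
        }
        _ = shutdown_signal => {
            println!("shutdown requested; abandoning workers");
        }
    }
    // `tasks` is dropped here, aborting any workers still running.
}
```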

let mut tasks: tokio::task::JoinSet<Result<()>> = tokio::task::JoinSet::new();
let (cnt_tx, mut rx) = tokio::sync::mpsc::channel(8);
async fn collect_ordering_task_output(
mut rx: tokio::sync::mpsc::Receiver<OrderingTaskStatus>,
mut tasks: JoinSet<Result<()>>,
) -> usize {
let mut event_cnt = 0;
while let Some(OrderingTaskStatus {
number_discovered,
new_highwater,
task_id,
}) = rx.recv().await
{
event_cnt += number_discovered;
if event_cnt % LOG_EVERY_N_ENTRIES < number_discovered {
// these values are useful but can be slightly misleading. the highwater mark will move forward/backward
// based on the task reporting, and we're counting the number discovered and sent by the task, even though
// the task doing the ordering may discover and order additional events while reviewing the events sent
info!(count=%event_cnt, highwater=%new_highwater, %task_id, "Processed undelivered events");
}
}
while let Some(res) = tasks.join_next().await {
match res {
Ok(v) => match v {
Ok(_) => {
// task finished so nothing to do
}
Err(error) => {
warn!(?error, "event ordering task failed while processing");
}
},
Err(error) => {
warn!(?error, "event ordering task failed with JoinError");
}
}
}
event_cnt
}
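The `event_cnt % LOG_EVERY_N_ENTRIES < number_discovered` guard in the loop above is a throttling idiom: it holds exactly when the running total crossed a multiple of `LOG_EVERY_N_ENTRIES` as the latest batch was counted, so progress is logged roughly once per N events rather than once per batch. A standalone illustration, with the constant's value assumed rather than taken from this diff:

```rust
// Assumed value for illustration; the real constant lives elsewhere in the crate.
const LOG_EVERY_N_ENTRIES: usize = 10_000;

/// True iff adding `just_added` moved `total` across a multiple of
/// LOG_EVERY_N_ENTRIES, i.e. (total - just_added, total] contains one.
fn crossed_logging_threshold(total: usize, just_added: usize) -> bool {
    total % LOG_EVERY_N_ENTRIES < just_added
}

fn main() {
    assert!(crossed_logging_threshold(10_002, 5)); // crossed 10_000
    assert!(!crossed_logging_threshold(9_999, 5)); // still short of 10_000
    assert!(crossed_logging_threshold(10_000, 5)); // landed exactly on it
}
```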

/// Spawn tasks that process undelivered events in the database, returning the channel they
/// report status on and the `JoinSet` that owns them. Processing everything could take a long
/// time; it is intended to be run at startup but could be used on an interval or after some errors to recover.
fn spawn_tasks_for_undelivered_event_processing(
event_access: Arc<EventAccess>,
max_iterations: usize,
batch_size: u32,
tx: Sender<DiscoveredEvent>,
partition_size: u32,
) -> (
tokio::sync::mpsc::Receiver<OrderingTaskStatus>,
JoinSet<Result<()>>,
) {
let mut tasks: JoinSet<Result<()>> = JoinSet::new();
let (cnt_tx, rx) = tokio::sync::mpsc::channel(8);
for task_id in 0..partition_size {
debug!("starting task {task_id} of {partition_size} to process undelivered events");
let tx = tx.clone();
@@ -668,10 +740,10 @@
let mut highwater = 0;
while iter_cnt < max_iterations {
iter_cnt += 1;
let (undelivered, new_hw) = event_access
let (undelivered, new_highwater) = event_access
.undelivered_with_values(highwater, batch_size.into(), partition_size, task_id)
.await?;
highwater = new_hw;
highwater = new_highwater;
let found_something = !undelivered.is_empty();
let found_everything = undelivered.len() < batch_size as usize;
if found_something {
@@ -680,13 +752,13 @@
// at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory.
// In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them
// or otherwise mark them ignored somehow. When this function ends, we do drop everything so for now it's probably okay.
let number_processed = OrderingState::process_undelivered_events_batch(
let number_discovered = OrderingState::process_undelivered_events_batch(
&event_access,
undelivered,
&tx,
)
.await?;
if cnt_tx.send((number_processed, new_hw, task_id)).await.is_err() {
if cnt_tx.send(OrderingTaskStatus {number_discovered, new_highwater, task_id}).await.is_err() {
warn!("undelivered task manager not available... exiting task_id={task_id}");
return Err(crate::Error::new_fatal(anyhow!("undelivered task manager not available... exiting task_id={task_id}")));
}
@@ -702,36 +774,7 @@
Ok(())
});
}
// drop our senders so the background tasks exit without waiting on us
drop(cnt_tx);
drop(tx);

let mut event_cnt = 0;
while let Some((number_processed, new_hw, task_id)) = rx.recv().await {
event_cnt += number_processed;
if event_cnt % LOG_EVERY_N_ENTRIES < number_processed {
// these values are useful but can be slightly misleading. the highwater mark will move forward/backward
// based on the task reporting, and we're counting the number discovered and sent by the task, even though
// the task doing the ordering may discover and order additional events while reviewing the events sent
info!(count=%event_cnt, highwater=%new_hw, %task_id, "Processed undelivered events");
}
}
while let Some(res) = tasks.join_next().await {
match res {
Ok(v) => match v {
Ok(_) => {
// task finished so nothing to do
}
Err(error) => {
warn!(?error, "event ordering task failed while processing");
}
},
Err(error) => {
warn!(?error, "event ordering task failed with JoinError");
}
}
}
Ok(event_cnt)
(rx, tasks)
}

async fn process_undelivered_events_batch(
@@ -828,6 +871,13 @@ impl OrderingState {
}
}

#[derive(Debug)]
struct OrderingTaskStatus {
task_id: u32,
new_highwater: i64,
number_discovered: usize,
}

#[cfg(test)]
mod test {
use crate::store::EventInsertable;
@@ -841,6 +891,13 @@
};

use super::*;
fn fake_shutdown_signal() -> Box<dyn Future<Output = ()>> {
Box::new(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await
}
})
}
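The helper above spins a sleep loop to produce a future that never resolves, which keeps the tests from ever hitting the shutdown branch. A possible alternative with the same behavior (not what this PR uses) is `std::future::pending`:

```rust
use std::future::Future;

// Equivalent never-resolving shutdown stand-in via std::future::pending.
fn fake_shutdown_signal() -> Box<dyn Future<Output = ()>> {
    Box::new(std::future::pending::<()>())
}
```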

async fn get_n_insertable_events(n: usize) -> Vec<EventInsertable> {
let mut res = Vec::with_capacity(n);
@@ -861,9 +918,14 @@
async fn test_undelivered_batch_empty() {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
let processed = OrderingTask::process_all_undelivered_events(event_access, 1, 5)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events(
event_access,
1,
5,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(0, processed);
}

@@ -905,24 +967,39 @@
assert_eq!(1, events.len());

// we make sure to use 1 task in this test as we want to measure the progress of each iteration
let processed =
OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events_with_tasks(
event_access.clone(),
1,
5,
1,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(5, processed);
let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
assert_eq!(6, events.len());
let processed =
OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events_with_tasks(
event_access.clone(),
1,
5,
1,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(4, processed);
let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
assert_eq!(10, events.len());
let processed =
OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events_with_tasks(
event_access.clone(),
1,
5,
1,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(0, processed);
}

@@ -940,10 +1017,15 @@
let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(5, event.len());
// we specify 1 task so we can easily expect how far it gets each run, rather than doing math against the number of spawned tasks
let processed =
OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 4, 10, 1)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events_with_tasks(
event_access.clone(),
4,
10,
1,
fake_shutdown_signal(),
)
.await
.unwrap();

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(45, event.len());
@@ -960,9 +1042,14 @@
let _new = event_access.insert_many(insertable.iter()).await.unwrap();
let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(0, event.len());
let res = OrderingTask::process_all_undelivered_events(Arc::clone(&event_access), 4, 3)
.await
.unwrap();
let res = OrderingTask::process_all_undelivered_events(
Arc::clone(&event_access),
4,
3,
fake_shutdown_signal(),
)
.await
.unwrap();
assert_eq!(res, 9);

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
@@ -983,10 +1070,14 @@
let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(5, event.len());

let processed =
OrderingTask::process_all_undelivered_events(event_access.clone(), 100_000_000, 5)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events(
event_access.clone(),
100_000_000,
5,
fake_shutdown_signal(),
)
.await
.unwrap();

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(50, event.len());
@@ -1007,10 +1098,14 @@
.await
.unwrap();
assert_eq!(1000, event.len());
let _res =
OrderingTask::process_all_undelivered_events(Arc::clone(&event_access), 100_000_000, 5)
.await
.unwrap();
let _res = OrderingTask::process_all_undelivered_events(
Arc::clone(&event_access),
100_000_000,
5,
fake_shutdown_signal(),
)
.await
.unwrap();

let (_hw, event) = event_access
.new_events_since_value(0, 100_000)
@@ -1058,6 +1153,7 @@
Arc::clone(&event_access),
100_000_000,
100,
fake_shutdown_signal(),
)
.await
.unwrap();
@@ -1106,9 +1202,14 @@

let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(5, event.len());
let processed = OrderingTask::process_all_undelivered_events(event_access.clone(), 1, 100)
.await
.unwrap();
let processed = OrderingTask::process_all_undelivered_events(
event_access.clone(),
1,
100,
fake_shutdown_signal(),
)
.await
.unwrap();

let (_hw, cids) = event_access.new_events_since_value(0, 1000).await.unwrap();
assert_eq!(50, cids.len());