Skip to content

Adding an abort trigger and changing how tasks shutdown #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sim-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async fn main() -> anyhow::Result<()> {

ctrlc::set_handler(move || {
log::info!("Shutting down simulation.");
sim2.shutdown();
sim2.abort();
})?;

sim.run().await?;
Expand Down
202 changes: 115 additions & 87 deletions simln-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ pub struct Simulation {
/// High level triggers used to manage simulation tasks and shutdown.
shutdown_trigger: Trigger,
shutdown_listener: Listener,
abort_trigger: Trigger,
abort_listener: Listener,
}

#[derive(Clone)]
Expand Down Expand Up @@ -548,13 +550,16 @@ impl Simulation {
activity: Vec<ActivityDefinition>,
) -> Self {
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
let (abort_trigger, abort_listener) = triggered::trigger();
Self {
cfg,
nodes,
activity,
results: Arc::new(Mutex::new(PaymentResultLogger::new())),
shutdown_trigger,
shutdown_listener,
abort_trigger,
abort_listener,
}
}

Expand Down Expand Up @@ -661,90 +666,95 @@ impl Simulation {
);
let mut tasks = JoinSet::new();

// Before we start the simulation up, start tasks that will be responsible for gathering simulation data.
// The event channels are shared across our functionality:
// - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the
// final result of the event that it has taken.
// - Event Receiver: used by data reporting to receive events that have been simulated that need to be
// tracked and recorded.
let (event_sender, event_receiver) = channel(1);
self.run_data_collection(event_receiver, &mut tasks);

// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
let activities = match self.activity_executors().await {
Ok(a) => a,
Err(e) => {
// If we encounter an error while setting up the activity_executors,
// we need to shutdown and wait for tasks to finish. We have started background tasks in the
// run_data_collection function, so we should shut those down before returning.
self.shutdown();
while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}.");
}
}
return Err(e);
},
};
let consumer_channels = self.dispatch_consumers(
activities
.iter()
.map(|generator| generator.source_info.pubkey)
.collect(),
event_sender.clone(),
&mut tasks,
);

// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
// The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
let mut producer_tasks = JoinSet::new();
match self
.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
.await
{
Ok(_) => {},
Err(e) => {
// If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish.
// We have started background tasks in the run_data_collection function,
// so we should shut those down before returning.
self.shutdown();
while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}.");
// A block to control the scope of consumer_channels and event_sender. These need to go out of scope so that receivers will close.

// Before we start the simulation up, start tasks that will be responsible for gathering simulation data.
// The event channels are shared across our functionality:
// - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the
// final result of the event that it has taken.
// - Event Receiver: used by data reporting to receive events that have been simulated that need to be
// tracked and recorded.
let (event_sender, event_receiver) = channel(1);
self.run_data_collection(event_receiver, &mut tasks);

// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
let activities = match self.activity_executors().await {
Ok(a) => a,
Err(e) => {
// If we encounter an error while setting up the activity_executors,
// we need to shutdown and wait for tasks to finish. We have started background tasks in the
// run_data_collection function, so we should shut those down before returning.
// The tasks started in run_data_collection are listening for the abort trigger.
self.abort_trigger.trigger();
while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}.");
}
}
}
return Err(e);
},
}
return Err(e);
},
};
let consumer_channels = self.dispatch_consumers(
activities
.iter()
.map(|generator| generator.source_info.pubkey)
.collect(),
event_sender.clone(),
&mut tasks,
);

// Start a task that waits for the producers to finish.
// If all producers finish, then there is nothing left to do and the simulation can be shutdown.
let producer_trigger = self.shutdown_trigger.clone();
tasks.spawn(async move {
while let Some(res) = producer_tasks.join_next().await {
if let Err(e) = res {
log::error!("Producer exited with error: {e}.");
}
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
// The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
let mut producer_tasks = JoinSet::new();
match self
.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
.await
{
Ok(_) => {},
Err(e) => {
// If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish.
// We have started background tasks in the run_data_collection function,
// so we should shut those down before returning.
self.shutdown();
while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}.");
}
}
return Err(e);
},
}
log::info!("All producers finished. Shutting down.");
producer_trigger.trigger()
});

// Start a task that will shutdown the simulation if the total_time is met.
if let Some(total_time) = self.cfg.total_time {
let t = self.shutdown_trigger.clone();
let l = self.shutdown_listener.clone();

// Start a task that waits for the producers to finish.
// If all producers finish, then there is nothing left to do and the simulation can be shutdown.
let producer_trigger = self.shutdown_trigger.clone();
tasks.spawn(async move {
if time::timeout(total_time, l).await.is_err() {
log::info!(
"Simulation run for {}s. Shutting down.",
total_time.as_secs()
);
t.trigger()
while let Some(res) = producer_tasks.join_next().await {
if let Err(e) = res {
log::error!("Producer exited with error: {e}.");
}
}
log::info!("All producers finished. Shutting down.");
producer_trigger.trigger()
});
}

// Start a task that will shutdown the simulation if the total_time is met.
if let Some(total_time) = self.cfg.total_time {
let t = self.shutdown_trigger.clone();
let l = self.shutdown_listener.clone();

tasks.spawn(async move {
if time::timeout(total_time, l).await.is_err() {
log::info!(
"Simulation run for {}s. Shutting down.",
total_time.as_secs()
);
t.trigger()
}
});
}
} // A block to control the scope of consumer_channels and event_sender. These need to go out of scope so that receivers will close.

// We always want to wait for all threads to exit, so we wait for all of them to exit and track any errors
// that surface. It's okay if there are multiple and one is overwritten, we just want to know whether we
Expand All @@ -764,6 +774,11 @@ impl Simulation {
self.shutdown_trigger.trigger()
}

pub fn abort(&self) {
self.shutdown_trigger.trigger();
self.abort_trigger.trigger();
}

pub async fn get_total_payments(&self) -> u64 {
self.results.lock().await.total_attempts()
}
Expand All @@ -779,17 +794,17 @@ impl Simulation {
output_receiver: Receiver<SimulationOutput>,
tasks: &mut JoinSet<()>,
) {
let listener = self.shutdown_listener.clone();
let shutdown = self.shutdown_trigger.clone();
log::debug!("Setting up simulator data collection.");

// Create a sender/receiver pair that will be used to report final results of simulation.
let (results_sender, results_receiver) = channel(1);

let nodes = self.nodes.clone();
// psr: produce simulation results
let psr_listener = listener.clone();
let psr_shutdown = shutdown.clone();
// psr should trigger a clean shutdown if there is an error
// psr should listen for an abort, not shutdown because it will cleanly shutdown when the receiver closes
let psr_listener = self.abort_listener.clone();
let psr_shutdown = self.shutdown_trigger.clone();
tasks.spawn(async move {
log::debug!("Starting simulation results producer.");
if let Err(e) =
Expand All @@ -806,11 +821,15 @@ impl Simulation {
let result_logger = self.results.clone();

let result_logger_clone = result_logger.clone();
let result_logger_listener = listener.clone();
// rl: results logger
// rl should listen for both shutdowns and aborts because it does not have any channels that will cause a shutdown
let rl_shutdown_listener = self.shutdown_listener.clone();
let rl_abort_listener = self.abort_listener.clone();
tasks.spawn(async move {
log::debug!("Starting results logger.");
run_results_logger(
result_logger_listener,
rl_shutdown_listener,
rl_abort_listener,
result_logger_clone,
Duration::from_secs(60),
)
Expand All @@ -819,18 +838,22 @@ impl Simulation {
});

// csr: consume simulation results
// crs should trigger a clean shutdown if there is an error
// crs should listen for an abort, not shutdown because it will cleanly shutdown when the receiver closes
let csr_write_results = self.cfg.write_results.clone();
let csr_shutdown = self.shutdown_trigger.clone();
let csr_abort = self.abort_listener.clone();
tasks.spawn(async move {
log::debug!("Starting simulation results consumer.");
if let Err(e) = consume_simulation_results(
result_logger,
results_receiver,
listener,
csr_abort,
csr_write_results,
)
.await
{
shutdown.trigger();
csr_shutdown.trigger();
log::error!("Consume simulation results exited with error: {e:?}.");
} else {
log::debug!("Consume simulation result received shutdown signal.");
Expand Down Expand Up @@ -957,7 +980,7 @@ impl Simulation {
// Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull
// events from and the results sender to report the events it has triggered for further monitoring.
// ce: consume event
let ce_listener = self.shutdown_listener.clone();
let ce_listener = self.abort_listener.clone();
let ce_shutdown = self.shutdown_trigger.clone();
let ce_output_sender = output_sender.clone();
let ce_node = node.clone();
Expand Down Expand Up @@ -1316,7 +1339,8 @@ impl Display for PaymentResultLogger {
/// Note that `run_results_logger` does not error in any way, thus it has no
/// trigger. It listens for triggers to ensure clean exit.
async fn run_results_logger(
listener: Listener,
shutdown_listener: Listener,
abort_listener: Listener,
logger: Arc<Mutex<PaymentResultLogger>>,
interval: Duration,
) {
Expand All @@ -1325,7 +1349,11 @@ async fn run_results_logger(
loop {
select! {
biased;
_ = listener.clone() => {
_ = shutdown_listener.clone() => {
break
}

_ = abort_listener.clone() => {
break
}

Expand Down