Skip to content

Commit

Permalink
[nextest-runner] remove cancellation AtomicBool
Browse files Browse the repository at this point in the history
We've changed things such that the executor always checks in with the
dispatcher before starting a unit. This makes the bool unnecessary.

The cancel receiver is currently necessary for the
handle_delay_between_attempts step, which should cancel the run no matter what.
We'll remove this shortly by moving this functionality into req_rx.
  • Loading branch information
sunshowers committed Dec 15, 2024
1 parent b6b7077 commit 95be733
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 51 deletions.
12 changes: 2 additions & 10 deletions nextest-runner/src/runner/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ use crate::{
use chrono::Local;
use debug_ignore::DebugIgnore;
use quick_junit::ReportUuid;
use std::{
collections::BTreeMap,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use std::{collections::BTreeMap, time::Duration};
use tokio::sync::{
broadcast,
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -102,7 +98,6 @@ where
signal_handler: &mut SignalHandler,
input_handler: &mut InputHandler,
report_cancel_rx: oneshot::Receiver<()>,
cancelled_ref: &AtomicBool,
cancellation_sender: broadcast::Sender<()>,
) -> RunnerTaskState {
let mut report_cancel_rx = std::pin::pin!(report_cancel_rx);
Expand Down Expand Up @@ -275,10 +270,7 @@ where
self.info_finished(total.saturating_sub(index + 1));
}
HandleEventResponse::Cancel(cancel) => {
// A cancellation notice was received. Note the ordering here:
// cancelled_ref is set *before* notifications are broadcast. This
// prevents race conditions.
cancelled_ref.store(true, Ordering::Release);
// A cancellation notice was received.
let _ = cancellation_sender.send(());
match cancel {
// Some of the branches here don't do anything, but are specified
Expand Down
34 changes: 6 additions & 28 deletions nextest-runner/src/runner/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ use std::{
num::NonZeroUsize,
pin::Pin,
process::{ExitStatus, Stdio},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
sync::Arc,
time::Duration,
};
use tokio::{
Expand Down Expand Up @@ -92,7 +89,6 @@ impl<'a> ExecutorContext<'a> {
pub(super) async fn run_setup_scripts(
&self,
resp_tx: UnboundedSender<ExecutorEvent<'a>>,
cancelled_ref: &AtomicBool,
) -> SetupScriptExecuteData<'a> {
let setup_scripts = self.profile.setup_scripts(self.test_list);
let total = setup_scripts.len();
Expand All @@ -108,11 +104,6 @@ impl<'a> ExecutorContext<'a> {
let config = script.config;

let script_fut = async move {
if cancelled_ref.load(Ordering::Acquire) {
// Check for test cancellation.
return None;
}

let (req_rx_tx, req_rx_rx) = oneshot::channel();
let _ = this_resp_tx.send(ExecutorEvent::SetupScriptStarted {
script_id: script_id.clone(),
Expand Down Expand Up @@ -172,15 +163,9 @@ impl<'a> ExecutorContext<'a> {
test_instance: TestInstance<'a>,
settings: TestSettings<'a>,
resp_tx: UnboundedSender<ExecutorEvent<'a>>,
cancelled_ref: &AtomicBool,
mut cancel_receiver: broadcast::Receiver<()>,
setup_script_data: Arc<SetupScriptExecuteData<'a>>,
) {
if cancelled_ref.load(Ordering::Acquire) {
// Check for test cancellation.
return;
}

debug!(test_name = test_instance.name, "running test");

let settings = Arc::new(settings);
Expand Down Expand Up @@ -224,14 +209,10 @@ impl<'a> ExecutorContext<'a> {
total_attempts,
};

// Note: do not check for cancellation here.
// Only check for cancellation after the first
// run, to avoid a situation where run_statuses
// is empty.

if retry_data.attempt > 1 {
// Ensure that the dispatcher believes the run is still ongoing. If the run is
// cancelled, the dispatcher will let us know.
// Ensure that the dispatcher believes the run is still ongoing.
// If the run is cancelled, the dispatcher will let us know by
// dropping the receiver.
let (tx, rx) = oneshot::channel();
_ = resp_tx.send(ExecutorEvent::RetryStarted {
test_instance,
Expand All @@ -242,8 +223,8 @@ impl<'a> ExecutorContext<'a> {
match rx.await {
Ok(()) => {}
Err(_) => {
// The receiver was dropped -- the dispatcher has signaled that this unit
// should exit.
// The receiver was dropped -- the dispatcher has
// signaled that this unit should exit.
return;
}
}
Expand All @@ -265,9 +246,6 @@ impl<'a> ExecutorContext<'a> {
if run_status.result.is_success() {
// The test succeeded.
break run_status;
} else if cancelled_ref.load(Ordering::Acquire) {
// The test was cancelled.
break run_status;
} else if retry_data.attempt < retry_data.total_attempts {
// Retry this test: send a retry event, then retry the loop.
delay = backoff_iter
Expand Down
15 changes: 2 additions & 13 deletions nextest-runner/src/runner/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ use async_scoped::TokioScope;
use future_queue::StreamExt;
use futures::prelude::*;
use quick_junit::ReportUuid;
use std::{
convert::Infallible,
fmt,
sync::{atomic::AtomicBool, Arc},
};
use std::{convert::Infallible, fmt, sync::Arc};
use tokio::{
runtime::Runtime,
sync::{broadcast, mpsc::unbounded_channel, oneshot},
Expand Down Expand Up @@ -178,7 +174,6 @@ impl<'a> TestRunner<'a> {
F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
E: fmt::Debug + Send,
{
let cancelled = AtomicBool::new(false);
let (report_cancel_tx, report_cancel_rx) = oneshot::channel();

// If report_cancel_tx is None, at least one error has occurred and the
Expand All @@ -190,7 +185,6 @@ impl<'a> TestRunner<'a> {
let res = self.inner.execute(
&mut self.signal_handler,
&mut self.input_handler,
&cancelled,
report_cancel_rx,
|event| {
match callback(event) {
Expand Down Expand Up @@ -247,7 +241,6 @@ impl<'a> TestRunnerInner<'a> {
&self,
signal_handler: &mut SignalHandler,
input_handler: &mut InputHandler,
cancelled_ref: &AtomicBool,
report_cancel_rx: oneshot::Receiver<()>,
callback: F,
) -> Result<RunStats, Vec<JoinError>>
Expand Down Expand Up @@ -295,7 +288,6 @@ impl<'a> TestRunnerInner<'a> {
signal_handler,
input_handler,
report_cancel_rx,
cancelled_ref,
cancellation_sender.clone(),
);
scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
Expand All @@ -305,9 +297,7 @@ impl<'a> TestRunnerInner<'a> {
let run_scripts_fut = async move {
// Since script tasks are run serially, we just reuse the one
// script task.
let script_data = executor_cx_ref
.run_setup_scripts(script_resp_tx, cancelled_ref)
.await;
let script_data = executor_cx_ref.run_setup_scripts(script_resp_tx).await;
if script_tx.send(script_data).is_err() {
// The dispatcher has shut down, so we should too.
debug!("script_tx.send failed, shutting down");
Expand Down Expand Up @@ -369,7 +359,6 @@ impl<'a> TestRunnerInner<'a> {
test_instance,
settings,
resp_tx.clone(),
cancelled_ref,
cancel_rx,
setup_script_data,
))
Expand Down

0 comments on commit 95be733

Please sign in to comment.