diff --git a/nextest-runner/src/runner/dispatcher.rs b/nextest-runner/src/runner/dispatcher.rs index da31ed618f3..47bbba14782 100644 --- a/nextest-runner/src/runner/dispatcher.rs +++ b/nextest-runner/src/runner/dispatcher.rs @@ -25,7 +25,6 @@ use debug_ignore::DebugIgnore; use quick_junit::ReportUuid; use std::{collections::BTreeMap, time::Duration}; use tokio::sync::{ - broadcast, mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }; @@ -98,7 +97,6 @@ where signal_handler: &mut SignalHandler, input_handler: &mut InputHandler, report_cancel_rx: oneshot::Receiver<()>, - cancellation_sender: broadcast::Sender<()>, ) -> RunnerTaskState { let mut report_cancel_rx = std::pin::pin!(report_cancel_rx); @@ -271,17 +269,17 @@ where } HandleEventResponse::Cancel(cancel) => { // A cancellation notice was received. - let _ = cancellation_sender.send(()); match cancel { // Some of the branches here don't do anything, but are specified // for readability. CancelEvent::Report => { // An error was produced by the reporter, and cancellation has // begun. + self.broadcast_request(RunUnitRequest::OtherCancel); } CancelEvent::TestFailure => { - // A test failure has caused cancellation to begin. Nothing to - // do here. + // A test failure has caused cancellation to begin. + self.broadcast_request(RunUnitRequest::OtherCancel); } CancelEvent::Signal(req) => { // A signal has caused cancellation to begin. Let all the child diff --git a/nextest-runner/src/runner/executor.rs b/nextest-runner/src/runner/executor.rs index 05faa927bca..bae979e087f 100644 --- a/nextest-runner/src/runner/executor.rs +++ b/nextest-runner/src/runner/executor.rs @@ -45,7 +45,6 @@ use std::{ use tokio::{ process::Child, sync::{ - broadcast, mpsc::{UnboundedReceiver, UnboundedSender}, oneshot, }, @@ -163,7 +162,6 @@ impl<'a> ExecutorContext<'a> { test_instance: TestInstance<'a>, settings: TestSettings<'a>, resp_tx: UnboundedSender>, - mut cancel_receiver: broadcast::Receiver<()>, setup_script_data: Arc>, ) { debug!(test_name = test_instance.name, "running test"); @@ -268,7 +266,6 @@ impl<'a> ExecutorContext<'a> { previous_result, previous_slow, delay, - &mut cancel_receiver, &mut req_rx, ) .await; @@ -464,6 +461,10 @@ impl<'a> ExecutorContext<'a> { slow_timeout.grace_period ).await; } + RunUnitRequest::OtherCancel => { + // Ignore non-signal cancellation requests -- + // let the script finish. + } RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => { _ = sender.send(script.info_response( UnitState::Running { @@ -706,6 +707,10 @@ impl<'a> ExecutorContext<'a> { slow_timeout.grace_period, ).await; } + RunUnitRequest::OtherCancel => { + // Ignore non-signal cancellation requests -- + // let the test finish. + } RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => { _ = tx.send(test.info_response( UnitState::Running { @@ -986,7 +991,6 @@ async fn handle_delay_between_attempts<'a>( previous_result: ExecutionResult, previous_slow: bool, delay: Duration, - cancel_receiver: &mut broadcast::Receiver<()>, req_rx: &mut UnboundedReceiver>, ) { let mut sleep = std::pin::pin!(crate::time::pausable_sleep(delay)); @@ -999,10 +1003,6 @@ async fn handle_delay_between_attempts<'a>( // The timer has expired. break; } - _ = cancel_receiver.recv() => { - // The cancel signal was received. - break; - } recv = req_rx.recv() => { let req = recv.expect("req_rx sender is open"); @@ -1025,6 +1025,11 @@ async fn handle_delay_between_attempts<'a>( // shutdown. break; } + RunUnitRequest::OtherCancel => { + // If a cancellation was requested, break out of the + // loop. + break; + } RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => { let waiting_snapshot = waiting_stopwatch.snapshot(); _ = tx.send( @@ -1096,6 +1101,10 @@ async fn detect_fd_leaks<'a>( RunUnitRequest::Signal(_) => { // The process is done executing, so signals are moot. } + RunUnitRequest::OtherCancel => { + // Ignore non-signal cancellation requests -- let the + // unit finish. + } RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => { let snapshot = waiting_stopwatch.snapshot(); let resp = cx.info_response( diff --git a/nextest-runner/src/runner/imp.rs b/nextest-runner/src/runner/imp.rs index 4f7a794db3e..3a6f27ca970 100644 --- a/nextest-runner/src/runner/imp.rs +++ b/nextest-runner/src/runner/imp.rs @@ -23,7 +23,7 @@ use quick_junit::ReportUuid; use std::{convert::Infallible, fmt, sync::Arc}; use tokio::{ runtime::Runtime, - sync::{broadcast, mpsc::unbounded_channel, oneshot}, + sync::{mpsc::unbounded_channel, oneshot}, task::JoinError, }; use tracing::{debug, warn}; @@ -280,16 +280,10 @@ impl<'a> TestRunnerInner<'a> { let ((), results) = TokioScope::scope_and_block(move |scope| { let (resp_tx, resp_rx) = unbounded_channel::>(); - let (cancellation_sender, _cancel_receiver) = broadcast::channel(1); // Run the dispatcher to completion in a task. - let dispatcher_fut = dispatcher_cx_mut.run( - resp_rx, - signal_handler, - input_handler, - report_cancel_rx, - cancellation_sender.clone(), - ); + let dispatcher_fut = + dispatcher_cx_mut.run(resp_rx, signal_handler, input_handler, report_cancel_rx); scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled); let (script_tx, mut script_rx) = unbounded_channel::>(); @@ -330,7 +324,6 @@ impl<'a> TestRunnerInner<'a> { TestGroup::Global => None, TestGroup::Custom(name) => Some(name.clone()), }; - let cancel_rx = cancellation_sender.subscribe(); let resp_tx = resp_tx.clone(); let setup_script_data = setup_script_data.clone(); @@ -359,7 +352,6 @@ impl<'a> TestRunnerInner<'a> { test_instance, settings, resp_tx.clone(), - cancel_rx, setup_script_data, )) }) diff --git a/nextest-runner/src/runner/internal_events.rs b/nextest-runner/src/runner/internal_events.rs index 3d47783a015..cb03206d226 100644 --- a/nextest-runner/src/runner/internal_events.rs +++ b/nextest-runner/src/runner/internal_events.rs @@ -183,6 +183,9 @@ impl InternalSetupScriptExecuteStatus<'_> { #[derive(Clone, Debug)] pub(super) enum RunUnitRequest<'a> { Signal(SignalRequest), + /// Non-signal cancellation requests (e.g. test failures) which should cause + /// tests to exit in some states. + OtherCancel, Query(RunUnitQuery<'a>), } @@ -197,6 +200,7 @@ impl<'a> RunUnitRequest<'a> { #[cfg(unix)] Self::Signal(SignalRequest::Continue) => {} Self::Signal(SignalRequest::Shutdown(_)) => {} + Self::OtherCancel => {} Self::Query(RunUnitQuery::GetInfo(tx)) => { // The receiver being dead isn't really important. _ = tx.send(status.info_response()); diff --git a/nextest-runner/src/runner/unix.rs b/nextest-runner/src/runner/unix.rs index de621eca0eb..8a26549c610 100644 --- a/nextest-runner/src/runner/unix.rs +++ b/nextest-runner/src/runner/unix.rs @@ -155,6 +155,10 @@ pub(super) async fn terminate_child<'a>( } break; } + RunUnitRequest::OtherCancel => { + // Ignore non-signal cancellation requests (most + // likely another test failed). Let the unit finish. + } RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => { let waiting_snapshot = waiting_stopwatch.snapshot(); _ = sender.send( diff --git a/nextest-runner/src/runner/windows.rs b/nextest-runner/src/runner/windows.rs index 5a9226ade3e..8f0b4cccc57 100644 --- a/nextest-runner/src/runner/windows.rs +++ b/nextest-runner/src/runner/windows.rs @@ -119,6 +119,10 @@ pub(super) async fn terminate_child<'a>( // immediately -- go to the next step. break false; } + RunUnitRequest::OtherCancel => { + // Ignore non-signal cancellation requests (most + // likely another test failed). Let the unit finish. + } RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => { let waiting_snapshot = waiting_stopwatch.snapshot(); _ = sender.send(