fix(watch): exit command if persistent task exits #8858

Open · wants to merge 2 commits into base: main
23 changes: 23 additions & 0 deletions crates/turborepo-lib/src/run/mod.rs
@@ -207,6 +207,13 @@ impl Run {
Ok(Some((sender, handle)))
}

/// Returns a handle that can be used to stop a run
pub fn stopper(&self) -> RunStopper {
RunStopper {
manager: self.processes.clone(),
}
}

pub async fn run(
&mut self,
experimental_ui_sender: Option<AppSender>,
@@ -215,6 +222,11 @@
if let Some(subscriber) = self.signal_handler.subscribe() {
let run_cache = self.run_cache.clone();
tokio::spawn(async move {
// Caching is disabled for watch so we don't need to wait on shutting down the
// cache.
if is_watch {
return;
}
let _guard = subscriber.listen().await;
let spinner = turborepo_ui::start_spinner("...Finishing writing to cache...");
if let Ok((status, closed)) = run_cache.shutdown_cache().await {
@@ -439,3 +451,14 @@ impl Run {
Ok(exit_code)
}
}

#[derive(Debug, Clone)]
pub struct RunStopper {
manager: ProcessManager,
}

impl RunStopper {
pub async fn stop(&self) {
self.manager.stop().await;
}
}
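
For context on the new API: `RunStopper` is a cheaply cloneable handle over the run's `ProcessManager`, which lets the watch loop shut down a run's child processes without owning the `Run` itself. Below is a minimal runnable sketch of the handle pattern; the `ProcessManager` here is a stand-in (the real one manages the run's child processes), so treat the implementation details as illustrative only.

```rust
use std::sync::Arc;
use tokio::sync::Notify;

// Stand-in for turborepo's ProcessManager: the real one manages child
// processes; this one just carries a shared shutdown signal.
#[derive(Debug, Clone)]
struct ProcessManager {
    shutdown: Arc<Notify>,
}

impl ProcessManager {
    async fn stop(&self) {
        // notify_one stores a permit, so a worker that starts waiting only
        // after stop() was called still observes the shutdown.
        self.shutdown.notify_one();
    }
}

// A clone of the manager is all the stopper needs, so it can be handed to
// another task independently of the Run that created it.
#[derive(Debug, Clone)]
struct RunStopper {
    manager: ProcessManager,
}

impl RunStopper {
    async fn stop(&self) {
        self.manager.stop().await;
    }
}

#[tokio::main]
async fn main() {
    let manager = ProcessManager { shutdown: Arc::new(Notify::new()) };
    let stopper = RunStopper { manager: manager.clone() };

    // Stands in for a run's persistent child processes.
    let worker = tokio::spawn(async move {
        manager.shutdown.notified().await;
        println!("worker shut down");
    });

    // Another task (in the PR, the watch loop) stops the run via the handle.
    stopper.stop().await;
    worker.await.unwrap();
}
```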
77 changes: 61 additions & 16 deletions crates/turborepo-lib/src/run/watch.rs
@@ -47,7 +47,7 @@ impl ChangedPackages {
pub struct WatchClient {
run: Run,
watched_packages: HashSet<PackageName>,
persistent_tasks_handle: Option<JoinHandle<Result<i32, run::Error>>>,
persistent_tasks_handle: Option<PersistentRunHandle>,
connector: DaemonConnector,
base: CommandBase,
telemetry: CommandEventBuilder,
@@ -56,6 +56,11 @@ pub struct WatchClient {
ui_handle: Option<JoinHandle<Result<(), tui::Error>>>,
}

struct PersistentRunHandle {
stopper: run::RunStopper,
run_task: JoinHandle<Result<i32, run::Error>>,
}

#[derive(Debug, Error, Diagnostic)]
pub enum Error {
#[error("failed to connect to daemon")]
@@ -95,6 +100,8 @@ pub enum Error {
PackageChange(#[from] tonic::Status),
#[error("could not connect to UI thread")]
UISend(String),
#[error("persistent tasks exited unexpectedly")]
PersistentExit,
}

impl WatchClient {
@@ -169,14 +176,32 @@
};

let run_fut = async {
loop {
notify_run.notified().await;
let mut persistent_exit = None;
'outer: loop {
if let Some(persistent) = &mut persistent_exit {
// Here we watch both the notify signal *and* the persistent tasks:
// if notify fires, continue the loop as usual;
// if the persistent tasks exit, break out of the loop and return an error.
select! {
_ = notify_run.notified() => {},
_ = persistent => {
break 'outer;
}
}
} else {
notify_run.notified().await;
}
let changed_packages_guard = changed_packages.lock().await;
if !changed_packages_guard.borrow().is_empty() {
let changed_packages = changed_packages_guard.take();
self.execute_run(changed_packages).await?;
let (_result, persistent_exit_rx) = self.execute_run(changed_packages).await?;
// Only update the persistent-exit receiver if a new one was created
if let Some(rx) = persistent_exit_rx {
persistent_exit = Some(rx);
}
}
}
Err(Error::PersistentExit)
};

select! {
@@ -226,7 +251,10 @@
Ok(())
}

async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result<i32, Error> {
async fn execute_run(
&mut self,
changed_packages: ChangedPackages,
) -> Result<(i32, Option<tokio::sync::oneshot::Receiver<()>>), Error> {
// Should we recover here?
match changed_packages {
ChangedPackages::Some(packages) => {
@@ -270,7 +298,7 @@
.build(&signal_handler, telemetry)
.await?;

Ok(run.run(self.ui_sender.clone(), true).await?)
Ok((run.run(self.ui_sender.clone(), true).await?, None))
}
ChangedPackages::All => {
let mut args = self.base.args().clone();
@@ -304,6 +332,14 @@

self.watched_packages = self.run.get_relevant_packages();

// Clean up currently running persistent tasks
if let Some(PersistentRunHandle { stopper, run_task }) =
self.persistent_tasks_handle.take()
{
// Shut down the tasks for the run
stopper.stop().await;
run_task.abort();
}
if let Some(sender) = &self.ui_sender {
let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph);
sender
@@ -312,24 +348,33 @@ impl WatchClient {
}

if self.run.has_persistent_tasks() {
// Abort old run
if let Some(run) = self.persistent_tasks_handle.take() {
run.abort();
}

debug_assert!(
self.persistent_tasks_handle.is_none(),
"persistent handle should be empty before creating a new one"
);
let mut persistent_run = self.run.create_run_for_persistent_tasks();
let ui_sender = self.ui_sender.clone();
// If we have persistent tasks, we run them on a separate tokio task
// since persistent tasks don't finish
self.persistent_tasks_handle = Some(tokio::spawn(async move {
persistent_run.run(ui_sender, true).await
}));
let (persist_guard, persist_exit) = tokio::sync::oneshot::channel::<()>();
self.persistent_tasks_handle = Some(PersistentRunHandle {
stopper: persistent_run.stopper(),
run_task: tokio::spawn(async move {
// We move the guard (the oneshot sender) in here so we can detect when
// the persistent tasks exit: it goes out of scope and is dropped, which
// resolves the receiver.
let _guard = persist_guard;
persistent_run.run(ui_sender, true).await
}),
});

// But we still run the regular (non-persistent) tasks to completion, blocking here
let mut non_persistent_run = self.run.create_run_without_persistent_tasks();
Ok(non_persistent_run.run(self.ui_sender.clone(), true).await?)
Ok((
non_persistent_run.run(self.ui_sender.clone(), true).await?,
Some(persist_exit),
))
} else {
Ok(self.run.run(self.ui_sender.clone(), true).await?)
Ok((self.run.run(self.ui_sender.clone(), true).await?, None))
}
}
}
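For context on the control flow above: the exit detection hinges on a `tokio::sync::oneshot` channel used purely as a drop guard. The sender (`persist_guard`) is moved into the spawned persistent run, so whether that task returns, panics, or is aborted, the sender is dropped and the receiver resolves; the outer loop then `select!`s that receiver against the change notifications and bails out with `Error::PersistentExit`. A minimal runnable sketch of the pattern (the names below are illustrative, not turborepo's):

```rust
use std::{sync::Arc, time::Duration};
use tokio::{select, sync::{oneshot, Notify}, time::sleep};

#[tokio::main]
async fn main() {
    // Stands in for the daemon's package-change notifications.
    let notify = Arc::new(Notify::new());
    let (guard, mut exited) = oneshot::channel::<()>();

    // Stands in for the persistent run: it owns the sender. When the task
    // ends for any reason, `guard` is dropped and `exited` resolves.
    tokio::spawn(async move {
        let _guard = guard;
        sleep(Duration::from_millis(50)).await;
        // task ends here; `_guard` goes out of scope and is dropped
    });

    loop {
        select! {
            _ = notify.notified() => {
                // a change notification would trigger a re-run here
            }
            // oneshot::Receiver is a Future and Unpin, so polling
            // `&mut exited` lets the loop keep ownership across iterations.
            _ = &mut exited => {
                // mirrors returning Error::PersistentExit in the PR
                eprintln!("persistent tasks exited unexpectedly");
                break;
            }
        }
    }
}
```

In the PR itself the receiver is refreshed whenever `execute_run` spawns a fresh persistent run, which is why its return type gained the `Option<oneshot::Receiver<()>>`.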
3 changes: 3 additions & 0 deletions crates/turborepo-ui/src/tui/app.rs
@@ -139,6 +139,7 @@ impl<W> App<W> {
/// If finished, removes from finished and starts again as new task.
#[tracing::instrument(skip(self, output_logs))]
pub fn start_task(&mut self, task: &str, output_logs: OutputLogs) -> Result<(), Error> {
debug!("starting {task}");
// Name of currently highlighted task.
// We will use this after the order switches.
let highlighted_task = self
@@ -202,6 +203,7 @@
/// Errors if given task wasn't a running task
#[tracing::instrument(skip(self, result))]
pub fn finish_task(&mut self, task: &str, result: TaskResult) -> Result<(), Error> {
debug!("finishing task {task}");
// Name of currently highlighted task.
// We will use this after the order switches.
let highlighted_task = self
@@ -265,6 +267,7 @@

#[tracing::instrument(skip(self))]
pub fn update_tasks(&mut self, tasks: Vec<String>) {
debug!("updating task list: {tasks:?}");
// Make sure all tasks have a terminal output
for task in &tasks {
self.tasks