diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs index f4516fede..137f4aea4 100644 --- a/crates/hyperqueue/src/bin/hq.rs +++ b/crates/hyperqueue/src/bin/hq.rs @@ -154,7 +154,13 @@ async fn command_job_task_ids( async fn command_job_wait(gsettings: &GlobalSettings, opts: JobWaitOpts) -> anyhow::Result<()> { let mut connection = get_client_session(gsettings.server_directory()).await?; - wait_for_jobs(gsettings, &mut connection, opts.selector).await + wait_for_jobs( + gsettings, + &mut connection, + opts.selector, + !opts.without_close, + ) + .await } async fn command_job_progress( diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs index 6d0bcd12e..64c1bddd8 100644 --- a/crates/hyperqueue/src/client/commands/submit/command.rs +++ b/crates/hyperqueue/src/client/commands/submit/command.rs @@ -720,6 +720,7 @@ pub(crate) async fn send_submit_request( gsettings, session, IdSelector::Specific(IntArray::from_id(info.id.into())), + true, ) .await?; } else if progress { diff --git a/crates/hyperqueue/src/client/commands/wait.rs b/crates/hyperqueue/src/client/commands/wait.rs index b0d62c9ab..70a3d0ca1 100644 --- a/crates/hyperqueue/src/client/commands/wait.rs +++ b/crates/hyperqueue/src/client/commands/wait.rs @@ -25,12 +25,14 @@ pub async fn wait_for_jobs( gsettings: &GlobalSettings, session: &mut ClientSession, selector: IdSelector, + wait_for_close: bool, ) -> anyhow::Result<()> { let start = SystemTime::now(); let response = rpc_call!( session.connection(), FromClientMessage::WaitForJobs(WaitForJobsRequest { selector: selector.clone(), + wait_for_close, }), ToClientMessage::WaitForJobsResponse(r) => r ) diff --git a/crates/hyperqueue/src/common/cli.rs b/crates/hyperqueue/src/common/cli.rs index cb4385189..27cc6393f 100644 --- a/crates/hyperqueue/src/common/cli.rs +++ b/crates/hyperqueue/src/common/cli.rs @@ -335,6 +335,10 @@ pub struct JobWaitOpts { /// Select job(s) to wait for #[arg(value_parser = parse_last_all_range)] pub selector: IdSelector, + + /// Wait only for task completion and terminate even job is open + #[clap(long, action)] + pub without_close: bool, } #[derive(Parser)] diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs index 00fbe1e9d..bcc63460c 100644 --- a/crates/hyperqueue/src/server/client/mod.rs +++ b/crates/hyperqueue/src/server/client/mod.rs @@ -157,7 +157,8 @@ pub async fn client_rpc_loop< autoalloc::handle_autoalloc_message(&server_dir, senders, msg).await } FromClientMessage::WaitForJobs(msg) => { - handle_wait_for_jobs_message(&state_ref, msg.selector).await + handle_wait_for_jobs_message(&state_ref, msg.selector, msg.wait_for_close) + .await } FromClientMessage::OpenJob(job_description) => { handle_open_job(&state_ref, senders, job_description) @@ -209,6 +210,7 @@ pub async fn client_rpc_loop< async fn handle_wait_for_jobs_message( state_ref: &StateRef, selector: IdSelector, + wait_for_close: bool, ) -> ToClientMessage { let update_counters = |response: &mut WaitForJobsResponse, counters: &JobTaskCounters| { if counters.n_canceled_tasks > 0 { @@ -230,10 +232,10 @@ async fn handle_wait_for_jobs_message( for job_id in job_ids { match state.get_job_mut(job_id) { Some(job) => { - if job.is_terminated() { + if job.has_no_active_tasks() && !(wait_for_close && job.is_open) { update_counters(&mut response, &job.counters); } else { - let rx = job.subscribe_to_completion(); + let rx = job.subscribe_to_completion(wait_for_close); receivers.push(rx); } } diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs index 041c08aca..c479c30cd 100644 --- a/crates/hyperqueue/src/server/job.rs +++ b/crates/hyperqueue/src/server/job.rs @@ -113,6 +113,11 @@ pub struct JobPart { pub job_desc: Arc, } +pub struct JobCompletionCallback { + callback: oneshot::Sender, + wait_for_close: bool, +} + pub struct Job { pub job_id: JobId, pub counters: JobTaskCounters, @@ -131,7 +136,7 @@ pub struct Job { /// Holds channels that will receive information about the job after the it finishes in any way. /// You can subscribe to the completion message with [`Self::subscribe_to_completion`]. - completion_callbacks: Vec>, + completion_callbacks: Vec, } impl Job { @@ -207,6 +212,10 @@ impl Job { self.tasks.len() as JobTaskCount } + pub fn has_no_active_tasks(&self) -> bool { + self.counters.is_terminated(self.n_tasks()) + } + pub fn is_terminated(&self) -> bool { !self.is_open && self.counters.is_terminated(self.n_tasks()) } @@ -264,23 +273,38 @@ impl Job { } pub fn check_termination(&mut self, senders: &Senders, now: DateTime) { - if self.is_terminated() { - self.completion_date = Some(now); - if self - .submit_descs - .first() - .map(|x| x.0.log.is_some()) - .unwrap_or(false) - { - senders - .backend - .send_stream_control(StreamServerControlMessage::UnregisterStream(self.job_id)); - } + if self.has_no_active_tasks() { + if self.is_open { + let callbacks = std::mem::take(&mut self.completion_callbacks); + self.completion_callbacks = callbacks + .into_iter() + .filter_map(|c| { + if !c.wait_for_close { + c.callback.send(self.job_id).ok(); + None + } else { + Some(c) + } + }) + .collect(); + } else { + self.completion_date = Some(now); + if self + .submit_descs + .first() + .map(|x| x.0.log.is_some()) + .unwrap_or(false) + { + senders.backend.send_stream_control( + StreamServerControlMessage::UnregisterStream(self.job_id), + ); + } - for handler in self.completion_callbacks.drain(..) { - handler.send(self.job_id).ok(); + for handler in self.completion_callbacks.drain(..) { + handler.callback.send(self.job_id).ok(); + } + senders.events.on_job_completed(self.job_id); } - senders.events.on_job_completed(self.job_id); } } @@ -378,9 +402,12 @@ impl Job { /// Subscribes to the completion event of this job. /// When the job finishes in any way (completion, failure, cancellation), the channel will /// receive a single message. - pub fn subscribe_to_completion(&mut self) -> oneshot::Receiver { + pub fn subscribe_to_completion(&mut self, wait_for_close: bool) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); - self.completion_callbacks.push(tx); + self.completion_callbacks.push(JobCompletionCallback { + callback: tx, + wait_for_close, + }); rx } } diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs index 5487adcf4..d767fa129 100644 --- a/crates/hyperqueue/src/transfer/messages.rs +++ b/crates/hyperqueue/src/transfer/messages.rs @@ -301,6 +301,7 @@ pub struct AllocationQueueParams { #[derive(Serialize, Deserialize, Debug)] pub struct WaitForJobsRequest { pub selector: IdSelector, + pub wait_for_close: bool, } #[derive(Serialize, Deserialize, Debug)] diff --git a/docs/jobs/openjobs.md b/docs/jobs/openjobs.md index 490e7b58a..c276cc509 100644 --- a/docs/jobs/openjobs.md +++ b/docs/jobs/openjobs.md @@ -103,3 +103,6 @@ Closing of already closed job throws an error. Leaving open jobs has no overhead, but it does affect the semantics of job completion. A job is considered completed when all tasks have been completed and the job is *closed*. Therefore, `hq job wait ...` will wait until all tasks of the selected jobs are complete and the jobs are closed. + +If you want to wait only for completion of tasks and ignoring if job is open or closed then there +is `hq job wait --without-close ...`. diff --git a/tests/test_job.py b/tests/test_job.py index 163daa211..8cccc6ee4 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1291,6 +1291,18 @@ def test_job_wait_for_close(hq_env: HqEnv): assert r == 0 +def test_job_wait_for_open_job_without_close(hq_env: HqEnv): + hq_env.start_server() + hq_env.command(["job", "open"]) + + hq_env.command(["job", "wait", "1", "--without-close"]) + + hq_env.start_worker() + hq_env.command(["submit", "--job=1", "--", "sleep", "0"]) + + hq_env.command(["job", "wait", "1", "--without-close"]) + + def test_invalid_attach(hq_env: HqEnv): hq_env.start_server() hq_env.command(["submit", "--", "sleep", "0"])