Skip to content

Commit

Permalink
hq wait --without-close
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali authored and Kobzol committed Aug 23, 2024
1 parent 21b6b61 commit ffe4be7
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 22 deletions.
8 changes: 7 additions & 1 deletion crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,13 @@ async fn command_job_task_ids(

async fn command_job_wait(gsettings: &GlobalSettings, opts: JobWaitOpts) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
wait_for_jobs(gsettings, &mut connection, opts.selector).await
wait_for_jobs(
gsettings,
&mut connection,
opts.selector,
!opts.without_close,
)
.await
}

async fn command_job_progress(
Expand Down
1 change: 1 addition & 0 deletions crates/hyperqueue/src/client/commands/submit/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,7 @@ pub(crate) async fn send_submit_request(
gsettings,
session,
IdSelector::Specific(IntArray::from_id(info.id.into())),
true,
)
.await?;
} else if progress {
Expand Down
2 changes: 2 additions & 0 deletions crates/hyperqueue/src/client/commands/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ pub async fn wait_for_jobs(
gsettings: &GlobalSettings,
session: &mut ClientSession,
selector: IdSelector,
wait_for_close: bool,
) -> anyhow::Result<()> {
let start = SystemTime::now();
let response = rpc_call!(
session.connection(),
FromClientMessage::WaitForJobs(WaitForJobsRequest {
selector: selector.clone(),
wait_for_close,
}),
ToClientMessage::WaitForJobsResponse(r) => r
)
Expand Down
4 changes: 4 additions & 0 deletions crates/hyperqueue/src/common/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ pub struct JobWaitOpts {
/// Select job(s) to wait for
#[arg(value_parser = parse_last_all_range)]
pub selector: IdSelector,

/// Wait only for task completion and terminate even if the job is still open
#[clap(long, action)]
pub without_close: bool,
}

#[derive(Parser)]
Expand Down
8 changes: 5 additions & 3 deletions crates/hyperqueue/src/server/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ pub async fn client_rpc_loop<
autoalloc::handle_autoalloc_message(&server_dir, senders, msg).await
}
FromClientMessage::WaitForJobs(msg) => {
handle_wait_for_jobs_message(&state_ref, msg.selector).await
handle_wait_for_jobs_message(&state_ref, msg.selector, msg.wait_for_close)
.await
}
FromClientMessage::OpenJob(job_description) => {
handle_open_job(&state_ref, senders, job_description)
Expand Down Expand Up @@ -209,6 +210,7 @@ pub async fn client_rpc_loop<
async fn handle_wait_for_jobs_message(
state_ref: &StateRef,
selector: IdSelector,
wait_for_close: bool,
) -> ToClientMessage {
let update_counters = |response: &mut WaitForJobsResponse, counters: &JobTaskCounters| {
if counters.n_canceled_tasks > 0 {
Expand All @@ -230,10 +232,10 @@ async fn handle_wait_for_jobs_message(
for job_id in job_ids {
match state.get_job_mut(job_id) {
Some(job) => {
if job.is_terminated() {
if job.has_no_active_tasks() && !(wait_for_close && job.is_open) {
update_counters(&mut response, &job.counters);
} else {
let rx = job.subscribe_to_completion();
let rx = job.subscribe_to_completion(wait_for_close);
receivers.push(rx);
}
}
Expand Down
63 changes: 45 additions & 18 deletions crates/hyperqueue/src/server/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ pub struct JobPart {
pub job_desc: Arc<JobDescription>,
}

/// A pending subscription to a job's completion, as registered through
/// `Job::subscribe_to_completion`.
pub struct JobCompletionCallback {
/// One-shot channel that is sent the job's id when the subscription fires.
callback: oneshot::Sender<JobId>,
/// When `false`, the subscription fires as soon as the job has no active tasks,
/// even if the job is still open. When `true`, it is kept until the job is also closed.
wait_for_close: bool,
}

pub struct Job {
pub job_id: JobId,
pub counters: JobTaskCounters,
Expand All @@ -131,7 +136,7 @@ pub struct Job {

/// Holds channels that will receive information about the job after it finishes in any way.
/// You can subscribe to the completion message with [`Self::subscribe_to_completion`].
completion_callbacks: Vec<oneshot::Sender<JobId>>,
completion_callbacks: Vec<JobCompletionCallback>,
}

impl Job {
Expand Down Expand Up @@ -207,6 +212,10 @@ impl Job {
self.tasks.len() as JobTaskCount
}

/// Reports whether the task counters consider all of the job's tasks terminated.
///
/// Unlike [`Self::is_terminated`], this does not look at whether the job is open.
pub fn has_no_active_tasks(&self) -> bool {
    let task_count = self.n_tasks();
    self.counters.is_terminated(task_count)
}

/// Reports whether the job is finished: it must be closed *and* its task counters
/// must consider all tasks terminated.
pub fn is_terminated(&self) -> bool {
    // An open job is never terminated; the counters are not consulted in that case.
    if self.is_open {
        return false;
    }
    self.counters.is_terminated(self.n_tasks())
}
Expand Down Expand Up @@ -264,23 +273,38 @@ impl Job {
}

pub fn check_termination(&mut self, senders: &Senders, now: DateTime<Utc>) {
if self.is_terminated() {
self.completion_date = Some(now);
if self
.submit_descs
.first()
.map(|x| x.0.log.is_some())
.unwrap_or(false)
{
senders
.backend
.send_stream_control(StreamServerControlMessage::UnregisterStream(self.job_id));
}
if self.has_no_active_tasks() {
if self.is_open {
let callbacks = std::mem::take(&mut self.completion_callbacks);
self.completion_callbacks = callbacks
.into_iter()
.filter_map(|c| {
if !c.wait_for_close {
c.callback.send(self.job_id).ok();
None
} else {
Some(c)
}
})
.collect();
} else {
self.completion_date = Some(now);
if self
.submit_descs
.first()
.map(|x| x.0.log.is_some())
.unwrap_or(false)
{
senders.backend.send_stream_control(
StreamServerControlMessage::UnregisterStream(self.job_id),
);
}

for handler in self.completion_callbacks.drain(..) {
handler.send(self.job_id).ok();
for handler in self.completion_callbacks.drain(..) {
handler.callback.send(self.job_id).ok();
}
senders.events.on_job_completed(self.job_id);
}
senders.events.on_job_completed(self.job_id);
}
}

Expand Down Expand Up @@ -378,9 +402,12 @@ impl Job {
/// Subscribes to the completion event of this job.
/// When the job finishes in any way (completion, failure, cancellation), the channel will
/// receive a single message.
pub fn subscribe_to_completion(&mut self) -> oneshot::Receiver<JobId> {
pub fn subscribe_to_completion(&mut self, wait_for_close: bool) -> oneshot::Receiver<JobId> {
let (tx, rx) = oneshot::channel();
self.completion_callbacks.push(tx);
self.completion_callbacks.push(JobCompletionCallback {
callback: tx,
wait_for_close,
});
rx
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/hyperqueue/src/transfer/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ pub struct AllocationQueueParams {
/// Client request asking the server to wait for the selected jobs.
#[derive(Serialize, Deserialize, Debug)]
pub struct WaitForJobsRequest {
/// Which job(s) to wait for.
pub selector: IdSelector,
/// If `true`, an open job is not treated as finished until it is closed.
/// If `false`, it is enough that the job has no active tasks.
pub wait_for_close: bool,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
3 changes: 3 additions & 0 deletions docs/jobs/openjobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,6 @@ Closing of already closed job throws an error.
Leaving open jobs has no overhead, but it does affect the semantics of job completion.
A job is considered completed when all tasks have been completed and the job is *closed*.
Therefore, `hq job wait ...` will wait until all tasks of the selected jobs are complete and the jobs are closed.

If you want to wait only for the completion of tasks, regardless of whether the job is open or
closed, use `hq job wait --without-close ...`.
12 changes: 12 additions & 0 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,18 @@ def test_job_wait_for_close(hq_env: HqEnv):
assert r == 0


def test_job_wait_for_open_job_without_close(hq_env: HqEnv):
    """Run `hq job wait 1 --without-close` against an open job twice:
    once before any task is submitted and once after submitting a `sleep 0` task.
    """

    def wait_without_close():
        # A fresh argument list is built for every invocation.
        hq_env.command(["job", "wait", "1", "--without-close"])

    hq_env.start_server()
    hq_env.command(["job", "open"])

    # The job is open and has no tasks yet.
    wait_without_close()

    hq_env.start_worker()
    hq_env.command(["submit", "--job=1", "--", "sleep", "0"])

    # The job is still open, now with one submitted task.
    wait_without_close()


def test_invalid_attach(hq_env: HqEnv):
hq_env.start_server()
hq_env.command(["submit", "--", "sleep", "0"])
Expand Down

0 comments on commit ffe4be7

Please sign in to comment.