
Open jobs #740

Merged
merged 10 commits into from
Aug 23, 2024
30 changes: 19 additions & 11 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Dev

## New features

* It is now possible to dynamically submit new tasks into an existing job (we call this concept "Open jobs").
  See the [Open jobs documentation](https://it4innovations.github.io/hyperqueue/stable/jobs/openjobs/).
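The lifecycle introduced by this feature can be sketched as a small state machine: a job stays open and accepts new tasks until it is explicitly closed. This is a minimal illustrative sketch; the `Job`, `JobState`, and `submit_task` names here are hypothetical and are not HyperQueue's real API.

```rust
// Illustrative model of an "open job": tasks may be attached dynamically
// until the job is closed, after which submissions are rejected.
#[derive(Debug, PartialEq)]
enum JobState {
    Open,
    Closed,
}

struct Job {
    state: JobState,
    tasks: Vec<String>,
}

impl Job {
    fn open() -> Self {
        Job { state: JobState::Open, tasks: Vec::new() }
    }

    /// New tasks may only be attached while the job is still open.
    fn submit_task(&mut self, task: &str) -> Result<(), &'static str> {
        match self.state {
            JobState::Open => {
                self.tasks.push(task.to_string());
                Ok(())
            }
            JobState::Closed => Err("job is already closed"),
        }
    }

    fn close(&mut self) {
        self.state = JobState::Closed;
    }
}

fn main() {
    let mut job = Job::open();
    assert!(job.submit_task("task-1").is_ok());
    assert!(job.submit_task("task-2").is_ok());
    job.close();
    // Submitting into a closed job fails, matching the CloseJobResponse
    // variants added in this PR.
    assert!(job.submit_task("task-3").is_err());
    assert_eq!(job.tasks.len(), 2);
}
```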

## Fixes

* HQ should no longer crash while printing job info when a failed task does not have any workers
@@ -9,7 +14,9 @@

## New features

* Server resilience. Server state can be loaded back from a journal when it crashes. This will restore the state of submitted jobs and also autoallocator queues. Find out more [here](https://it4innovations.github.io/hyperqueue/stable/deployment/server/#resuming-stoppedcrashed-server).
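The idea behind journal-based recovery is event sourcing: every state change is appended to a journal, and after a restart the state is rebuilt by replaying the events in order. The event and state types below are purely illustrative and do not reflect HyperQueue's actual journal format.

```rust
// Minimal event-sourcing sketch: state is never persisted directly;
// it is reconstructed by replaying an append-only journal.
enum Event {
    JobSubmitted(u32),
    JobFinished(u32),
}

#[derive(Default)]
struct ServerState {
    active_jobs: Vec<u32>,
}

impl ServerState {
    fn apply(&mut self, event: &Event) {
        match event {
            Event::JobSubmitted(id) => self.active_jobs.push(*id),
            Event::JobFinished(id) => self.active_jobs.retain(|j| j != id),
        }
    }

    /// Rebuild the state from scratch by replaying the whole journal.
    fn replay(journal: &[Event]) -> Self {
        let mut state = ServerState::default();
        for event in journal {
            state.apply(event);
        }
        state
    }
}

fn main() {
    let journal = vec![
        Event::JobSubmitted(1),
        Event::JobSubmitted(2),
        Event::JobFinished(1),
    ];
    // After a simulated crash, only job 2 is still active.
    let restored = ServerState::replay(&journal);
    assert_eq!(restored.active_jobs, vec![2]);
}
```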

* `HQ_NUM_NODES` has been introduced for multi-node tasks. It contains the number of nodes assigned to the task.
You do not need to manually count lines in `HQ_NODE_FILE` anymore.
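A task can now read the node count directly from the environment. A small sketch (the fallback value here is only so the snippet runs outside of a HyperQueue worker, where the variable would not be set):

```rust
use std::env;

fn main() {
    // Inside a multi-node task, HyperQueue sets HQ_NUM_NODES; the "4"
    // fallback below is purely so this sketch runs standalone.
    let num_nodes: usize = env::var("HQ_NUM_NODES")
        .unwrap_or_else(|_| "4".to_string())
        .parse()
        .expect("HQ_NUM_NODES should be an integer");
    println!("running on {num_nodes} nodes");
    assert!(num_nodes >= 1);
}
```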
@@ -150,7 +157,7 @@ an old client/worker to a new server (Connecting a new client/worker to an old s

```console
$ hq job submit-file myfile.toml
```

* You can now specify (indexed) resource values provided by workers as strings (previously only
integers were allowed). Notably, automatic detection of Nvidia GPUs specified with string UUIDs
@@ -517,7 +524,7 @@ would pass `OMP_NUM_THREADS=4` to the executed `<program>`.
```bash
# Print stdout of all tasks of job 1
$ hq job cat 1 stdout

# Print stderr of tasks 1, 2, 3 of job 5
$ hq job cat 5 stderr --tasks 1-3
```
@@ -564,6 +571,7 @@ would pass `OMP_NUM_THREADS=4` to the executed `<program>`.
* You can now generate shell completion using the `hq generate-completion <shell>` command.

## Changes

### CLI

* The command line interface for jobs has been changed to be more consistent with the interface for
@@ -572,14 +580,14 @@ would pass `OMP_NUM_THREADS=4` to the executed `<program>`.
is `hq submit`, which is now a shortcut for `hq job submit`. Here is a table of changed commands:

| **Previous command** | **New command** |
|----------------------|--------------------------------|
| `hq jobs`            | `hq job list`                  |
| `hq job`             | `hq job info`                  |
| `hq resubmit`        | `hq job resubmit`              |
| `hq cancel`          | `hq job cancel`                |
| `hq wait`            | `hq job wait`                  |
| `hq progress`        | `hq job progress`              |
| `hq submit`          | `hq submit` or `hq job submit` |

* The `--tasks` flag of the `hq job info <job-id>` command has been removed. If you want to display the
individual tasks of a job, please use the `hq task list <job-id>` command.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/hyperqueue/Cargo.toml
@@ -53,6 +53,7 @@ chumsky = "0.9"
toml = "0.8"
signal-hook = "0.3"
core_affinity = "0.8"
itertools = "0.13.0"

# Dashboard
ratatui = { version = "0.26", default-features = false, features = ["termion"], optional = true }
43 changes: 35 additions & 8 deletions crates/hyperqueue/src/bin/hq.rs
@@ -10,11 +10,13 @@ use colored::Colorize;
use hyperqueue::client::commands::autoalloc::command_autoalloc;
use hyperqueue::client::commands::event::command_event_log;
use hyperqueue::client::commands::job::{
-    cancel_job, forget_job, output_job_cat, output_job_detail, output_job_list, output_job_summary,
-    JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, JobTaskIdsOpts,
+    cancel_job, close_job, forget_job, output_job_cat, output_job_detail, output_job_list,
+    output_job_summary, JobCancelOpts, JobCatOpts, JobCloseOpts, JobForgetOpts, JobInfoOpts,
+    JobListOpts, JobTaskIdsOpts,
};
use hyperqueue::client::commands::log::command_log;
use hyperqueue::client::commands::server::command_server;
use hyperqueue::client::commands::submit::command::{open_job, SubmitJobConfOpts};
use hyperqueue::client::commands::submit::{
submit_computation, submit_computation_from_job_file, JobSubmitFileOpts, JobSubmitOpts,
};
@@ -64,6 +66,14 @@ async fn command_job_submit(
submit_computation(gsettings, &mut session, opts).await
}

async fn command_job_open(
gsettings: &GlobalSettings,
opts: SubmitJobConfOpts,
) -> anyhow::Result<()> {
let mut session = get_client_session(gsettings.server_directory()).await?;
open_job(gsettings, &mut session, opts).await
}

async fn command_submit_job_file(
gsettings: &GlobalSettings,
opts: JobSubmitFileOpts,
@@ -75,17 +85,17 @@ async fn command_submit_job_file(
async fn command_job_list(gsettings: &GlobalSettings, opts: JobListOpts) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;

-    let filter = if opts.filter.is_empty() {
+    let (filter, show_open) = if opts.filter.is_empty() {
         if opts.all {
-            vec![]
+            (vec![], true)
         } else {
-            vec![Status::Waiting, Status::Running]
+            (vec![Status::Waiting, Status::Running, Status::Opened], true)
         }
     } else {
-        opts.filter
+        (opts.filter, false)
     };

-    output_job_list(gsettings, &mut connection, filter).await
+    output_job_list(gsettings, &mut connection, filter, show_open).await
}

async fn command_job_summary(gsettings: &GlobalSettings) -> anyhow::Result<()> {
@@ -124,6 +134,11 @@ async fn command_job_cancel(gsettings: &GlobalSettings, opts: JobCancelOpts) ->
cancel_job(gsettings, &mut connection, opts.selector).await
}

async fn command_job_close(gsettings: &GlobalSettings, opts: JobCloseOpts) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
close_job(gsettings, &mut connection, opts.selector).await
}

async fn command_job_delete(gsettings: &GlobalSettings, opts: JobForgetOpts) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
forget_job(gsettings, &mut connection, opts).await
@@ -139,7 +154,13 @@ async fn command_job_task_ids(

async fn command_job_wait(gsettings: &GlobalSettings, opts: JobWaitOpts) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
-    wait_for_jobs(gsettings, &mut connection, opts.selector).await
+    wait_for_jobs(
+        gsettings,
+        &mut connection,
+        opts.selector,
+        !opts.without_close,
+    )
+    .await
}

async fn command_job_progress(
@@ -439,6 +460,12 @@ async fn main() -> hyperqueue::Result<()> {
SubCommand::Job(JobOpts {
subcmd: JobCommand::TaskIds(opts),
}) => command_job_task_ids(&gsettings, opts).await,
SubCommand::Job(JobOpts {
subcmd: JobCommand::Open(opts),
}) => command_job_open(&gsettings, opts).await,
SubCommand::Job(JobOpts {
subcmd: JobCommand::Close(opts),
}) => command_job_close(&gsettings, opts).await,
SubCommand::Task(TaskOpts {
subcmd: TaskCommand::List(opts),
}) => command_task_list(&gsettings, opts).await,
23 changes: 21 additions & 2 deletions crates/hyperqueue/src/client/commands/event/output.rs
@@ -98,13 +98,18 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
"task": task_id,
"error": error
}),
-        EventPayload::JobCreated(job_id, data) => {
+        EventPayload::Submit {
+            job_id,
+            closed_job,
+            serialized_desc,
+        } => {
             let job_desc: JobDescription = bincode_config()
-                .deserialize(&data)
+                .deserialize(&serialized_desc)
                 .expect("Invalid job description data");
json!({
"type": "job-created",
"job": job_id,
"closed_job": closed_job,
"desc": JobInfoFormatter(&job_desc).to_json(),
})
}
@@ -123,6 +128,20 @@
"type": "server-stop",
})
}
EventPayload::JobOpen(job_id, job_desc) => {
json!({
"type": "job-open",
"job_id": job_id,
"name": job_desc.name,
"max_fails": job_desc.max_fails,
})
}
EventPayload::JobClose(job_id) => {
json!({
"type": "job-close",
"job_id": job_id
})
}
}
}

48 changes: 44 additions & 4 deletions crates/hyperqueue/src/client/commands/job.rs
@@ -10,9 +10,9 @@ use crate::common::utils::str::pluralize;
use crate::rpc_call;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
-    CancelJobResponse, CancelRequest, ForgetJobRequest, FromClientMessage, IdSelector, JobDetail,
-    JobDetailRequest, JobInfoRequest, TaskIdSelector, TaskSelector, TaskStatusSelector,
-    ToClientMessage,
+    CancelJobResponse, CancelRequest, CloseJobRequest, CloseJobResponse, ForgetJobRequest,
+    FromClientMessage, IdSelector, JobDetail, JobDetailRequest, JobInfoRequest, TaskIdSelector,
+    TaskSelector, TaskStatusSelector, ToClientMessage,
};

#[derive(Parser)]
@@ -41,6 +41,13 @@ pub struct JobCancelOpts {
pub selector: IdSelector,
}

#[derive(Parser)]
pub struct JobCloseOpts {
/// Select job(s) to close
#[arg(value_parser = parse_last_all_range)]
pub selector: IdSelector,
}

#[derive(Parser)]
pub struct JobForgetOpts {
/// Select job(s) to forget
@@ -110,6 +117,7 @@ pub async fn output_job_list(
gsettings: &GlobalSettings,
session: &mut ClientSession,
job_filters: Vec<Status>,
show_open: bool,
) -> anyhow::Result<()> {
let message = FromClientMessage::JobInfo(JobInfoRequest {
selector: IdSelector::All,
@@ -121,7 +129,7 @@
if !job_filters.is_empty() {
response
.jobs
-            .retain(|j| job_filters.contains(&job_status(j)));
+            .retain(|j| (show_open && j.is_open) || job_filters.contains(&job_status(j)));
}
response.jobs.sort_unstable_by_key(|j| j.id);
gsettings
@@ -270,6 +278,38 @@ pub async fn cancel_job(
Ok(())
}

pub async fn close_job(
_gsettings: &GlobalSettings,
session: &mut ClientSession,
selector: IdSelector,
) -> anyhow::Result<()> {
let mut responses =
rpc_call!(session.connection(), FromClientMessage::CloseJob(CloseJobRequest {
selector,
}), ToClientMessage::CloseJobResponse(r) => r)
.await?;
responses.sort_unstable_by_key(|x| x.0);

if responses.is_empty() {
log::info!("There is nothing to close")
}

for (job_id, response) in responses {
match response {
CloseJobResponse::Closed => {
log::info!("Job {} closed", job_id)
}
CloseJobResponse::InvalidJob => {
log::error!("Closing job {} failed; job not found", job_id)
}
CloseJobResponse::AlreadyClosed => {
log::error!("Closing job {} failed; job is already closed", job_id)
}
}
}
Ok(())
}

pub async fn forget_job(
_gsettings: &GlobalSettings,
session: &mut ClientSession,