Skip to content

Commit

Permalink
Better error message when submitting to open job
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Aug 19, 2024
1 parent 529ef4e commit 56a374d
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 25 deletions.
38 changes: 25 additions & 13 deletions crates/hyperqueue/src/client/commands/submit/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use crate::common::utils::time::parse_human_time;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
FromClientMessage, IdSelector, JobDescription, JobSubmitDescription, JobTaskDescription,
PinMode, SubmitRequest, TaskDescription, TaskKind, TaskKindProgram, ToClientMessage,
PinMode, SubmitRequest, SubmitResponse, TaskDescription, TaskKind, TaskKindProgram,
ToClientMessage,
};
use crate::{rpc_call, JobId, JobTaskCount, Map};

Expand Down Expand Up @@ -703,22 +704,33 @@ pub(crate) async fn send_submit_request(
wait: bool,
progress: bool,
) -> anyhow::Result<()> {
let job_id = request.job_id.unwrap_or_else(|| JobId::new(0));
let message = FromClientMessage::Submit(request);

let response =
rpc_call!(session.connection(), message, ToClientMessage::SubmitResponse(r) => r).await?;
let info = response.job.info.clone();

gsettings.printer().print_job_submitted(response.job);
if wait {
wait_for_jobs(
gsettings,
session,
IdSelector::Specific(IntArray::from_id(info.id.into())),
)
.await?;
} else if progress {
wait_for_jobs_with_progress(session, &[info]).await?;

match response {
SubmitResponse::Ok { job, server_uid: _ } => {
let info = job.info.clone();

gsettings.printer().print_job_submitted(job);
if wait {
wait_for_jobs(
gsettings,
session,
IdSelector::Specific(IntArray::from_id(info.id.into())),
)
.await?;
} else if progress {
wait_for_jobs_with_progress(session, &[info]).await?;
}
}
SubmitResponse::JobNotOpened => bail!("Job {job_id} is not opened."),
SubmitResponse::JobNotFound => bail!("Job {job_id} not found."),
SubmitResponse::TaskIdAlreadyExists(task_id) => {
bail!("Task {task_id} already exists in job {job_id}.")
}
}
Ok(())
}
Expand Down
18 changes: 9 additions & 9 deletions crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub(crate) async fn handle_submit(
let (job_id, new_job) = if let Some(job_id) = message.job_id {
if let Some(job) = state.get_job(job_id) {
if !job.is_open {
return ToClientMessage::Error(format!("Job {job_id} is not open"));
return ToClientMessage::SubmitResponse(SubmitResponse::JobNotOpened);
}
match &mut message.submit_desc.task_desc {
JobTaskDescription::Array { ids, entries, .. } => {
Expand All @@ -93,9 +93,9 @@ pub(crate) async fn handle_submit(
for id in ids.iter() {
let id = JobTaskId::new(id);
if id_set.contains(&id) {
return ToClientMessage::Error(format!(
"Task {id} already exists in job {job_id}"
));
return ToClientMessage::SubmitResponse(
SubmitResponse::TaskIdAlreadyExists(id),
);
}
}
}
Expand All @@ -105,15 +105,15 @@ pub(crate) async fn handle_submit(
for task in tasks {
if id_set.contains(&task.id) {
let id = task.id;
return ToClientMessage::Error(format!(
"Task {id} already exists in job {job_id}"
));
return ToClientMessage::SubmitResponse(
SubmitResponse::TaskIdAlreadyExists(id),
);
}
}
}
}
} else {
return ToClientMessage::Error(format!("Job {job_id} not found"));
return ToClientMessage::SubmitResponse(SubmitResponse::JobNotFound);
}
(job_id, false)
} else {
Expand Down Expand Up @@ -181,7 +181,7 @@ pub(crate) async fn handle_submit(
r => panic!("Invalid response: {r:?}"),
};

ToClientMessage::SubmitResponse(SubmitResponse {
ToClientMessage::SubmitResponse(SubmitResponse::Ok {
job: job_detail,
server_uid: state_ref.get().server_info().server_uid.clone(),
})
Expand Down
9 changes: 6 additions & 3 deletions crates/hyperqueue/src/transfer/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,12 @@ pub struct StatsResponse {
}

#[derive(Serialize, Deserialize, Debug)]
pub struct SubmitResponse {
pub job: JobDetail,
pub server_uid: String,
#[allow(clippy::large_enum_variant)]
pub enum SubmitResponse {
Ok { job: JobDetail, server_uid: String },
JobNotOpened,
JobNotFound,
TaskIdAlreadyExists(JobTaskId),
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down

0 comments on commit 56a374d

Please sign in to comment.