Skip to content

Commit

Permalink
Small code improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Aug 19, 2024
1 parent 84cf437 commit aa73436
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 73 deletions.
7 changes: 4 additions & 3 deletions crates/hyperqueue/src/client/commands/submit/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,11 +583,12 @@ pub async fn submit_computation(
None
};

let opts = handle_directives(opts, stdin.as_deref())?;

if opts.directives == DirectivesMode::Stdin && stdin.is_none() {
if opts.opts.directives == DirectivesMode::Stdin && stdin.is_none() {
bail!("You have to use `--stdin` when you specify `--directives=stdin`.");
}

let opts = handle_directives(opts, stdin.as_deref())?;

if opts.job.is_some() {
if opts.conf.job_conf.name.is_some() {
bail!("Parameter --name is not allowed when submitting to an open job.");
Expand Down
8 changes: 4 additions & 4 deletions crates/hyperqueue/src/server/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ async fn handle_wait_for_jobs_message(
for job_id in job_ids {
match state.get_job_mut(job_id) {
Some(job) => {
if job.has_no_active_tasks() && !(wait_for_close && job.is_open) {
if job.has_no_active_tasks() && !(wait_for_close && job.is_open()) {
update_counters(&mut response, &job.counters);
} else {
let rx = job.subscribe_to_completion(wait_for_close);
Expand Down Expand Up @@ -424,7 +424,7 @@ async fn handle_job_close(
let job_ids: Vec<JobId> = match selector {
IdSelector::All => state
.jobs()
.filter(|job| job.is_open)
.filter(|job| job.is_open())
.map(|job| job.job_id)
.collect(),
IdSelector::LastN(n) => state.last_n_ids(*n).collect(),
Expand All @@ -436,8 +436,8 @@ async fn handle_job_close(
.into_iter()
.map(|job_id| {
let response = if let Some(job) = state.get_job_mut(job_id) {
if job.is_open {
job.is_open = false;
if job.is_open() {
job.close();
job.check_termination(senders, now);
CloseJobResponse::Closed
} else {
Expand Down
42 changes: 1 addition & 41 deletions crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub(crate) async fn handle_submit(
let mut state = state_ref.get_mut();
let (job_id, new_job) = if let Some(job_id) = message.job_id {
if let Some(job) = state.get_job(job_id) {
if !job.is_open {
if !job.is_open() {
return ToClientMessage::SubmitResponse(SubmitResponse::JobNotOpened);
}
match &mut message.submit_desc.task_desc {
Expand Down Expand Up @@ -555,43 +555,3 @@ pub(crate) fn handle_open_job(
state_ref.get_mut().add_job(job);
ToClientMessage::OpenJobResponse(OpenJobResponse { job_id })
}

// /*let job_id = state_ref.get_mut().new_job_id();
// senders.events.on_job_open(job_id).unwrap();
//
// let new_tasks = match submit_job_desc(&mut state_ref.get_mut(), job_id, job_desc) {
// Err(error) => {
// state_ref.get_mut().revert_to_job_id(job_id);
// return ToClientMessage::Error(error.to_string());
// }
// Ok(new_tasks) => new_tasks,
// };
//
// let (job_detail, log) = {
// let state = state_ref.get();
// let job = state.get_job(job_id).unwrap();
// let job_detail = job.make_job_detail(Some(&TaskSelector {
// id_selector: TaskIdSelector::All,
// status_selector: TaskStatusSelector::All,
// }));
// (job_detail, job.job_desc.log.clone())
// };
//
// if let Some(log) = log {
// start_log_streaming(senders, job_id, log).await;
// }
//
// match senders
// .backend
// .send_tako_message(FromGatewayMessage::NewTasks(new_tasks))
// .await
// .unwrap()
// {
// ToGatewayMessage::NewTasksResponse(_) => { /* Ok */ }
// r => panic!("Invalid response: {r:?}"),
// };
//
// ToClientMessage::SubmitResponse(SubmitResponse {
// job: job_detail,
// server_uid: state_ref.get().server_info().server_uid.clone(),
// })
18 changes: 11 additions & 7 deletions crates/hyperqueue/src/server/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,6 @@ impl JobTaskCounters {
}
}

pub struct JobPart {
pub base_task_id: TakoTaskId,
pub job_desc: Arc<JobDescription>,
}

pub struct JobCompletionCallback {
callback: oneshot::Sender<JobId>,
wait_for_close: bool,
Expand All @@ -126,10 +121,10 @@ pub struct Job {
pub job_desc: JobDescription,
pub submit_descs: SmallVec<[(Arc<JobSubmitDescription>, TakoTaskId); 1]>,

// If True, new tasks may be submitted into this job
// If true, new tasks may be submitted into this job
// If true and all tasks in the job are terminated then the job
// is in state OPEN not FINISHED.
pub is_open: bool,
is_open: bool,

pub submission_date: DateTime<Utc>,
pub completion_date: Option<DateTime<Utc>>,
Expand All @@ -154,6 +149,15 @@ impl Job {
}
}

/// Returns `true` while the job is open, i.e. new tasks may still be
/// submitted into it (reads the private `is_open` flag).
#[inline]
pub fn is_open(&self) -> bool {
self.is_open
}

/// Closes the job by clearing the `is_open` flag; after this,
/// `is_open()` returns `false` and no new tasks should be accepted.
/// NOTE(review): callers appear to follow this with `check_termination`
/// to finalize a job whose tasks are already done — confirm at call sites.
pub fn close(&mut self) {
self.is_open = false;
}

/// Collects the `task_id` of every task currently stored in `self.tasks`
/// into a freshly built set.
pub fn make_task_id_set(&self) -> Set<JobTaskId> {
self.tasks.values().map(|t| t.task_id).collect()
}
Expand Down
5 changes: 0 additions & 5 deletions crates/hyperqueue/src/server/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ impl RestorerJob {
/// Appends one submit description to this restored job's list of submits.
pub fn add_submit(&mut self, submit_desc: JobSubmitDescription) {
self.submit_descs.push(submit_desc)
}

pub fn close(&mut self) {
self.is_open = false;
}
}

#[derive(Default)]
Expand Down Expand Up @@ -173,7 +169,6 @@ impl StateRestorer {
bincode_config().deserialize(&serialized_submit_request)?;
let mut job = RestorerJob::new(submit_request.job_desc, false);
job.add_submit(submit_request.submit_desc);
job.close();
self.add_job(job_id, job);
}
EventPayload::JobCompleted(job_id) => {
Expand Down
15 changes: 12 additions & 3 deletions crates/pyhq/src/client/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use hyperqueue::server::job::JobTaskState;
use hyperqueue::transfer::messages::{
ForgetJobRequest, FromClientMessage, IdSelector, JobDescription, JobDetailRequest,
JobInfoRequest, JobInfoResponse, JobSubmitDescription, JobTaskDescription as HqJobDescription,
PinMode, SubmitRequest, TaskDescription as HqTaskDescription, TaskIdSelector, TaskKind,
TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
PinMode, SubmitRequest, SubmitResponse, TaskDescription as HqTaskDescription, TaskIdSelector,
TaskKind, TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies,
ToClientMessage,
};
use hyperqueue::{rpc_call, tako, JobTaskCount, JobTaskId, Set};
use pyo3::types::PyTuple;
Expand Down Expand Up @@ -92,7 +93,15 @@ pub fn submit_job_impl(py: Python, ctx: ClientContextPtr, job: PyJobDescription)
rpc_call!(ctx.session.connection(), message, ToClientMessage::SubmitResponse(r) => r)
.await
.map_py_err()?;
Ok(response.job.info.id.as_num())
match response {
SubmitResponse::Ok { job, .. } => Ok(job.info.id.as_num()),
SubmitResponse::JobNotOpened
| SubmitResponse::JobNotFound
| SubmitResponse::TaskIdAlreadyExists(_) => {
// This code is unreachable as long as Python cannot submit into open jobs
unreachable!()
}
}
})
}

Expand Down
1 change: 0 additions & 1 deletion scripts/extract_changelog.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,3 @@ def get_matching_lines(text: str, tag: str):
- **hyperqueue-{tag}-\\***: Wheel containing the `hyperqueue` package with HyperQueue Python
bindings.
"""
print(output)
1 change: 0 additions & 1 deletion tests/output/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ def test_print_job_detail(hq_env: HqEnv):
hq_env.start_server()
hq_env.command(["submit", "echo", "tt"])
output = parse_json_output(hq_env, ["--output-mode=json", "job", "info", "1"])
print(output)
schema = Schema([ARRAY_JOB_DETAIL_SCHEMA])
schema.validate(output)

Expand Down
19 changes: 12 additions & 7 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,8 @@ def test_job_submit_program_not_found(hq_env: HqEnv):

table = hq_env.command(["task", "list", "1", "-v"], as_table=True)
assert (
'Error: Cannot execute "foo --bar --baz=5": No such file or directory (os error 2)\n'
"The program that you have tried to execute (`foo`) was not found." == table.get_column_value("Error")[0]
'Error: Cannot execute "foo --bar --baz=5": No such file or directory (os error 2)\n'
"The program that you have tried to execute (`foo`) was not found." == table.get_column_value("Error")[0]
)


Expand Down Expand Up @@ -1286,6 +1286,7 @@ def test_job_wait_for_close(hq_env: HqEnv):
time.sleep(1)
assert p.poll() is None
hq_env.command(["job", "close", "last"])
wait_for_job_state(hq_env, 1, "FINISHED")
time.sleep(0.3)
r = p.poll()
assert r == 0
Expand All @@ -1295,10 +1296,14 @@ def test_submit_job_opts_to_open_job(hq_env: HqEnv):
hq_env.start_server()
hq_env.command(["job", "open"])

hq_env.command(["submit", "--job=1", "--name=Abc", "--", "hostname"],
expect_fail="Parameter --name is not allowed when submitting to an open job.")
hq_env.command(["submit", "--job=1", "--max-fails=12", "--", "hostname"],
expect_fail="Parameter --max-fails is not allowed when submitting to an open job.")
hq_env.command(
["submit", "--job=1", "--name=Abc", "--", "hostname"],
expect_fail="Parameter --name is not allowed when submitting to an open job.",
)
hq_env.command(
["submit", "--job=1", "--max-fails=12", "--", "hostname"],
expect_fail="Parameter --max-fails is not allowed when submitting to an open job.",
)


def test_job_wait_for_open_job_without_close(hq_env: HqEnv):
Expand Down Expand Up @@ -1406,7 +1411,7 @@ def test_close_job_before_worker(hq_env: HqEnv):
hq_env.start_server()
hq_env.command(["job", "open"])
for _ in range(4):
hq_env.command(["submit", "--job=1", "--", "bash", "-c", "echo 'test' > task.${HQ_TASK_ID}"])
hq_env.command(["submit", "--job=1", "--", "sleep", "0"])
hq_env.command(["job", "close", "1"])
wait_for_job_state(hq_env, 1, "WAITING")
hq_env.start_worker()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,4 @@ def test_restore_open_job(hq_env: HqEnv, tmp_path):
hq_env.start_server(args=["--journal", journal_path])

table = hq_env.command(["job", "info", "1"], as_table=True)
print(table)
table.check_row_value("Session", "open")

0 comments on commit aa73436

Please sign in to comment.