diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs index 3747d1870..eccb45636 100644 --- a/crates/hyperqueue/src/client/commands/submit/command.rs +++ b/crates/hyperqueue/src/client/commands/submit/command.rs @@ -583,11 +583,12 @@ pub async fn submit_computation( None }; - let opts = handle_directives(opts, stdin.as_deref())?; - - if opts.directives == DirectivesMode::Stdin && stdin.is_none() { + if opts.opts.directives == DirectivesMode::Stdin && stdin.is_none() { bail!("You have to use `--stdin` when you specify `--directives=stdin`."); } + + let opts = handle_directives(opts, stdin.as_deref())?; + if opts.job.is_some() { if opts.conf.job_conf.name.is_some() { bail!("Parameter --name is not allowed when submitting to an open job."); diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs index bcc63460c..f356bfc2d 100644 --- a/crates/hyperqueue/src/server/client/mod.rs +++ b/crates/hyperqueue/src/server/client/mod.rs @@ -232,7 +232,7 @@ async fn handle_wait_for_jobs_message( for job_id in job_ids { match state.get_job_mut(job_id) { Some(job) => { - if job.has_no_active_tasks() && !(wait_for_close && job.is_open) { + if job.has_no_active_tasks() && !(wait_for_close && job.is_open()) { update_counters(&mut response, &job.counters); } else { let rx = job.subscribe_to_completion(wait_for_close); @@ -424,7 +424,7 @@ async fn handle_job_close( let job_ids: Vec = match selector { IdSelector::All => state .jobs() - .filter(|job| job.is_open) + .filter(|job| job.is_open()) .map(|job| job.job_id) .collect(), IdSelector::LastN(n) => state.last_n_ids(*n).collect(), @@ -436,8 +436,8 @@ async fn handle_job_close( .into_iter() .map(|job_id| { let response = if let Some(job) = state.get_job_mut(job_id) { - if job.is_open { - job.is_open = false; + if job.is_open() { + job.close(); job.check_termination(senders, now); CloseJobResponse::Closed } else { diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs index f91aee94b..5cc3ab577 100644 --- a/crates/hyperqueue/src/server/client/submit.rs +++ b/crates/hyperqueue/src/server/client/submit.rs @@ -75,7 +75,7 @@ pub(crate) async fn handle_submit( let mut state = state_ref.get_mut(); let (job_id, new_job) = if let Some(job_id) = message.job_id { if let Some(job) = state.get_job(job_id) { - if !job.is_open { + if !job.is_open() { return ToClientMessage::SubmitResponse(SubmitResponse::JobNotOpened); } match &mut message.submit_desc.task_desc { @@ -555,43 +555,3 @@ pub(crate) fn handle_open_job( state_ref.get_mut().add_job(job); ToClientMessage::OpenJobResponse(OpenJobResponse { job_id }) } - -// /*let job_id = state_ref.get_mut().new_job_id(); -// senders.events.on_job_open(job_id).unwrap(); -// -// let new_tasks = match submit_job_desc(&mut state_ref.get_mut(), job_id, job_desc) { -// Err(error) => { -// state_ref.get_mut().revert_to_job_id(job_id); -// return ToClientMessage::Error(error.to_string()); -// } -// Ok(new_tasks) => new_tasks, -// }; -// -// let (job_detail, log) = { -// let state = state_ref.get(); -// let job = state.get_job(job_id).unwrap(); -// let job_detail = job.make_job_detail(Some(&TaskSelector { -// id_selector: TaskIdSelector::All, -// status_selector: TaskStatusSelector::All, -// })); -// (job_detail, job.job_desc.log.clone()) -// }; -// -// if let Some(log) = log { -// start_log_streaming(senders, job_id, log).await; -// } -// -// match senders -// .backend -// .send_tako_message(FromGatewayMessage::NewTasks(new_tasks)) -// .await -// .unwrap() -// { -// ToGatewayMessage::NewTasksResponse(_) => { /* Ok */ } -// r => panic!("Invalid response: {r:?}"), -// }; -// -// ToClientMessage::SubmitResponse(SubmitResponse { -// job: job_detail, -// server_uid: state_ref.get().server_info().server_uid.clone(), -// }) diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs index c479c30cd..1f1454983 100644 --- a/crates/hyperqueue/src/server/job.rs +++ b/crates/hyperqueue/src/server/job.rs @@ -108,11 +108,6 @@ impl JobTaskCounters { } } -pub struct JobPart { - pub base_task_id: TakoTaskId, - pub job_desc: Arc, -} - pub struct JobCompletionCallback { callback: oneshot::Sender, wait_for_close: bool, @@ -126,10 +121,10 @@ pub struct Job { pub job_desc: JobDescription, pub submit_descs: SmallVec<[(Arc, TakoTaskId); 1]>, - // If True, new tasks may be submitted into this job + // If true, new tasks may be submitted into this job // If true and all tasks in the job are terminated then the job // is in state OPEN not FINISHED. - pub is_open: bool, + is_open: bool, pub submission_date: DateTime, pub completion_date: Option>, @@ -154,6 +149,15 @@ impl Job { } } + #[inline] + pub fn is_open(&self) -> bool { + self.is_open + } + + pub fn close(&mut self) { + self.is_open = false; + } + pub fn make_task_id_set(&self) -> Set { self.tasks.values().map(|t| t.task_id).collect() } diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs index d59404934..8e8bb6148 100644 --- a/crates/hyperqueue/src/server/restore.rs +++ b/crates/hyperqueue/src/server/restore.rs @@ -102,10 +102,6 @@ impl RestorerJob { pub fn add_submit(&mut self, submit_desc: JobSubmitDescription) { self.submit_descs.push(submit_desc) } - - pub fn close(&mut self) { - self.is_open = false; - } } #[derive(Default)] @@ -173,7 +169,6 @@ impl StateRestorer { bincode_config().deserialize(&serialized_submit_request)?; let mut job = RestorerJob::new(submit_request.job_desc, false); job.add_submit(submit_request.submit_desc); - job.close(); self.add_job(job_id, job); } EventPayload::JobCompleted(job_id) => { diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs index bf16991f8..274f5467a 100644 --- a/crates/pyhq/src/client/job.rs +++ b/crates/pyhq/src/client/job.rs @@ -10,8 +10,9 @@ use hyperqueue::server::job::JobTaskState; use hyperqueue::transfer::messages::{ ForgetJobRequest, FromClientMessage, IdSelector, JobDescription, JobDetailRequest, JobInfoRequest, JobInfoResponse, JobSubmitDescription, JobTaskDescription as HqJobDescription, - PinMode, SubmitRequest, TaskDescription as HqTaskDescription, TaskIdSelector, TaskKind, - TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage, + PinMode, SubmitRequest, SubmitResponse, TaskDescription as HqTaskDescription, TaskIdSelector, + TaskKind, TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies, + ToClientMessage, }; use hyperqueue::{rpc_call, tako, JobTaskCount, JobTaskId, Set}; use pyo3::types::PyTuple; @@ -92,7 +93,15 @@ pub fn submit_job_impl(py: Python, ctx: ClientContextPtr, job: PyJobDescription) rpc_call!(ctx.session.connection(), message, ToClientMessage::SubmitResponse(r) => r) .await .map_py_err()?; - Ok(response.job.info.id.as_num()) + match response { + SubmitResponse::Ok { job, .. } => Ok(job.info.id.as_num()), + SubmitResponse::JobNotOpened + | SubmitResponse::JobNotFound + | SubmitResponse::TaskIdAlreadyExists(_) => { + // This code is unreachable as long Python cannot submit into open jobs + unreachable!() + } + } }) } diff --git a/scripts/extract_changelog.py b/scripts/extract_changelog.py index 3ba1ffe07..8bac4c36b 100644 --- a/scripts/extract_changelog.py +++ b/scripts/extract_changelog.py @@ -45,4 +45,3 @@ def get_matching_lines(text: str, tag: str): - **hyperqueue-{tag}-\\***: Wheel containing the `hyperqueue` package with HyperQueue Python bindings. """ - print(output) diff --git a/tests/output/test_json.py b/tests/output/test_json.py index 3ebdaa541..a4babda88 100644 --- a/tests/output/test_json.py +++ b/tests/output/test_json.py @@ -169,7 +169,6 @@ def test_print_job_detail(hq_env: HqEnv): hq_env.start_server() hq_env.command(["submit", "echo", "tt"]) output = parse_json_output(hq_env, ["--output-mode=json", "job", "info", "1"]) - print(output) schema = Schema([ARRAY_JOB_DETAIL_SCHEMA]) schema.validate(output) diff --git a/tests/test_job.py b/tests/test_job.py index 220ed2ff9..3e6d049e3 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -893,8 +893,8 @@ def test_job_submit_program_not_found(hq_env: HqEnv): table = hq_env.command(["task", "list", "1", "-v"], as_table=True) assert ( - 'Error: Cannot execute "foo --bar --baz=5": No such file or directory (os error 2)\n' - "The program that you have tried to execute (`foo`) was not found." == table.get_column_value("Error")[0] + 'Error: Cannot execute "foo --bar --baz=5": No such file or directory (os error 2)\n' + "The program that you have tried to execute (`foo`) was not found." == table.get_column_value("Error")[0] ) @@ -1286,6 +1286,7 @@ def test_job_wait_for_close(hq_env: HqEnv): time.sleep(1) assert p.poll() is None hq_env.command(["job", "close", "last"]) + wait_for_job_state(hq_env, 1, "FINISHED") time.sleep(0.3) r = p.poll() assert r == 0 @@ -1295,10 +1296,14 @@ def test_submit_job_opts_to_open_job(hq_env: HqEnv): hq_env.start_server() hq_env.command(["job", "open"]) - hq_env.command(["submit", "--job=1", "--name=Abc", "--", "hostname"], - expect_fail="Parameter --name is not allowed when submitting to an open job.") - hq_env.command(["submit", "--job=1", "--max-fails=12", "--", "hostname"], - expect_fail="Parameter --max-fails is not allowed when submitting to an open job.") + hq_env.command( + ["submit", "--job=1", "--name=Abc", "--", "hostname"], + expect_fail="Parameter --name is not allowed when submitting to an open job.", + ) + hq_env.command( + ["submit", "--job=1", "--max-fails=12", "--", "hostname"], + expect_fail="Parameter --max-fails is not allowed when submitting to an open job.", + ) def test_job_wait_for_open_job_without_close(hq_env: HqEnv): @@ -1406,7 +1411,7 @@ def test_close_job_before_worker(hq_env: HqEnv): hq_env.start_server() hq_env.command(["job", "open"]) for _ in range(4): - hq_env.command(["submit", "--job=1", "--", "bash", "-c", "echo 'test' > task.${HQ_TASK_ID}"]) + hq_env.command(["submit", "--job=1", "--", "sleep", "0"]) hq_env.command(["job", "close", "1"]) wait_for_job_state(hq_env, 1, "WAITING") hq_env.start_worker() diff --git a/tests/test_restore.py b/tests/test_restore.py index 7ac76081a..84e0ab1cc 100644 --- a/tests/test_restore.py +++ b/tests/test_restore.py @@ -253,4 +253,4 @@ def test_restore_open_job(hq_env: HqEnv, tmp_path): hq_env.start_server(args=["--journal", journal_path]) table = hq_env.command(["job", "info", "1"], as_table=True) - print(table) + table.check_row_value("Session", "open")