diff --git a/crates/hyperqueue/src/client/commands/event/output.rs b/crates/hyperqueue/src/client/commands/event/output.rs index 1484ddae8..e3be5cab9 100644 --- a/crates/hyperqueue/src/client/commands/event/output.rs +++ b/crates/hyperqueue/src/client/commands/event/output.rs @@ -98,13 +98,18 @@ fn format_payload(event: EventPayload) -> serde_json::Value { "task": task_id, "error": error }), - EventPayload::JobCreated(job_id, data) => { + EventPayload::Submit { + job_id, + closed_job, + serialized_desc, + } => { let job_desc: JobDescription = bincode_config() - .deserialize(&data) + .deserialize(&serialized_desc) .expect("Invalid job description data"); json!({ "type": "job-created", "job": job_id, + "closed_job": closed_job, "desc": JobInfoFormatter(&job_desc).to_json(), }) } diff --git a/crates/hyperqueue/src/common/cli.rs b/crates/hyperqueue/src/common/cli.rs index 27cc6393f..d0b4f9326 100644 --- a/crates/hyperqueue/src/common/cli.rs +++ b/crates/hyperqueue/src/common/cli.rs @@ -336,7 +336,7 @@ pub struct JobWaitOpts { #[arg(value_parser = parse_last_all_range)] pub selector: IdSelector, - /// Wait only for task completion and terminate even job is open + /// Wait until all tasks are completed, even if the job is still open #[clap(long, action)] pub without_close: bool, } diff --git a/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs b/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs index 531757951..2c30265b0 100644 --- a/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs +++ b/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs @@ -53,7 +53,7 @@ impl JobTimeline { pub fn handle_new_events(&mut self, events: &[Event]) { for event in events { match &event.payload { - EventPayload::JobCreated(job_id, job_info) => { + EventPayload::Submit(job_id, job_info) => { self.job_timeline.insert( *job_id, DashboardJobInfo { diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs index 5cc3ab577..eab1e694c 100644 --- a/crates/hyperqueue/src/server/client/submit.rs +++ b/crates/hyperqueue/src/server/client/submit.rs @@ -403,6 +403,20 @@ fn build_tasks_graph( }) } +pub(crate) fn handle_open_job( + state_ref: &StateRef, + senders: &Senders, + job_description: JobDescription, +) -> ToClientMessage { + let job_id = state_ref.get_mut().new_job_id(); + senders + .events + .on_job_opened(job_id, job_description.clone()); + let job = Job::new(job_id, job_description, true); + state_ref.get_mut().add_job(job); + ToClientMessage::OpenJobResponse(OpenJobResponse { job_id }) +} + #[cfg(test)] mod tests { use crate::server::client::submit::build_tasks_graph; @@ -541,17 +555,3 @@ mod tests { } } } - -pub(crate) fn handle_open_job( - state_ref: &StateRef, - senders: &Senders, - job_description: JobDescription, -) -> ToClientMessage { - let job_id = state_ref.get_mut().new_job_id(); - senders - .events - .on_job_opened(job_id, job_description.clone()); - let job = Job::new(job_id, job_description, true); - state_ref.get_mut().add_job(job); - ToClientMessage::OpenJobResponse(OpenJobResponse { job_id }) -} diff --git a/crates/hyperqueue/src/server/event/payload.rs b/crates/hyperqueue/src/server/event/payload.rs index 1ba44efd9..2d3a15965 100644 --- a/crates/hyperqueue/src/server/event/payload.rs +++ b/crates/hyperqueue/src/server/event/payload.rs @@ -22,7 +22,11 @@ pub enum EventPayload { /// Vec is serialized JobDescription; the main reason is avoiding duplication of JobDescription /// (we serialize it before it is stripped down) /// and a nice side effect is that Events can be deserialized without deserializing a potentially large submit data - JobCreated(JobId, Vec), + Submit { + job_id: JobId, + closed_job: bool, + serialized_desc: Vec, + }, /// All tasks of the job have finished. JobCompleted(JobId), JobOpen(JobId, JobDescription), diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs index 57ee7c2e8..66d9c5c6d 100644 --- a/crates/hyperqueue/src/server/event/streamer.rs +++ b/crates/hyperqueue/src/server/event/streamer.rs @@ -61,10 +61,11 @@ impl EventStreamer { return Ok(()); } } - self.send_event(EventPayload::JobCreated( + self.send_event(EventPayload::Submit { job_id, - bincode_config().serialize(submit_request)?, - )); + closed_job: submit_request.job_id.is_none(), + serialized_desc: bincode_config().serialize(submit_request)?, + }); Ok(()) } diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs index 1f1454983..7b0604bde 100644 --- a/crates/hyperqueue/src/server/job.rs +++ b/crates/hyperqueue/src/server/job.rs @@ -221,7 +221,7 @@ impl Job { } pub fn is_terminated(&self) -> bool { - !self.is_open && self.counters.is_terminated(self.n_tasks()) + !self.is_open && self.has_no_active_tasks() } pub fn get_task_state_mut( diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs index 8e8bb6148..c152b9995 100644 --- a/crates/hyperqueue/src/server/restore.rs +++ b/crates/hyperqueue/src/server/restore.rs @@ -150,6 +150,10 @@ impl StateRestorer { self.max_job_id = self.max_job_id.max(job_id.as_num()); } + fn get_job_mut(&mut self, job_id: JobId) -> Option<&mut RestorerJob> { + self.jobs.get_mut(&job_id) + } + pub fn load_event_file(&mut self, path: &Path) -> crate::Result<()> { log::debug!("Loading event file {}", path.display()); let mut event_reader = EventLogReader::open(path)?; @@ -162,14 +166,23 @@ impl StateRestorer { } EventPayload::WorkerLost(_, _) => {} EventPayload::WorkerOverviewReceived(_) => {} - //EventPayload::JobCreatedShort(job_id, job_info) => {} - EventPayload::JobCreated(job_id, serialized_submit_request) => { - log::debug!("Replaying: JobCreated {job_id}"); + EventPayload::Submit { + job_id, + closed_job, + serialized_desc, + } => { + log::debug!("Replaying: JobTasksCreated {job_id}"); let submit_request: SubmitRequest = - bincode_config().deserialize(&serialized_submit_request)?; - let mut job = RestorerJob::new(submit_request.job_desc, false); - job.add_submit(submit_request.submit_desc); - self.add_job(job_id, job); + bincode_config().deserialize(&serialized_desc)?; + if closed_job { + let mut job = RestorerJob::new(submit_request.job_desc, false); + job.add_submit(submit_request.submit_desc); + self.add_job(job_id, job); + } else if let Some(job) = self.get_job_mut(job_id) { + job.add_submit(submit_request.submit_desc) + } else { + log::warn!("Ignoring submit attachment to an non-existing job") + } } EventPayload::JobCompleted(job_id) => { log::debug!("Replaying: JobCompleted {job_id}"); diff --git a/docs/jobs/openjobs.md b/docs/jobs/openjobs.md index c791d50c3..4a7e1b5ba 100644 --- a/docs/jobs/openjobs.md +++ b/docs/jobs/openjobs.md @@ -68,7 +68,7 @@ $ hq submit --job --array 0-12 -- hostname ## Job name and `--max-fails` Job's name and configuration open `--max-fails` are the property of the job. They can be set when job is opened and they -cannot be later changed. Submits options `--name` and `--max-fails` cannot be used if you are submitting into an open +cannot be later changed. Submit options `--name` and `--max-fails` cannot be used if you are submitting into an open job. ```commandline diff --git a/tests/test_job.py b/tests/test_job.py index 3e6d049e3..a4d5dcc50 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1287,8 +1287,7 @@ def test_job_wait_for_close(hq_env: HqEnv): assert p.poll() is None hq_env.command(["job", "close", "last"]) wait_for_job_state(hq_env, 1, "FINISHED") - time.sleep(0.3) - r = p.poll() + r = p.wait(5) assert r == 0 @@ -1313,9 +1312,11 @@ def test_job_wait_for_open_job_without_close(hq_env: HqEnv): hq_env.command(["job", "wait", "1", "--without-close"]) hq_env.start_worker() - hq_env.command(["submit", "--job=1", "--", "sleep", "0"]) - + hq_env.command(["submit", "--job=1", "--", "sleep", "2"]) + t1 = time.time() hq_env.command(["job", "wait", "1", "--without-close"]) + t2 = time.time() + assert t2 - t1 > 1.8 def test_invalid_attach(hq_env: HqEnv): diff --git a/tests/test_restore.py b/tests/test_restore.py index 84e0ab1cc..d68a4282c 100644 --- a/tests/test_restore.py +++ b/tests/test_restore.py @@ -243,7 +243,7 @@ def test_restore_queues(hq_env: HqEnv, tmp_path): assert alloc_list3 == alloc_list4 -def test_restore_open_job(hq_env: HqEnv, tmp_path): +def test_restore_open_job_more_jobs(hq_env: HqEnv, tmp_path): journal_path = os.path.join(tmp_path, "my.journal") hq_env.start_server(args=["--journal", journal_path]) for _ in range(3): @@ -254,3 +254,19 @@ def test_restore_open_job(hq_env: HqEnv, tmp_path): table = hq_env.command(["job", "info", "1"], as_table=True) table.check_row_value("Session", "open") + + +def test_restore_open_job_more_submits(hq_env: HqEnv, tmp_path): + journal_path = os.path.join(tmp_path, "my.journal") + + hq_env.start_server(args=["--journal", journal_path]) + hq_env.command(["job", "open"]) + for _ in range(3): + hq_env.command(["job", "submit", "--job=1", "--", "hostname"]) + hq_env.stop_server() + hq_env.start_server(args=["--journal", journal_path]) + table = hq_env.command(["job", "info", "1"], as_table=True) + table.check_row_value("Session", "open") + table.check_row_value("Tasks", "3; Ids: 0-2") + hq_env.start_worker() + wait_for_job_state(hq_env, 1, "OPENED")