Skip to content

Commit

Permalink
Fixed restoring open jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali authored and Kobzol committed Aug 23, 2024
1 parent c35f7a9 commit c2731c9
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 36 deletions.
9 changes: 7 additions & 2 deletions crates/hyperqueue/src/client/commands/event/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,18 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
"task": task_id,
"error": error
}),
EventPayload::JobCreated(job_id, data) => {
EventPayload::Submit {
job_id,
closed_job,
serialized_desc,
} => {
let job_desc: JobDescription = bincode_config()
.deserialize(&data)
.deserialize(&serialized_desc)
.expect("Invalid job description data");
json!({
"type": "job-created",
"job": job_id,
"closed_job": closed_job,
"desc": JobInfoFormatter(&job_desc).to_json(),
})
}
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/common/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ pub struct JobWaitOpts {
#[arg(value_parser = parse_last_all_range)]
pub selector: IdSelector,

/// Wait only for task completion and terminate even job is open
/// Wait until all tasks are completed, even if the job is still open
#[clap(long, action)]
pub without_close: bool,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl JobTimeline {
pub fn handle_new_events(&mut self, events: &[Event]) {
for event in events {
match &event.payload {
EventPayload::JobCreated(job_id, job_info) => {
EventPayload::Submit(job_id, job_info) => {
self.job_timeline.insert(
*job_id,
DashboardJobInfo {
Expand Down
28 changes: 14 additions & 14 deletions crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,20 @@ fn build_tasks_graph(
})
}

/// Handles a client request to open a new job.
///
/// Obtains a fresh job id from the server state, reports the new job through
/// `senders.events.on_job_opened`, stores a `Job` built from the id and the
/// description in the state, and answers with an `OpenJobResponse` that
/// carries the id.
pub(crate) fn handle_open_job(
    state_ref: &StateRef,
    senders: &Senders,
    job_description: JobDescription,
) -> ToClientMessage {
    let id = state_ref.get_mut().new_job_id();

    // The event gets its own copy; the original description is moved into the job below.
    let description_for_event = job_description.clone();
    senders.events.on_job_opened(id, description_for_event);

    // NOTE(review): the trailing `true` presumably marks the job as open - confirm in `Job::new`.
    let opened_job = Job::new(id, job_description, true);
    state_ref.get_mut().add_job(opened_job);

    let response = OpenJobResponse { job_id: id };
    ToClientMessage::OpenJobResponse(response)
}

#[cfg(test)]
mod tests {
use crate::server::client::submit::build_tasks_graph;
Expand Down Expand Up @@ -541,17 +555,3 @@ mod tests {
}
}
}

/// Handles a client request to open a new job.
///
/// Takes a new job id from the server state, passes the id and a copy of the
/// description to `senders.events.on_job_opened`, adds a `Job` created from
/// them to the state, and returns `ToClientMessage::OpenJobResponse` with the id.
pub(crate) fn handle_open_job(
state_ref: &StateRef,
senders: &Senders,
job_description: JobDescription,
) -> ToClientMessage {
let job_id = state_ref.get_mut().new_job_id();
// The description is cloned for the event because it is moved into `Job::new` below.
senders
.events
.on_job_opened(job_id, job_description.clone());
// NOTE(review): the third argument (`true`) presumably marks the job as open - confirm in `Job::new`.
let job = Job::new(job_id, job_description, true);
state_ref.get_mut().add_job(job);
ToClientMessage::OpenJobResponse(OpenJobResponse { job_id })
}
6 changes: 5 additions & 1 deletion crates/hyperqueue/src/server/event/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ pub enum EventPayload {
/// Vec<u8> is serialized JobDescription; the main reason is avoiding duplication of JobDescription
/// (we serialize it before it is stripped down)
/// and a nice side effect is that Events can be deserialized without deserializing potentially large submit data
JobCreated(JobId, Vec<u8>),
Submit {
job_id: JobId,
closed_job: bool,
serialized_desc: Vec<u8>,
},
/// All tasks of the job have finished.
JobCompleted(JobId),
JobOpen(JobId, JobDescription),
Expand Down
7 changes: 4 additions & 3 deletions crates/hyperqueue/src/server/event/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ impl EventStreamer {
return Ok(());
}
}
self.send_event(EventPayload::JobCreated(
self.send_event(EventPayload::Submit {
job_id,
bincode_config().serialize(submit_request)?,
));
closed_job: submit_request.job_id.is_none(),
serialized_desc: bincode_config().serialize(submit_request)?,
});
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/server/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl Job {
}

pub fn is_terminated(&self) -> bool {
!self.is_open && self.counters.is_terminated(self.n_tasks())
!self.is_open && self.has_no_active_tasks()
}

pub fn get_task_state_mut(
Expand Down
27 changes: 20 additions & 7 deletions crates/hyperqueue/src/server/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ impl StateRestorer {
self.max_job_id = self.max_job_id.max(job_id.as_num());
}

/// Looks up the job stored under `job_id` and returns a mutable reference
/// to it, or `None` when no job with that id is present in `self.jobs`.
fn get_job_mut(&mut self, job_id: JobId) -> Option<&mut RestorerJob> {
    let jobs = &mut self.jobs;
    jobs.get_mut(&job_id)
}

pub fn load_event_file(&mut self, path: &Path) -> crate::Result<()> {
log::debug!("Loading event file {}", path.display());
let mut event_reader = EventLogReader::open(path)?;
Expand All @@ -162,14 +166,23 @@ impl StateRestorer {
}
EventPayload::WorkerLost(_, _) => {}
EventPayload::WorkerOverviewReceived(_) => {}
//EventPayload::JobCreatedShort(job_id, job_info) => {}
EventPayload::JobCreated(job_id, serialized_submit_request) => {
log::debug!("Replaying: JobCreated {job_id}");
EventPayload::Submit {
job_id,
closed_job,
serialized_desc,
} => {
log::debug!("Replaying: JobTasksCreated {job_id}");
let submit_request: SubmitRequest =
bincode_config().deserialize(&serialized_submit_request)?;
let mut job = RestorerJob::new(submit_request.job_desc, false);
job.add_submit(submit_request.submit_desc);
self.add_job(job_id, job);
bincode_config().deserialize(&serialized_desc)?;
if closed_job {
let mut job = RestorerJob::new(submit_request.job_desc, false);
job.add_submit(submit_request.submit_desc);
self.add_job(job_id, job);
} else if let Some(job) = self.get_job_mut(job_id) {
job.add_submit(submit_request.submit_desc)
} else {
log::warn!("Ignoring submit attachment to an non-existing job")
}
}
EventPayload::JobCompleted(job_id) => {
log::debug!("Replaying: JobCompleted {job_id}");
Expand Down
2 changes: 1 addition & 1 deletion docs/jobs/openjobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ $ hq submit --job <JOB_ID> --array 0-12 -- hostname
## Job name and `--max-fails`

Job's name and configuration option `--max-fails` are properties of the job. They can be set when the job is opened and they
cannot be later changed. Submits options `--name` and `--max-fails` cannot be used if you are submitting into an open
cannot be later changed. Submit options `--name` and `--max-fails` cannot be used if you are submitting into an open
job.

```commandline
Expand Down
9 changes: 5 additions & 4 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1287,8 +1287,7 @@ def test_job_wait_for_close(hq_env: HqEnv):
assert p.poll() is None
hq_env.command(["job", "close", "last"])
wait_for_job_state(hq_env, 1, "FINISHED")
time.sleep(0.3)
r = p.poll()
r = p.wait(5)
assert r == 0


Expand All @@ -1313,9 +1312,11 @@ def test_job_wait_for_open_job_without_close(hq_env: HqEnv):
hq_env.command(["job", "wait", "1", "--without-close"])

hq_env.start_worker()
hq_env.command(["submit", "--job=1", "--", "sleep", "0"])

hq_env.command(["submit", "--job=1", "--", "sleep", "2"])
t1 = time.time()
hq_env.command(["job", "wait", "1", "--without-close"])
t2 = time.time()
assert t2 - t1 > 1.8


def test_invalid_attach(hq_env: HqEnv):
Expand Down
18 changes: 17 additions & 1 deletion tests/test_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def test_restore_queues(hq_env: HqEnv, tmp_path):
assert alloc_list3 == alloc_list4


def test_restore_open_job(hq_env: HqEnv, tmp_path):
def test_restore_open_job_more_jobs(hq_env: HqEnv, tmp_path):
journal_path = os.path.join(tmp_path, "my.journal")
hq_env.start_server(args=["--journal", journal_path])
for _ in range(3):
Expand All @@ -254,3 +254,19 @@ def test_restore_open_job(hq_env: HqEnv, tmp_path):

table = hq_env.command(["job", "info", "1"], as_table=True)
table.check_row_value("Session", "open")


def test_restore_open_job_more_submits(hq_env: HqEnv, tmp_path):
    """
    Open a job, attach three separate submits to it, restart the server from
    the journal and check that the job is still reported as open with all
    three tasks, then start a worker and wait for the "OPENED" job state.
    """
    journal = os.path.join(tmp_path, "my.journal")

    def boot_server():
        # Both server runs use the same journal file.
        hq_env.start_server(args=["--journal", journal])

    boot_server()
    hq_env.command(["job", "open"])
    submit_cmd = ["job", "submit", "--job=1", "--", "hostname"]
    for _ in range(3):
        hq_env.command(list(submit_cmd))
    hq_env.stop_server()

    # Second run: state is restored from the journal.
    boot_server()
    job_info = hq_env.command(["job", "info", "1"], as_table=True)
    for key, expected in (("Session", "open"), ("Tasks", "3; Ids: 0-2")):
        job_info.check_row_value(key, expected)

    hq_env.start_worker()
    wait_for_job_state(hq_env, 1, "OPENED")

0 comments on commit c2731c9

Please sign in to comment.