From 94c00a6dabc5745fb8773a751d6062bffafade52 Mon Sep 17 00:00:00 2001
From: Ada Bohm <ada@kreatrix.org>
Date: Mon, 21 Oct 2024 11:40:29 +0200
Subject: [PATCH 1/9] Change Tako TaskId to u64

---
 crates/hyperqueue/src/server/autoalloc/process.rs | 2 +-
 crates/hyperqueue/src/server/state.rs             | 2 +-
 crates/tako/src/lib.rs                            | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/crates/hyperqueue/src/server/autoalloc/process.rs b/crates/hyperqueue/src/server/autoalloc/process.rs
index bec660ba3..01c2b431e 100644
--- a/crates/hyperqueue/src/server/autoalloc/process.rs
+++ b/crates/hyperqueue/src/server/autoalloc/process.rs
@@ -1804,7 +1804,7 @@ mod tests {
     fn attach_job(state: &mut State, job_id: u32, tasks: u32, min_time: TimeRequest) {
         let (job, submit) = create_job(job_id, tasks, min_time);
         state.add_job(job);
-        state.attach_submit(job_id.into(), submit, (job_id * 1_000).into());
+        state.attach_submit(job_id.into(), submit, (job_id as u64 * 1_000).into());
     }
 
     fn create_worker(allocation_id: &str) -> ManagerInfo {
diff --git a/crates/hyperqueue/src/server/state.rs b/crates/hyperqueue/src/server/state.rs
index c6d163079..79cf1bd79 100644
--- a/crates/hyperqueue/src/server/state.rs
+++ b/crates/hyperqueue/src/server/state.rs
@@ -196,7 +196,7 @@ impl State {
 
     pub fn new_task_id(&mut self, task_count: JobTaskCount) -> TakoTaskId {
         let id = self.task_id_counter;
-        self.task_id_counter += task_count;
+        self.task_id_counter += task_count as u64;
         id.into()
     }
 
diff --git a/crates/tako/src/lib.rs b/crates/tako/src/lib.rs
index 26a755420..1e2841515 100644
--- a/crates/tako/src/lib.rs
+++ b/crates/tako/src/lib.rs
@@ -16,7 +16,7 @@ pub use crate::internal::common::WrappedRcRefCell;
 pub use crate::internal::common::{Map, Set};
 
 define_id_type!(WorkerId, u32);
-define_id_type!(TaskId, u32);
+define_id_type!(TaskId, u64);
 define_id_type!(InstanceId, u32);
 
 // Priority: Bigger number -> Higher priority

From 22b0c2390d17058f1bbafdc8d4da8be1847eade4 Mon Sep 17 00:00:00 2001
From: Ada Bohm <ada@kreatrix.org>
Date: Tue, 22 Oct 2024 22:52:50 +0200
Subject: [PATCH 2/9] Direct translation between job_id/task_id and tako_id

---
 crates/hyperqueue/src/client/output/cli.rs    | 115 +++++------
 crates/hyperqueue/src/client/output/common.rs |   8 +-
 crates/hyperqueue/src/client/output/json.rs   |  24 +--
 .../hyperqueue/src/client/output/outputs.rs   |   5 +-
 crates/hyperqueue/src/client/output/quiet.rs  |   4 +-
 crates/hyperqueue/src/client/task.rs          |   4 +-
 crates/hyperqueue/src/lib.rs                  |  25 +++
 .../src/server/autoalloc/estimator.rs         |   2 +-
 .../src/server/autoalloc/process.rs           |   2 +-
 crates/hyperqueue/src/server/client/mod.rs    |  13 +-
 crates/hyperqueue/src/server/client/submit.rs | 124 ++++--------
 crates/hyperqueue/src/server/job.rs           | 178 +++++++-----------
 crates/hyperqueue/src/server/restore.rs       |  19 +-
 crates/hyperqueue/src/server/state.rs         | 133 +++----------
 crates/hyperqueue/src/transfer/messages.rs    |   2 +-
 15 files changed, 266 insertions(+), 392 deletions(-)

diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs
index d46b360a8..00b2cbf2b 100644
--- a/crates/hyperqueue/src/client/output/cli.rs
+++ b/crates/hyperqueue/src/client/output/cli.rs
@@ -18,7 +18,7 @@ use crate::transfer::messages::{
     ServerInfo, TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerExitInfo,
     WorkerInfo,
 };
-use crate::{JobId, JobTaskCount, WorkerId};
+use crate::{JobId, JobTaskCount, JobTaskId, WorkerId};
 
 use chrono::{DateTime, Local, SubsecRound, Utc};
 use core::time::Duration;
@@ -178,12 +178,17 @@ impl CliOutput {
         }
     }
 
-    fn print_task_summary(&self, tasks: &[JobTaskInfo], info: &JobInfo, worker_map: &WorkerMap) {
+    fn print_task_summary(
+        &self,
+        tasks: &[(JobTaskId, JobTaskInfo)],
+        info: &JobInfo,
+        worker_map: &WorkerMap,
+    ) {
         const SHOWN_TASKS: usize = 5;
 
         let fail_rows: Vec<_> = tasks
             .iter()
-            .filter_map(|t: &JobTaskInfo| match &t.state {
+            .filter_map(|(task_id, t): &(JobTaskId, JobTaskInfo)| match &t.state {
                 JobTaskState::Failed {
                     started_data,
                     error,
@@ -192,7 +197,7 @@ impl CliOutput {
                     let worker_ids = started_data.as_ref().map(|data| data.worker_ids.as_slice());
 
                     Some(vec![
-                        t.task_id.cell(),
+                        task_id.cell(),
                         format_workers(worker_ids, worker_map).cell(),
                         error.to_owned().cell().foreground_color(Some(Color::Red)),
                     ])
@@ -558,7 +563,7 @@ impl Output for CliOutput {
             ]);
             self.print_vertical_table(rows);
 
-            tasks.sort_unstable_by_key(|t| t.task_id);
+            tasks.sort_unstable_by_key(|t| t.0);
             self.print_task_summary(&tasks, &info, &worker_map);
         }
     }
@@ -624,12 +629,12 @@ impl Output for CliOutput {
 
         for (id, job) in jobs {
             let mut tasks = job.tasks;
-            tasks.sort_unstable_by_key(|t| t.task_id);
+            tasks.sort_unstable_by_key(|t| t.0);
 
             rows.append(
                 &mut tasks
                     .iter()
-                    .map(|task| {
+                    .map(|(task_id, task)| {
                         let (start, end) = get_task_time(&task.state);
                         let mut job_rows = match jobs_len {
                             1 => vec![],
@@ -637,7 +642,7 @@ impl Output for CliOutput {
                         };
 
                         job_rows.append(&mut vec![
-                            task.task_id.cell().justify(Justify::Right),
+                            task_id.cell().justify(Justify::Right),
                             status_to_cell(&get_task_status(&task.state)),
                             format_workers(task.state.get_workers(), &worker_map).cell(),
                             format_task_duration(start, end).cell(),
@@ -686,7 +691,7 @@ impl Output for CliOutput {
     fn print_task_info(
         &self,
         job: (JobId, JobDetail),
-        tasks: Vec<JobTaskInfo>,
+        tasks: &[(JobTaskId, JobTaskInfo)],
         worker_map: WorkerMap,
         server_uid: &str,
         verbosity: Verbosity,
@@ -695,10 +700,9 @@ impl Output for CliOutput {
         let task_to_paths = resolve_task_paths(&job, server_uid);
         let mut is_truncated = false;
 
-        for task in tasks {
+        for (task_id, task) in tasks {
             let (start, end) = get_task_time(&task.state);
-            let (cwd, stdout, stderr) = format_task_paths(&task_to_paths, &task);
-            let task_id = task.task_id;
+            let (cwd, stdout, stderr) = format_task_paths(&task_to_paths, *task_id);
 
             let (task_desc, task_deps) = if let Some(x) =
                 job.submit_descs
@@ -711,7 +715,7 @@ impl Output for CliOutput {
                         } if ids.contains(task_id.as_num()) => Some((task_desc, [].as_slice())),
                         JobTaskDescription::Array { .. } => None,
                         JobTaskDescription::Graph { tasks } => {
-                            tasks.iter().find(|t| t.id == task_id).map(|task_dep| {
+                            tasks.iter().find(|t| t.id == *task_id).map(|task_dep| {
                                 (&task_dep.task_desc, task_dep.dependencies.as_slice())
                             })
                         }
@@ -1151,45 +1155,42 @@ pub fn print_job_output(
     task_header: bool,
     task_paths: TaskToPathsMap,
 ) -> anyhow::Result<()> {
-    let read_stream = |task_info: &JobTaskInfo, output_stream: &OutputStream| {
-        let stdout = std::io::stdout();
-        let mut stdout = stdout.lock();
-
-        let (_, stdout_path, stderr_path) = get_task_paths(&task_paths, task_info);
-        let (opt_path, stream_name) = match output_stream {
-            OutputStream::Stdout => (stdout_path, "stdout"),
-            OutputStream::Stderr => (stderr_path, "stderr"),
-        };
+    let read_stream =
+        |task_id: JobTaskId, task_info: &JobTaskInfo, output_stream: &OutputStream| {
+            let stdout = std::io::stdout();
+            let mut stdout = stdout.lock();
+
+            let (_, stdout_path, stderr_path) = get_task_paths(&task_paths, task_id);
+            let (opt_path, stream_name) = match output_stream {
+                OutputStream::Stdout => (stdout_path, "stdout"),
+                OutputStream::Stderr => (stderr_path, "stderr"),
+            };
 
-        if task_header {
-            writeln!(
-                stdout,
-                "# Job {}, task {}",
-                job_detail.info.id, task_info.task_id
-            )
-            .expect("Could not write output");
-        }
+            if task_header {
+                writeln!(stdout, "# Job {}, task {}", job_detail.info.id, task_id)
+                    .expect("Could not write output");
+            }
 
-        if let Some(path) = opt_path {
-            match File::open(path) {
-                Ok(mut file) => {
-                    let copy = std::io::copy(&mut file, &mut stdout);
-                    if let Err(error) = copy {
-                        log::warn!("Could not output contents of `{path}`: {error:?}");
+            if let Some(path) = opt_path {
+                match File::open(path) {
+                    Ok(mut file) => {
+                        let copy = std::io::copy(&mut file, &mut stdout);
+                        if let Err(error) = copy {
+                            log::warn!("Could not output contents of `{path}`: {error:?}");
+                        }
                     }
-                }
-                Err(error) => log::warn!("File `{path}` cannot be opened: {error:?}"),
-            };
-        } else {
-            log::warn!(
-                "Task {} has no `{stream_name}` stream associated with it",
-                task_info.task_id
-            );
-        }
-    };
+                    Err(error) => log::warn!("File `{path}` cannot be opened: {error:?}"),
+                };
+            } else {
+                log::warn!(
+                    "Task {} has no `{stream_name}` stream associated with it",
+                    task_id
+                );
+            }
+        };
 
-    for task in &job_detail.tasks {
-        read_stream(task, &output_stream);
+    for (task_id, task) in &job_detail.tasks {
+        read_stream(*task_id, task, &output_stream);
     }
 
     Ok(())
@@ -1259,11 +1260,11 @@ fn format_resource_variants(rqv: &ResourceRequestVariants) -> String {
 }
 
 /// Formatting
-pub fn format_job_workers(tasks: &[JobTaskInfo], worker_map: &WorkerMap) -> String {
+pub fn format_job_workers(tasks: &[(JobTaskId, JobTaskInfo)], worker_map: &WorkerMap) -> String {
     // BTreeSet is used to both filter duplicates and keep a stable order
     let worker_set: BTreeSet<_> = tasks
         .iter()
-        .filter_map(|task| task.state.get_workers())
+        .filter_map(|(_, task)| task.state.get_workers())
         .flatten()
         .collect();
     let worker_count = worker_set.len();
@@ -1337,11 +1338,11 @@ fn get_task_time(state: &JobTaskState) -> (Option<DateTime<Utc>>, Option<DateTim
 }
 
 /// Returns Option(working directory, stdout, stderr)
-fn get_task_paths<'a>(
-    task_map: &'a TaskToPathsMap,
-    state: &JobTaskInfo,
-) -> (Option<&'a str>, Option<&'a str>, Option<&'a str>) {
-    match task_map[&state.task_id] {
+fn get_task_paths(
+    task_map: &TaskToPathsMap,
+    task_id: JobTaskId,
+) -> (Option<&str>, Option<&str>, Option<&str>) {
+    match task_map[&task_id] {
         Some(ref paths) => (
             Some(paths.cwd.to_str().unwrap()),
             match &paths.stdout {
@@ -1360,9 +1361,9 @@ fn get_task_paths<'a>(
 /// Returns (working directory, stdout, stderr)
 fn format_task_paths<'a>(
     task_map: &'a TaskToPathsMap,
-    state: &JobTaskInfo,
+    task_id: JobTaskId,
 ) -> (&'a str, &'a str, &'a str) {
-    match task_map[&state.task_id] {
+    match task_map[&task_id] {
         Some(ref paths) => (
             paths.cwd.to_str().unwrap(),
             stdio_to_str(&paths.stdout),
diff --git a/crates/hyperqueue/src/client/output/common.rs b/crates/hyperqueue/src/client/output/common.rs
index 4cd01f68f..4d68ff4fa 100644
--- a/crates/hyperqueue/src/client/output/common.rs
+++ b/crates/hyperqueue/src/client/output/common.rs
@@ -42,8 +42,8 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
 
     job.tasks
         .iter()
-        .map(|task| {
-            let (submit_dir, task_desc) = task_to_desc_map.get(&task.task_id).unwrap();
+        .map(|(task_id, task)| {
+            let (submit_dir, task_desc) = task_to_desc_map.get(task_id).unwrap();
             let paths = match &task_desc.kind {
                 TaskKind::ExternalProgram(TaskKindProgram { program, .. }) => match &task.state {
                     JobTaskState::Canceled {
@@ -58,7 +58,7 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
                     } => {
                         let ctx = CompletePlaceholderCtx {
                             job_id: job.info.id,
-                            task_id: task.task_id,
+                            task_id: *task_id,
                             instance_id: started_data.context.instance_id,
                             submit_dir,
                             server_uid,
@@ -80,7 +80,7 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
                     _ => None,
                 },
             };
-            (task.task_id, paths)
+            (*task_id, paths)
         })
         .collect()
 }
diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs
index 8f944c4bf..bbff71445 100644
--- a/crates/hyperqueue/src/client/output/json.rs
+++ b/crates/hyperqueue/src/client/output/json.rs
@@ -133,7 +133,7 @@ impl Output for JsonOutput {
                                     }
                                 }
                                     ).collect::<Vec<_>>(),
-                                    "tasks": format_tasks(tasks, task_paths)
+                                    "tasks": format_tasks(&tasks, task_paths)
                                 })
             })
             .collect();
@@ -182,7 +182,7 @@ impl Output for JsonOutput {
         let mut json_obj = json!({});
         for (id, job) in jobs {
             let map = resolve_task_paths(&job, server_uid);
-            json_obj[id.to_string()] = format_tasks(job.tasks, map);
+            json_obj[id.to_string()] = format_tasks(&job.tasks, map);
         }
         self.print(json_obj);
     }
@@ -190,7 +190,7 @@ impl Output for JsonOutput {
     fn print_task_info(
         &self,
         job: (JobId, JobDetail),
-        tasks: Vec<JobTaskInfo>,
+        tasks: &[(JobTaskId, JobTaskInfo)],
         _worker_map: WorkerMap,
         server_uid: &str,
         _verbosity: Verbosity,
@@ -306,7 +306,7 @@ fn format_task_description(task_desc: &TaskDescription) -> Value {
     }
 }
 
-fn fill_task_started_data(dict: &mut Value, data: StartedTaskData) {
+fn fill_task_started_data(dict: &mut Value, data: &StartedTaskData) {
     dict["started_at"] = format_datetime(data.start_date);
     if data.worker_ids.len() == 1 {
         dict["worker"] = data.worker_ids[0].as_num().into();
@@ -360,10 +360,10 @@ fn format_job_info(info: &JobInfo) -> Value {
     })
 }
 
-fn format_tasks(tasks: Vec<JobTaskInfo>, map: TaskToPathsMap) -> Value {
+fn format_tasks(tasks: &[(JobTaskId, JobTaskInfo)], map: TaskToPathsMap) -> Value {
     tasks
         .into_iter()
-        .map(|task| {
+        .map(|(task_id, task)| {
             let state = &match task.state {
                 JobTaskState::Waiting => "waiting",
                 JobTaskState::Running { .. } => "running",
@@ -372,12 +372,12 @@ fn format_tasks(tasks: Vec<JobTaskInfo>, map: TaskToPathsMap) -> Value {
                 JobTaskState::Canceled { .. } => "canceled",
             };
             let mut data = json!({
-                "id": task.task_id,
+                "id": *task_id,
                 "state": state,
             });
-            fill_task_paths(&mut data, &map, task.task_id);
+            fill_task_paths(&mut data, &map, *task_id);
 
-            match task.state {
+            match &task.state {
                 JobTaskState::Running { started_data } => {
                     fill_task_started_data(&mut data, started_data);
                 }
@@ -386,7 +386,7 @@ fn format_tasks(tasks: Vec<JobTaskInfo>, map: TaskToPathsMap) -> Value {
                     end_date,
                 } => {
                     fill_task_started_data(&mut data, started_data);
-                    data["finished_at"] = format_datetime(end_date);
+                    data["finished_at"] = format_datetime(*end_date);
                 }
                 JobTaskState::Failed {
                     started_data,
@@ -396,8 +396,8 @@ fn format_tasks(tasks: Vec<JobTaskInfo>, map: TaskToPathsMap) -> Value {
                     if let Some(started_data) = started_data {
                         fill_task_started_data(&mut data, started_data);
                     }
-                    data["finished_at"] = format_datetime(end_date);
-                    data["error"] = error.into();
+                    data["finished_at"] = format_datetime(*end_date);
+                    data["error"] = error.clone().into();
                 }
                 _ => {}
             };
diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs
index 4d7007da0..b5d26e26a 100644
--- a/crates/hyperqueue/src/client/output/outputs.rs
+++ b/crates/hyperqueue/src/client/output/outputs.rs
@@ -11,9 +11,10 @@ use crate::client::output::common::TaskToPathsMap;
 use crate::client::output::Verbosity;
 use crate::common::arraydef::IntArray;
 use crate::server::job::JobTaskInfo;
-use crate::JobId;
+use crate::{JobId, JobTaskId};
 use core::time::Duration;
 use tako::resources::ResourceDescriptor;
+use tako::Map;
 
 pub const MAX_DISPLAYED_WORKERS: usize = 2;
 
@@ -73,7 +74,7 @@ pub trait Output {
     fn print_task_info(
         &self,
         job: (JobId, JobDetail),
-        tasks: Vec<JobTaskInfo>,
+        tasks: &[(JobTaskId, JobTaskInfo)],
         worker_map: WorkerMap,
         server_uid: &str,
         verbosity: Verbosity,
diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs
index 3b197fae5..000d71680 100644
--- a/crates/hyperqueue/src/client/output/quiet.rs
+++ b/crates/hyperqueue/src/client/output/quiet.rs
@@ -21,7 +21,7 @@ use crate::transfer::messages::{
     AutoAllocListResponse, JobDetail, JobInfo, ServerInfo, WaitForJobsResponse, WorkerExitInfo,
     WorkerInfo,
 };
-use crate::JobId;
+use crate::{JobId, JobTaskId};
 
 #[derive(Default)]
 pub struct Quiet;
@@ -129,7 +129,7 @@ impl Output for Quiet {
     fn print_task_info(
         &self,
         _job: (JobId, JobDetail),
-        _tasks: Vec<JobTaskInfo>,
+        _tasks: &[(JobTaskId, JobTaskInfo)],
         _worker_map: WorkerMap,
         _server_uid: &str,
         _verbosity: Verbosity,
diff --git a/crates/hyperqueue/src/client/task.rs b/crates/hyperqueue/src/client/task.rs
index 4a8adb57e..b932fd53f 100644
--- a/crates/hyperqueue/src/client/task.rs
+++ b/crates/hyperqueue/src/client/task.rs
@@ -115,7 +115,7 @@ pub async fn output_job_task_info(
         Some(job) => {
             gsettings.printer().print_task_info(
                 (*job_id, job.clone()),
-                job.tasks.clone(),
+                &job.tasks,
                 get_worker_map(session).await?,
                 &response.server_uid,
                 verbosity,
@@ -161,7 +161,7 @@ pub async fn output_job_task_ids(
                         .ok_or_else(|| HqError::GenericError("Job Id not found".to_string()))?
                         .tasks
                         .iter()
-                        .map(|info| info.task_id.as_num()),
+                        .map(|(task_id, _)| task_id.as_num()),
                 ),
             ))
         })
diff --git a/crates/hyperqueue/src/lib.rs b/crates/hyperqueue/src/lib.rs
index 73e94a997..4b1476190 100644
--- a/crates/hyperqueue/src/lib.rs
+++ b/crates/hyperqueue/src/lib.rs
@@ -42,3 +42,28 @@ pub const HQ_VERSION: &str = {
         None => const_format::concatcp!(env!("CARGO_PKG_VERSION"), "-dev"),
     }
 };
+
+pub fn make_tako_id(job_id: JobId, task_id: JobTaskId) -> TakoTaskId {
+    TakoTaskId::new(((job_id.as_num() as u64) << 32) + task_id.as_num() as u64)
+}
+
+pub fn unwrap_tako_id(tako_task_id: TakoTaskId) -> (JobId, JobTaskId) {
+    let num = tako_task_id.as_num();
+    (
+        JobId::new((num >> 32) as u32),
+        JobTaskId::new((num & 0xffffffff) as u32),
+    )
+}
+
+#[cfg(test)]
+mod test {
+    use crate::{make_tako_id, unwrap_tako_id, JobId, JobTaskId};
+
+    #[test]
+    fn test_make_tako_id() {
+        assert_eq!(
+            unwrap_tako_id(make_tako_id(JobId(123), JobTaskId(5))),
+            (JobId(123), JobTaskId(5))
+        );
+    }
+}
diff --git a/crates/hyperqueue/src/server/autoalloc/estimator.rs b/crates/hyperqueue/src/server/autoalloc/estimator.rs
index 4acef6cc9..34a0ec6c3 100644
--- a/crates/hyperqueue/src/server/autoalloc/estimator.rs
+++ b/crates/hyperqueue/src/server/autoalloc/estimator.rs
@@ -58,7 +58,7 @@ pub fn get_server_task_state(state: &State, queue_info: &QueueInfo) -> ServerTas
 fn can_queue_execute_job(job: &Job, queue_info: &QueueInfo) -> bool {
     job.submit_descs
         .iter()
-        .all(|submit_desc| match &submit_desc.0.task_desc {
+        .all(|submit_desc| match &submit_desc.task_desc {
             JobTaskDescription::Array {
                 task_desc: TaskDescription { resources, .. },
                 ..
diff --git a/crates/hyperqueue/src/server/autoalloc/process.rs b/crates/hyperqueue/src/server/autoalloc/process.rs
index 01c2b431e..08c1d5adb 100644
--- a/crates/hyperqueue/src/server/autoalloc/process.rs
+++ b/crates/hyperqueue/src/server/autoalloc/process.rs
@@ -1804,7 +1804,7 @@ mod tests {
     fn attach_job(state: &mut State, job_id: u32, tasks: u32, min_time: TimeRequest) {
         let (job, submit) = create_job(job_id, tasks, min_time);
         state.add_job(job);
-        state.attach_submit(job_id.into(), submit, (job_id as u64 * 1_000).into());
+        state.attach_submit(job_id.into(), submit);
     }
 
     fn create_worker(allocation_id: &str) -> ManagerInfo {
diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs
index 46d1389ef..dc0bcfbb3 100644
--- a/crates/hyperqueue/src/server/client/mod.rs
+++ b/crates/hyperqueue/src/server/client/mod.rs
@@ -22,7 +22,7 @@ use crate::transfer::messages::{
     WorkerListResponse,
 };
 use crate::transfer::messages::{ForgetJobResponse, WaitForJobsResponse};
-use crate::{JobId, JobTaskCount, WorkerId};
+use crate::{unwrap_tako_id, JobId, JobTaskCount, WorkerId};
 
 pub mod autoalloc;
 mod submit;
@@ -440,7 +440,7 @@ async fn handle_job_close(
     ToClientMessage::CloseJobResponse(responses)
 }
 
-async fn cancel_job(state_ref: &StateRef, carrier: &Senders, job_id: JobId) -> CancelJobResponse {
+async fn cancel_job(state_ref: &StateRef, senders: &Senders, job_id: JobId) -> CancelJobResponse {
     let tako_task_ids;
     {
         let n_tasks = match state_ref.get().get_job(job_id) {
@@ -457,7 +457,7 @@ async fn cancel_job(state_ref: &StateRef, carrier: &Senders, job_id: JobId) -> C
         }
     }
 
-    let canceled_tasks = match carrier
+    let canceled_tasks = match senders
         .backend
         .send_tako_message(FromGatewayMessage::CancelTasks(CancelTasks {
             tasks: tako_task_ids,
@@ -476,7 +476,12 @@ async fn cancel_job(state_ref: &StateRef, carrier: &Senders, job_id: JobId) -> C
     if let Some(job) = state.get_job_mut(job_id) {
         let canceled_ids: Vec<_> = canceled_tasks
             .iter()
-            .map(|tako_id| job.set_cancel_state(*tako_id, carrier))
+            .map(|tako_id| {
+                let (j_id, task_id) = unwrap_tako_id(*tako_id);
+                assert_eq!(j_id, job_id);
+                job.set_cancel_state(task_id, senders);
+                task_id
+            })
             .collect();
         let already_finished = job.n_tasks() - canceled_ids.len() as JobTaskCount;
         CancelJobResponse::Canceled(canceled_ids, already_finished)
diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs
index e159053c9..a840126db 100644
--- a/crates/hyperqueue/src/server/client/submit.rs
+++ b/crates/hyperqueue/src/server/client/submit.rs
@@ -25,30 +25,27 @@ use crate::transfer::messages::{
     SubmitResponse, TaskBuildDescription, TaskDescription, TaskIdSelector, TaskKind,
     TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
 };
-use crate::{JobId, JobTaskCount, JobTaskId, Priority, TakoTaskId};
+use crate::{make_tako_id, JobId, JobTaskCount, JobTaskId, Priority, TakoTaskId};
 
 fn create_new_task_message(
     job_id: JobId,
-    tako_base_id: TakoTaskId,
     submit_desc: &mut JobSubmitDescription,
-) -> anyhow::Result<NewTasksMessage> {
+) -> NewTasksMessage {
     match &mut submit_desc.task_desc {
         JobTaskDescription::Array {
             ids,
             entries,
             task_desc,
-        } => Ok(build_tasks_array(
+        } => build_tasks_array(
             job_id,
-            tako_base_id,
             ids,
             std::mem::take(entries),
             task_desc,
             &submit_desc.submit_dir,
             submit_desc.stream_path.as_ref(),
-        )),
+        ),
         JobTaskDescription::Graph { tasks } => build_tasks_graph(
             job_id,
-            tako_base_id,
             tasks,
             &submit_desc.submit_dir,
             submit_desc.stream_path.as_ref(),
@@ -56,15 +53,19 @@ fn create_new_task_message(
     }
 }
 
+fn validate_submit(state: &State, submit_decs: &JobSubmitDescription) -> crate::Result<()> {
+    Ok(())
+}
+
 pub(crate) fn submit_job_desc(
     state: &mut State,
     job_id: JobId,
     mut submit_desc: JobSubmitDescription,
 ) -> crate::Result<NewTasksMessage> {
-    let (job_id, tako_base_id) = prepare_job(job_id, &mut submit_desc, state);
-    let new_tasks = create_new_task_message(job_id, tako_base_id, &mut submit_desc)?;
+    prepare_job(job_id, &mut submit_desc, state);
+    let new_tasks = create_new_task_message(job_id, &mut submit_desc);
     submit_desc.strip_large_data();
-    state.attach_submit(job_id, Arc::new(submit_desc), tako_base_id);
+    state.attach_submit(job_id, Arc::new(submit_desc));
     Ok(new_tasks)
 }
 
@@ -91,10 +92,9 @@ pub(crate) async fn handle_submit(
                             *ids = IntArray::from_id(new_id)
                         }
                     } else {
-                        let id_set = job.make_task_id_set();
                         for id in ids.iter() {
                             let id = JobTaskId::new(id);
-                            if id_set.contains(&id) {
+                            if job.tasks.contains_key(&id) {
                                 return ToClientMessage::SubmitResponse(
                                     SubmitResponse::TaskIdAlreadyExists(id),
                                 );
@@ -103,9 +103,8 @@ pub(crate) async fn handle_submit(
                     }
                 }
                 JobTaskDescription::Graph { tasks } => {
-                    let id_set = job.make_task_id_set();
                     for task in tasks {
-                        if id_set.contains(&task.id) {
+                        if job.tasks.contains_key(&task.id) {
                             let id = task.id;
                             return ToClientMessage::SubmitResponse(
                                 SubmitResponse::TaskIdAlreadyExists(id),
@@ -186,11 +185,7 @@ pub(crate) async fn handle_submit(
 }
 
 /// Prefills placeholders in the submit request and creates job ID
-fn prepare_job(
-    job_id: JobId,
-    submit_desc: &mut JobSubmitDescription,
-    state: &mut State,
-) -> (JobId, TakoTaskId) {
+fn prepare_job(job_id: JobId, submit_desc: &mut JobSubmitDescription, state: &mut State) {
     // Prefill currently known placeholders eagerly
     if let JobTaskDescription::Array {
         ref mut task_desc, ..
@@ -217,9 +212,6 @@ fn prepare_job(
         );
         *path = normalize_path(path.as_path(), &submit_desc.submit_dir);
     }
-
-    let task_count = submit_desc.task_desc.task_count();
-    (job_id, state.new_task_id(task_count))
 }
 
 fn serialize_task_body(
@@ -246,28 +238,23 @@ fn serialize_task_body(
 
 fn build_tasks_array(
     job_id: JobId,
-    tako_base_id: TakoTaskId,
     ids: &IntArray,
     entries: Option<Vec<BString>>,
     task_desc: &TaskDescription,
     submit_dir: &PathBuf,
     stream_path: Option<&PathBuf>,
 ) -> NewTasksMessage {
-    let tako_base_id = tako_base_id.as_num();
-
-    let build_task_conf =
-        |body: Box<[u8]>, tako_id: <TakoTaskId as ItemId>::IdType| TaskConfiguration {
-            id: tako_id.into(),
-            shared_data_index: 0,
-            task_deps: Vec::new(),
-            body,
-        };
+    let build_task_conf = |body: Box<[u8]>, tako_id: TakoTaskId| TaskConfiguration {
+        id: tako_id,
+        shared_data_index: 0,
+        task_deps: Vec::new(),
+        body,
+    };
 
     let tasks = match entries {
         None => ids
             .iter()
-            .zip(tako_base_id..)
-            .map(|(task_id, tako_id)| {
+            .map(|task_id| {
                 build_task_conf(
                     serialize_task_body(
                         job_id,
@@ -277,15 +264,14 @@ fn build_tasks_array(
                         submit_dir,
                         stream_path,
                     ),
-                    tako_id,
+                    make_tako_id(job_id, task_id.into()),
                 )
             })
             .collect(),
         Some(entries) => ids
             .iter()
-            .zip(tako_base_id..)
             .zip(entries)
-            .map(|((task_id, tako_id), entry)| {
+            .map(|(task_id, entry)| {
                 build_task_conf(
                     serialize_task_body(
                         job_id,
@@ -295,7 +281,7 @@ fn build_tasks_array(
                         submit_dir,
                         stream_path,
                     ),
-                    tako_id,
+                    make_tako_id(job_id, task_id.into()),
                 )
             })
             .collect(),
@@ -316,24 +302,10 @@ fn build_tasks_array(
 
 fn build_tasks_graph(
     job_id: JobId,
-    tako_base_id: TakoTaskId,
     tasks: &[TaskWithDependencies],
     submit_dir: &PathBuf,
     stream_path: Option<&PathBuf>,
-) -> anyhow::Result<NewTasksMessage> {
-    let mut job_task_id_to_tako_id: Map<JobTaskId, TaskId> = Map::with_capacity(tasks.len());
-
-    let mut tako_id = tako_base_id.as_num();
-    for task in tasks {
-        if job_task_id_to_tako_id
-            .insert(task.id, tako_id.into())
-            .is_some()
-        {
-            return Err(anyhow::anyhow!("Duplicate task ID {}", task.id));
-        }
-        tako_id += 1;
-    }
-
+) -> NewTasksMessage {
     let mut shared_data = vec![];
     let mut shared_data_map =
         Map::<(Cow<ResourceRequestVariants>, Option<Duration>, Priority), usize>::new();
@@ -378,36 +350,24 @@ fn build_tasks_graph(
         );
         let shared_data_index = allocate_shared_data(&task.task_desc);
 
-        let mut task_deps = Vec::with_capacity(task.dependencies.len());
-        for dependency in &task.dependencies {
-            if *dependency == task.id {
-                return Err(anyhow::anyhow!("Task {} depends on itself", task.id));
-            }
-            match job_task_id_to_tako_id.get(dependency) {
-                Some(id) => task_deps.push(*id),
-                None => {
-                    return Err(anyhow::anyhow!(
-                        "Task {} depends on an unknown task with ID {}",
-                        task.id,
-                        dependency
-                    ));
-                }
-            }
-        }
-
+        let task_deps = task
+            .dependencies
+            .iter()
+            .map(|task_id| make_tako_id(job_id, *task_id))
+            .collect();
         task_configs.push(TaskConfiguration {
-            id: job_task_id_to_tako_id[&task.id],
+            id: make_tako_id(job_id, task.id),
             shared_data_index,
             task_deps,
             body,
         });
     }
 
-    Ok(NewTasksMessage {
+    NewTasksMessage {
         tasks: task_configs,
         shared_data,
         adjust_instance_id: Default::default(),
-    })
+    }
 }
 
 pub(crate) fn handle_open_job(
@@ -454,8 +414,7 @@ mod tests {
             task(4, desc_a(), vec![]),
         ];
 
-        let msg =
-            build_tasks_graph(1.into(), 2.into(), &tasks, &PathBuf::from("foo"), None).unwrap();
+        let msg = build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).unwrap();
 
         check_shared_data(&msg, vec![desc_a(), desc_c(), desc_b()]);
         assert_eq!(
@@ -478,8 +437,7 @@ mod tests {
             task(4, desc(), vec![0]),
         ];
 
-        let msg =
-            build_tasks_graph(1.into(), 2.into(), &tasks, &PathBuf::from("foo"), None).unwrap();
+        let msg = build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).unwrap();
         assert_eq!(msg.tasks[0].task_deps, vec![4.into(), 3.into()]);
         assert_eq!(msg.tasks[1].task_deps, vec![2.into()]);
         assert_eq!(msg.tasks[2].task_deps, vec![5.into(), 6.into()]);
@@ -496,9 +454,7 @@ mod tests {
             task(0, desc(), vec![]),
         ];
 
-        assert!(
-            build_tasks_graph(1.into(), 2.into(), &tasks, &PathBuf::from("foo"), None).is_err()
-        );
+        assert!(build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).is_err());
     }
 
     #[test]
@@ -506,9 +462,7 @@ mod tests {
         let desc = || task_desc(None, 0, 1);
         let tasks = vec![task(0, desc(), vec![]), task(1, desc(), vec![1])];
 
-        assert!(
-            build_tasks_graph(1.into(), 2.into(), &tasks, &PathBuf::from("foo"), None).is_err()
-        );
+        assert!(build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).is_err());
     }
 
     #[test]
@@ -516,9 +470,7 @@ mod tests {
         let desc = || task_desc(None, 0, 1);
         let tasks = vec![task(0, desc(), vec![3]), task(1, desc(), vec![])];
 
-        assert!(
-            build_tasks_graph(1.into(), 2.into(), &tasks, &PathBuf::from("foo"), None).is_err()
-        );
+        assert!(build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).is_err());
     }
 
     fn check_shared_data(msg: &NewTasksMessage, expected: Vec<TaskDescription>) {
diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs
index 6f11b8db3..a54fd6958 100644
--- a/crates/hyperqueue/src/server/job.rs
+++ b/crates/hyperqueue/src/server/job.rs
@@ -7,7 +7,7 @@ use crate::transfer::messages::{
     TaskStatusSelector,
 };
 use crate::worker::start::RunningTaskContext;
-use crate::{JobId, JobTaskCount, JobTaskId, Map, TakoTaskId, WorkerId};
+use crate::{make_tako_id, JobId, JobTaskCount, JobTaskId, Map, TakoTaskId, WorkerId};
 use chrono::{DateTime, Utc};
 use smallvec::SmallVec;
 use std::sync::Arc;
@@ -65,7 +65,6 @@ impl JobTaskState {
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct JobTaskInfo {
     pub state: JobTaskState,
-    pub task_id: JobTaskId,
 }
 
 #[derive(Serialize, Deserialize, Debug, Copy, Clone, Default)]
@@ -115,10 +114,10 @@ pub struct JobCompletionCallback {
 pub struct Job {
     pub job_id: JobId,
     pub counters: JobTaskCounters,
-    pub tasks: Map<TakoTaskId, JobTaskInfo>,
+    pub tasks: Map<JobTaskId, JobTaskInfo>,
 
     pub job_desc: JobDescription,
-    pub submit_descs: SmallVec<[(Arc<JobSubmitDescription>, TakoTaskId); 1]>,
+    pub submit_descs: SmallVec<[Arc<JobSubmitDescription>; 1]>,
 
     // If true, new tasks may be submitted into this job
     // If true and all tasks in the job are terminated then the job
@@ -157,42 +156,43 @@ impl Job {
         self.is_open = false;
     }
 
-    pub fn make_task_id_set(&self) -> Set<JobTaskId> {
-        self.tasks.values().map(|t| t.task_id).collect()
-    }
-
     pub fn max_id(&self) -> Option<JobTaskId> {
-        self.tasks.values().map(|t| t.task_id).max()
+        self.tasks.keys().max().copied()
     }
 
     pub fn make_job_detail(&self, task_selector: Option<&TaskSelector>) -> JobDetail {
-        let mut tasks: Vec<JobTaskInfo> = Vec::new();
-        let mut tasks_not_found: Vec<JobTaskId> = vec![];
-
-        if let Some(selector) = task_selector {
-            filter_tasks(self.tasks.values(), selector, &mut tasks);
-
-            if let TaskIdSelector::Specific(requested_ids) = &selector.id_selector {
-                let task_ids: Set<_> = self
-                    .tasks
-                    .values()
-                    .map(|task| task.task_id.as_num())
-                    .collect();
-                tasks_not_found.extend(
-                    requested_ids
+        let (mut tasks, tasks_not_found) = if let Some(selector) = task_selector {
+            match &selector.id_selector {
+                TaskIdSelector::All => (
+                    self.tasks
                         .iter()
-                        .filter(|id| !task_ids.contains(id))
-                        .map(JobTaskId::new),
-                );
+                        .map(|(task_id, info)| (task_id.clone(), info.clone()))
+                        .collect(),
+                    Vec::new(),
+                ),
+                TaskIdSelector::Specific(ids) => {
+                    let mut not_found = Vec::new();
+                    let mut tasks = Vec::with_capacity(ids.id_count() as usize);
+                    for task_id in ids.iter() {
+                        if let Some(task) = self.tasks.get(&JobTaskId::new(task_id)) {
+                            tasks.push((JobTaskId::new(task_id), task.clone()));
+                        } else {
+                            not_found.push(JobTaskId::new(task_id));
+                        }
+                    }
+                    (tasks, not_found)
+                }
             }
-        }
+        } else {
+            (Vec::new(), Vec::new())
+        };
 
-        tasks.sort_unstable_by_key(|task| task.task_id);
+        tasks.sort_unstable_by_key(|(task_id, _)| *task_id);
 
         JobDetail {
             info: self.make_job_info(),
             job_desc: self.job_desc.clone(),
-            submit_descs: self.submit_descs.iter().map(|x| x.0.clone()).collect(),
+            submit_descs: self.submit_descs.iter().map(|x| x.clone()).collect(),
             tasks,
             tasks_not_found,
             submission_date: self.submission_date,
@@ -223,46 +223,38 @@ impl Job {
         !self.is_open && self.has_no_active_tasks()
     }
 
-    pub fn get_task_state_mut(
-        &mut self,
-        tako_task_id: TakoTaskId,
-    ) -> (JobTaskId, &mut JobTaskState) {
-        let state = self.tasks.get_mut(&tako_task_id).unwrap();
-        (state.task_id, &mut state.state)
-    }
-
-    pub fn iter_task_states(
-        &self,
-    ) -> impl Iterator<Item = (TakoTaskId, JobTaskId, &JobTaskState)> + '_ {
-        self.tasks.iter().map(|(k, v)| (*k, v.task_id, &v.state))
+    pub fn iter_task_states(&self) -> impl Iterator<Item = (JobTaskId, &JobTaskState)> + '_ {
+        self.tasks
+            .iter()
+            .map(|(task_id, task_info)| (*task_id, &task_info.state))
     }
 
     pub fn non_finished_task_ids(&self) -> Vec<TakoTaskId> {
-        let mut result = Vec::new();
-        for (tako_id, _task_id, state) in self.iter_task_states() {
-            match state {
-                JobTaskState::Waiting | JobTaskState::Running { .. } => result.push(tako_id),
+        self.iter_task_states()
+            .filter_map(|(task_id, state)| match state {
+                JobTaskState::Waiting | JobTaskState::Running { .. } => {
+                    Some(make_tako_id(self.job_id, task_id))
+                }
                 JobTaskState::Finished { .. }
                 | JobTaskState::Failed { .. }
-                | JobTaskState::Canceled { .. } => { /* Do nothing */ }
-            }
-        }
-        result
+                | JobTaskState::Canceled { .. } => None,
+            })
+            .collect()
     }
 
     pub fn set_running_state(
         &mut self,
-        tako_task_id: TakoTaskId,
+        task_id: JobTaskId,
         workers: SmallVec<[WorkerId; 1]>,
         context: SerializedTaskContext,
-    ) -> JobTaskId {
-        let (task_id, state) = self.get_task_state_mut(tako_task_id);
+    ) {
+        let task = self.tasks.get_mut(&task_id).unwrap();
 
         let context: RunningTaskContext =
             deserialize(&context).expect("Could not deserialize task context");
 
-        if matches!(state, JobTaskState::Waiting) {
-            *state = JobTaskState::Running {
+        if matches!(task.state, JobTaskState::Waiting) {
+            task.state = JobTaskState::Running {
                 started_data: StartedTaskData {
                     start_date: Utc::now(),
                     context,
@@ -271,8 +263,6 @@ impl Job {
             };
             self.counters.n_running_tasks += 1;
         }
-
-        task_id
     }
 
     pub fn check_termination(&mut self, senders: &Senders, now: DateTime<Utc>) {
@@ -302,45 +292,48 @@ impl Job {
 
     pub fn set_finished_state(
         &mut self,
-        tako_task_id: TakoTaskId,
+        task_id: JobTaskId,
         now: DateTime<Utc>,
         senders: &Senders,
     ) -> JobTaskId {
-        let (task_id, state) = self.get_task_state_mut(tako_task_id);
-        match state {
+        let task = self.tasks.get_mut(&task_id).unwrap();
+        match &task.state {
             JobTaskState::Running { started_data } => {
-                *state = JobTaskState::Finished {
+                task.state = JobTaskState::Finished {
                     started_data: started_data.clone(),
                     end_date: now,
                 };
                 self.counters.n_running_tasks -= 1;
                 self.counters.n_finished_tasks += 1;
             }
-            _ => panic!("Invalid worker state, expected Running, got {state:?}"),
+            _ => panic!(
+                "Invalid worker state, expected Running, got {:?}",
+                task.state
+            ),
         }
         senders.events.on_task_finished(self.job_id, task_id);
         self.check_termination(senders, now);
         task_id
     }
 
-    pub fn set_waiting_state(&mut self, tako_task_id: TakoTaskId) {
-        let (_, state) = self.get_task_state_mut(tako_task_id);
-        assert!(matches!(state, JobTaskState::Running { .. }));
-        *state = JobTaskState::Waiting;
+    pub fn set_waiting_state(&mut self, task_id: JobTaskId) {
+        let task = self.tasks.get_mut(&task_id).unwrap();
+        assert!(matches!(task.state, JobTaskState::Running { .. }));
+        task.state = JobTaskState::Waiting;
         self.counters.n_running_tasks -= 1;
     }
 
     pub fn set_failed_state(
         &mut self,
-        tako_task_id: TakoTaskId,
+        task_id: JobTaskId,
         error: String,
         senders: &Senders,
     ) -> JobTaskId {
-        let (task_id, state) = self.get_task_state_mut(tako_task_id);
+        let task = self.tasks.get_mut(&task_id).unwrap();
         let now = Utc::now();
-        match state {
+        match &task.state {
             JobTaskState::Running { started_data } => {
-                *state = JobTaskState::Failed {
+                task.state = JobTaskState::Failed {
                     error: error.clone(),
                     started_data: Some(started_data.clone()),
                     end_date: now,
@@ -349,13 +342,16 @@ impl Job {
                 self.counters.n_running_tasks -= 1;
             }
             JobTaskState::Waiting => {
-                *state = JobTaskState::Failed {
+                task.state = JobTaskState::Failed {
                     error: error.clone(),
                     started_data: None,
                     end_date: now,
                 }
             }
-            _ => panic!("Invalid task {task_id} state, expected Running or Waiting, got {state:?}"),
+            _ => panic!(
+                "Invalid task {task_id} state, expected Running or Waiting, got {:?}",
+                task.state
+            ),
         }
         self.counters.n_failed_tasks += 1;
 
@@ -364,20 +360,19 @@ impl Job {
         task_id
     }
 
-    pub fn set_cancel_state(&mut self, tako_task_id: TakoTaskId, senders: &Senders) -> JobTaskId {
+    pub fn set_cancel_state(&mut self, task_id: JobTaskId, senders: &Senders) -> JobTaskId {
+        let task = self.tasks.get_mut(&task_id).unwrap();
         let now = Utc::now();
-
-        let (task_id, state) = self.get_task_state_mut(tako_task_id);
-        match state {
+        match &task.state {
             JobTaskState::Running { started_data, .. } => {
-                *state = JobTaskState::Canceled {
+                task.state = JobTaskState::Canceled {
                     started_data: Some(started_data.clone()),
                     cancelled_date: now,
                 };
                 self.counters.n_running_tasks -= 1;
             }
             JobTaskState::Waiting => {
-                *state = JobTaskState::Canceled {
+                task.state = JobTaskState::Canceled {
                     started_data: None,
                     cancelled_date: now,
                 };
@@ -403,32 +398,3 @@ impl Job {
         rx
     }
 }
-
-/// Applies ID and status filters from `selector` to filter `tasks`.
-fn filter_tasks<'a, T: Iterator<Item = &'a JobTaskInfo>>(
-    tasks: T,
-    selector: &TaskSelector,
-    result: &mut Vec<JobTaskInfo>,
-) {
-    for task in tasks {
-        match &selector.id_selector {
-            TaskIdSelector::Specific(ids) => {
-                if !ids.contains(task.task_id.as_num()) {
-                    continue;
-                }
-            }
-            TaskIdSelector::All => {}
-        }
-
-        match &selector.status_selector {
-            TaskStatusSelector::Specific(states) => {
-                if !states.contains(&get_task_status(&task.state)) {
-                    continue;
-                }
-            }
-            TaskStatusSelector::All => {}
-        }
-
-        result.push(task.clone());
-    }
-}
diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs
index 4325640e0..79ffaed93 100644
--- a/crates/hyperqueue/src/server/restore.rs
+++ b/crates/hyperqueue/src/server/restore.rs
@@ -8,7 +8,7 @@ use crate::transfer::messages::{
     AllocationQueueParams, JobDescription, JobSubmitDescription, SubmitRequest,
 };
 use crate::worker::start::RunningTaskContext;
-use crate::{JobId, JobTaskId, Map};
+use crate::{make_tako_id, unwrap_tako_id, JobId, JobTaskId, Map};
 use std::path::Path;
 use tako::gateway::NewTasksMessage;
 use tako::{ItemId, WorkerId};
@@ -55,25 +55,22 @@ impl RestorerJob {
             let job = state.get_job_mut(job_id).unwrap();
 
             new_tasks.tasks.retain(|t| {
+                let (_, task_id) = unwrap_tako_id(t.id);
                 self.tasks
-                    .get(&job.get_task_state_mut(t.id).0)
+                    .get(&task_id)
                     .map(|tt| !tt.is_completed())
                     .unwrap_or(true)
             });
 
-            if new_tasks.tasks.is_empty() {
-                continue;
-            }
-
-            for (tako_id, job_task) in job.tasks.iter_mut() {
-                if let Some(task) = self.tasks.get_mut(&job_task.task_id) {
+            for (task_id, job_task) in job.tasks.iter_mut() {
+                if let Some(task) = self.tasks.get_mut(task_id) {
                     match &task.state {
                         JobTaskState::Waiting => continue,
                         JobTaskState::Running { started_data } => {
                             let instance_id = started_data.context.instance_id.as_num() + 1;
                             new_tasks
                                 .adjust_instance_id
-                                .insert(*tako_id, instance_id.into());
+                                .insert(make_tako_id(job_id, *task_id), instance_id.into());
                             continue;
                         }
                         JobTaskState::Finished { .. } => job.counters.n_finished_tasks += 1,
@@ -83,7 +80,9 @@ impl RestorerJob {
                     job_task.state = task.state.clone();
                 }
             }
-            result.push(new_tasks)
+            if !new_tasks.tasks.is_empty() {
+                result.push(new_tasks);
+            }
         }
         Ok(result)
     }
diff --git a/crates/hyperqueue/src/server/state.rs b/crates/hyperqueue/src/server/state.rs
index 79cf1bd79..127eb1e3f 100644
--- a/crates/hyperqueue/src/server/state.rs
+++ b/crates/hyperqueue/src/server/state.rs
@@ -16,30 +16,13 @@ use crate::server::restore::StateRestorer;
 use crate::server::worker::Worker;
 use crate::server::Senders;
 use crate::transfer::messages::{JobSubmitDescription, JobTaskDescription, ServerInfo};
-use crate::WrappedRcRefCell;
+use crate::{make_tako_id, unwrap_tako_id, JobTaskId, WrappedRcRefCell};
 use crate::{JobId, JobTaskCount, Map, TakoTaskId, WorkerId};
 
 pub struct State {
     jobs: Map<JobId, Job>,
     workers: Map<WorkerId, Worker>,
-
-    // Here we store TaskId -> JobId data, but to make it sparse
-    // we store ONLY the base_task_id there, i.e. each job has here
-    // only one entry.
-    // Example:
-    // Real mapping: TaskId   JobId
-    //                 1   ->    1
-    //                 2   ->    1
-    //                 3   ->    2
-    //                 4   ->    2
-    //                 5   ->    2
-    // The actual base_task_id_to_job will be 1 -> 1, 3 -> 2
-    // Therefore we need to find biggest key that is lower then a given task id
-    // To make this query efficient, we use BTreeMap and not Map
-    base_task_id_to_job_id: BTreeMap<TakoTaskId, JobId>,
     job_id_counter: <JobId as ItemId>::IdType,
-    task_id_counter: <TakoTaskId as ItemId>::IdType,
-
     server_info: ServerInfo,
 }
 
@@ -68,7 +51,9 @@ fn cancel_tasks_from_callback(
                     log::debug!("Tasks {:?} canceled", msg.cancelled_tasks);
                     log::debug!("Tasks {:?} already finished", msg.already_finished);
                     for tako_id in msg.cancelled_tasks {
-                        job.set_cancel_state(tako_id, &senders2);
+                        let (j_id, task_id) = unwrap_tako_id(tako_id);
+                        assert_eq!(j_id, job.job_id);
+                        job.set_cancel_state(task_id, &senders2);
                     }
                 }
             }
@@ -114,23 +99,17 @@ impl State {
         assert!(self.jobs.insert(job_id, job).is_none());
     }
 
-    pub fn attach_submit(
-        &mut self,
-        job_id: JobId,
-        submit_desc: Arc<JobSubmitDescription>,
-        base_task_id: TakoTaskId,
-    ) {
-        let base = base_task_id.as_num();
+    pub fn attach_submit(&mut self, job_id: JobId, submit_desc: Arc<JobSubmitDescription>) {
         let job = self.get_job_mut(job_id).unwrap();
         match &submit_desc.task_desc {
             JobTaskDescription::Array { ids, .. } => {
                 job.tasks.reserve(ids.id_count() as usize);
                 ids.iter().enumerate().for_each(|(i, task_id)| {
+                    let task_id = JobTaskId::new(task_id);
                     job.tasks.insert(
-                        TakoTaskId::new(base + i as <TaskId as ItemId>::IdType),
+                        task_id,
                         JobTaskInfo {
                             state: JobTaskState::Waiting,
-                            task_id: task_id.into(),
                         },
                     );
                 })
@@ -139,20 +118,15 @@ impl State {
                 job.tasks.reserve(tasks.len());
                 tasks.iter().enumerate().for_each(|(i, task)| {
                     job.tasks.insert(
-                        TakoTaskId::new(base + i as <TaskId as ItemId>::IdType),
+                        task.id,
                         JobTaskInfo {
                             state: JobTaskState::Waiting,
-                            task_id: task.id,
                         },
                     );
                 })
             }
         };
-        job.submit_descs.push((submit_desc, base_task_id));
-        assert!(self
-            .base_task_id_to_job_id
-            .insert(base_task_id, job_id)
-            .is_none());
+        job.submit_descs.push(submit_desc);
     }
 
     /// Completely forgets this job, in order to reduce memory usage.
@@ -167,15 +141,6 @@ impl State {
                 return None;
             }
         };
-        for (_, base_task_id) in &job.submit_descs {
-            self.base_task_id_to_job_id.remove(base_task_id);
-        }
-        Some(job)
-    }
-
-    pub fn get_job_mut_by_tako_task_id(&mut self, task_id: TakoTaskId) -> Option<&mut Job> {
-        let job_id: JobId = *self.base_task_id_to_job_id.range(..=task_id).next_back()?.1;
-        let job = self.jobs.get_mut(&job_id)?;
         Some(job)
     }
 
@@ -194,12 +159,6 @@ impl State {
         ((self.job_id_counter - n)..self.job_id_counter).map(|id| id.into())
     }
 
-    pub fn new_task_id(&mut self, task_count: JobTaskCount) -> TakoTaskId {
-        let id = self.task_id_counter;
-        self.task_id_counter += task_count as u64;
-        id.into()
-    }
-
     pub fn get_workers(&self) -> &Map<WorkerId, Worker> {
         &self.workers
     }
@@ -220,15 +179,18 @@ impl State {
     ) {
         log::debug!("Task id={} failed: {:?}", msg.id, msg.info);
 
-        let job = self.get_job_mut_by_tako_task_id(msg.id).unwrap();
-        for task_id in msg.cancelled_tasks {
+        let (job_id, task_id) = unwrap_tako_id(msg.id);
+        let job = self.get_job_mut(job_id).unwrap();
+        for tako_id in msg.cancelled_tasks {
             log::debug!(
                 "Task id={} canceled because of task dependency fails",
-                task_id
+                tako_id
             );
+            let (j_id, task_id) = unwrap_tako_id(tako_id);
+            assert_eq!(job_id, j_id);
             job.set_cancel_state(task_id, senders);
         }
-        job.set_failed_state(msg.id, msg.info.message, senders);
+        job.set_failed_state(task_id, msg.info.message, senders);
 
         if let Some(max_fails) = &job.job_desc.max_fails {
             if job.counters.n_failed_tasks > *max_fails {
@@ -246,22 +208,23 @@ impl State {
                 worker_ids,
                 context,
             } => {
-                let job = self.get_job_mut_by_tako_task_id(msg.id).unwrap();
-                let task_id = job.set_running_state(msg.id, worker_ids.clone(), context);
-
-                let job_id = job.job_id;
+                let (job_id, task_id) = unwrap_tako_id(msg.id);
+                let job = self.get_job_mut(job_id).unwrap();
+                job.set_running_state(task_id, worker_ids.clone(), context);
                 senders
                     .events
                     .on_task_started(job_id, task_id, instance_id, worker_ids.clone());
             }
             TaskState::Finished => {
                 let now = Utc::now();
-                let job = self.get_job_mut_by_tako_task_id(msg.id).unwrap();
-                job.set_finished_state(msg.id, now, senders);
+                let (job_id, task_id) = unwrap_tako_id(msg.id);
+                let job = self.get_job_mut(job_id).unwrap();
+                job.set_finished_state(task_id, now, senders);
             }
             TaskState::Waiting => {
-                let job = self.get_job_mut_by_tako_task_id(msg.id).unwrap();
-                job.set_waiting_state(msg.id);
+                let (job_id, task_id) = unwrap_tako_id(msg.id);
+                let job = self.get_job_mut(job_id).unwrap();
+                job.set_waiting_state(task_id);
             }
             TaskState::Invalid => {
                 unreachable!()
@@ -288,8 +251,9 @@ impl State {
         msg: LostWorkerMessage,
     ) {
         log::debug!("Worker lost id={}", msg.worker_id);
-        for task_id in msg.running_tasks {
-            let job = self.get_job_mut_by_tako_task_id(task_id).unwrap();
+        for tako_id in msg.running_tasks {
+            let (job_id, task_id) = unwrap_tako_id(tako_id);
+            let job = self.get_job_mut(job_id).unwrap();
             job.set_waiting_state(task_id);
         }
 
@@ -317,9 +281,7 @@ impl StateRef {
         Self(WrappedRcRefCell::wrap(State {
             jobs: Default::default(),
             workers: Default::default(),
-            base_task_id_to_job_id: Default::default(),
             job_id_counter: 1,
-            task_id_counter: 1,
             server_info,
         }))
     }
@@ -339,7 +301,7 @@ mod tests {
         JobDescription, JobSubmitDescription, JobTaskDescription, PinMode, TaskDescription,
         TaskKind, TaskKindProgram,
     };
-    use crate::{JobId, TakoTaskId};
+    use crate::{unwrap_tako_id, JobId, TakoTaskId};
 
     fn dummy_program_definition() -> ProgramDefinition {
         ProgramDefinition {
@@ -390,43 +352,6 @@ mod tests {
                 submit_dir: Default::default(),
                 stream_path: None,
             }),
-            base_task_id.into(),
         )
     }
-
-    fn check_id<T: Into<TakoTaskId>>(state: &mut State, task_id: T, expected: Option<u32>) {
-        assert_eq!(
-            state
-                .get_job_mut_by_tako_task_id(task_id.into())
-                .map(|j| j.job_id.as_num()),
-            expected
-        );
-    }
-
-    #[test]
-    fn test_find_job_id_by_task_id() {
-        let state_ref = create_hq_state();
-        let mut state = state_ref.get_mut();
-        add_test_job(&mut state, IntArray::from_range(0, 10), 223, 100);
-        add_test_job(&mut state, IntArray::from_range(0, 15), 224, 110);
-        add_test_job(&mut state, IntArray::from_id(0), 225, 125);
-        add_test_job(&mut state, IntArray::from_id(0), 226, 126);
-        add_test_job(&mut state, IntArray::from_id(0), 227, 130);
-
-        let state = &mut state;
-        check_id(state, 99, None);
-
-        check_id(state, 100, Some(223));
-        check_id(state, 101, Some(223));
-        check_id(state, 109, Some(223));
-
-        check_id(state, 110, Some(224));
-        check_id(state, 124, Some(224));
-
-        check_id(state, 125, Some(225));
-
-        check_id(state, 126, Some(226));
-
-        check_id(state, 130, Some(227));
-    }
 }
diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs
index 3821e38cb..95356b522 100644
--- a/crates/hyperqueue/src/transfer/messages.rs
+++ b/crates/hyperqueue/src/transfer/messages.rs
@@ -421,7 +421,7 @@ pub struct JobDetail {
     pub info: JobInfo,
     pub job_desc: JobDescription,
     pub submit_descs: Vec<Arc<JobSubmitDescription>>,
-    pub tasks: Vec<JobTaskInfo>,
+    pub tasks: Vec<(JobTaskId, JobTaskInfo)>,
     pub tasks_not_found: Vec<JobTaskId>,
 
     // Date when job was submitted

From dced5fce385e98af7b2d4be9e6a233fffa5a9c9c Mon Sep 17 00:00:00 2001
From: Ada Bohm <ada@kreatrix.org>
Date: Wed, 23 Oct 2024 23:20:44 +0200
Subject: [PATCH 3/9] Better task validation

---
 .../src/client/commands/submit/command.rs     |   6 +
 crates/hyperqueue/src/server/client/mod.rs    |   2 +-
 crates/hyperqueue/src/server/client/submit.rs | 129 +++++++++---------
 crates/hyperqueue/src/server/restore.rs       |  10 +-
 crates/hyperqueue/src/server/state.rs         |  30 ++--
 crates/hyperqueue/src/transfer/messages.rs    |   2 +
 6 files changed, 98 insertions(+), 81 deletions(-)

diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs
index 838e9265c..d2deca309 100644
--- a/crates/hyperqueue/src/client/commands/submit/command.rs
+++ b/crates/hyperqueue/src/client/commands/submit/command.rs
@@ -741,6 +741,12 @@ pub(crate) async fn send_submit_request(
         SubmitResponse::TaskIdAlreadyExists(task_id) => {
             bail!("Task {task_id} already exists in job {job_id}.")
         }
+        SubmitResponse::NonUniqueTaskId(task_id) => {
+            bail!("Task {task_id} is defined more than once.")
+        }
+        SubmitResponse::InvalidDependencies(task_id) => {
+            bail!("Invalid dependency on task {task_id}")
+        }
     }
     Ok(())
 }
diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs
index dc0bcfbb3..0d5407aa5 100644
--- a/crates/hyperqueue/src/server/client/mod.rs
+++ b/crates/hyperqueue/src/server/client/mod.rs
@@ -30,7 +30,7 @@ mod submit;
 use crate::common::error::HqError;
 use crate::server::client::submit::handle_open_job;
 use crate::server::Senders;
-pub(crate) use submit::submit_job_desc;
+pub(crate) use submit::{submit_job_desc, validate_submit};
 
 pub async fn handle_client_connections(
     state_ref: StateRef,
diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs
index a840126db..17156dadd 100644
--- a/crates/hyperqueue/src/server/client/submit.rs
+++ b/crates/hyperqueue/src/server/client/submit.rs
@@ -4,8 +4,8 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use bstr::BString;
-use tako::ItemId;
 use tako::Map;
+use tako::{ItemId, Set};
 
 use tako::gateway::{
     FromGatewayMessage, NewTasksMessage, ResourceRequestVariants, SharedTaskConfiguration,
@@ -14,6 +14,7 @@ use tako::gateway::{
 use tako::TaskId;
 
 use crate::common::arraydef::IntArray;
+use crate::common::error::HqError;
 use crate::common::placeholders::{
     fill_placeholders_after_submit, fill_placeholders_log, normalize_path,
 };
@@ -53,20 +54,63 @@ fn create_new_task_message(
     }
 }
 
-fn validate_submit(state: &State, submit_decs: &JobSubmitDescription) -> crate::Result<()> {
-    Ok(())
-}
-
 pub(crate) fn submit_job_desc(
     state: &mut State,
     job_id: JobId,
     mut submit_desc: JobSubmitDescription,
-) -> crate::Result<NewTasksMessage> {
+) -> NewTasksMessage {
     prepare_job(job_id, &mut submit_desc, state);
     let new_tasks = create_new_task_message(job_id, &mut submit_desc);
     submit_desc.strip_large_data();
     state.attach_submit(job_id, Arc::new(submit_desc));
-    Ok(new_tasks)
+    new_tasks
+}
+
+pub(crate) fn validate_submit(
+    job: Option<&Job>,
+    task_desc: &JobTaskDescription,
+) -> Option<SubmitResponse> {
+    match &task_desc {
+        JobTaskDescription::Array { ids, .. } => {
+            if let Some(job) = job {
+                for id in ids.iter() {
+                    let id = JobTaskId::new(id);
+                    if job.tasks.contains_key(&id) {
+                        return Some(SubmitResponse::TaskIdAlreadyExists(id));
+                    }
+                }
+            }
+        }
+        JobTaskDescription::Graph { tasks } => {
+            if let Some(job) = job {
+                for task in tasks {
+                    if job.tasks.contains_key(&task.id) {
+                        let id = task.id;
+                        return Some(SubmitResponse::TaskIdAlreadyExists(id));
+                    }
+                }
+            }
+            let mut task_ids = Set::new();
+            for task in tasks {
+                if !task_ids.insert(task.id) {
+                    return Some(SubmitResponse::NonUniqueTaskId(task.id));
+                }
+            }
+            for task in tasks {
+                for dep_id in &task.dependencies {
+                    if *dep_id == task.id
+                        || (!task_ids.contains(dep_id)
+                            && !job
+                                .map(|job| job.tasks.contains_key(dep_id))
+                                .unwrap_or(false))
+                    {
+                        return Some(SubmitResponse::InvalidDependencies(*dep_id));
+                    }
+                }
+            }
+        }
+    }
+    None
 }
 
 #[allow(clippy::await_holding_refcell_ref)] // Disable lint as it does not work well with drop
@@ -76,6 +120,13 @@ pub(crate) async fn handle_submit(
     mut message: SubmitRequest,
 ) -> ToClientMessage {
     let mut state = state_ref.get_mut();
+    if let Some(err) = validate_submit(
+        message.job_id.and_then(|job_id| state.get_job(job_id)),
+        &message.submit_desc.task_desc,
+    ) {
+        return ToClientMessage::SubmitResponse(err);
+    }
+
     let (job_id, new_job) = if let Some(job_id) = message.job_id {
         if let Some(job) = state.get_job(job_id) {
             if !job.is_open() {
@@ -91,27 +142,9 @@ pub(crate) async fn handle_submit(
                         } else {
                             *ids = IntArray::from_id(new_id)
                         }
-                    } else {
-                        for id in ids.iter() {
-                            let id = JobTaskId::new(id);
-                            if job.tasks.contains_key(&id) {
-                                return ToClientMessage::SubmitResponse(
-                                    SubmitResponse::TaskIdAlreadyExists(id),
-                                );
-                            }
-                        }
-                    }
-                }
-                JobTaskDescription::Graph { tasks } => {
-                    for task in tasks {
-                        if job.tasks.contains_key(&task.id) {
-                            let id = task.id;
-                            return ToClientMessage::SubmitResponse(
-                                SubmitResponse::TaskIdAlreadyExists(id),
-                            );
-                        }
                     }
                 }
+                JobTaskDescription::Graph { .. } => {}
             }
         } else {
             return ToClientMessage::SubmitResponse(SubmitResponse::JobNotFound);
@@ -147,16 +180,7 @@ pub(crate) async fn handle_submit(
         state.add_job(job);
     }
 
-    let new_tasks = match submit_job_desc(&mut state, job_id, submit_desc) {
-        Err(error) => {
-            if new_job {
-                state.forget_job(job_id);
-                state.revert_to_job_id(job_id);
-            }
-            return ToClientMessage::Error(error.to_string());
-        }
-        Ok(new_tasks) => new_tasks,
-    };
+    let new_tasks = submit_job_desc(&mut state, job_id, submit_desc);
     senders.autoalloc.on_job_created(job_id);
 
     let job_detail = state
@@ -387,6 +411,7 @@ pub(crate) fn handle_open_job(
 #[cfg(test)]
 mod tests {
     use crate::server::client::submit::build_tasks_graph;
+    use crate::server::client::validate_submit;
     use crate::transfer::messages::{
         PinMode, TaskDescription, TaskKind, TaskKindProgram, TaskWithDependencies,
     };
@@ -414,7 +439,7 @@ mod tests {
             task(4, desc_a(), vec![]),
         ];
 
-        let msg = build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).unwrap();
+        let msg = build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None);
 
         check_shared_data(&msg, vec![desc_a(), desc_c(), desc_b()]);
         assert_eq!(
@@ -437,7 +462,7 @@ mod tests {
             task(4, desc(), vec![0]),
         ];
 
-        let msg = build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).unwrap();
+        let msg = build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None);
         assert_eq!(msg.tasks[0].task_deps, vec![4.into(), 3.into()]);
         assert_eq!(msg.tasks[1].task_deps, vec![2.into()]);
         assert_eq!(msg.tasks[2].task_deps, vec![5.into(), 6.into()]);
@@ -445,34 +470,6 @@ mod tests {
         assert_eq!(msg.tasks[4].task_deps, vec![2.into()]);
     }
 
-    #[test]
-    fn test_build_graph_duplicate_id() {
-        let desc = || task_desc(None, 0, 1);
-        let tasks = vec![
-            task(0, desc(), vec![]),
-            task(1, desc(), vec![]),
-            task(0, desc(), vec![]),
-        ];
-
-        assert!(build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).is_err());
-    }
-
-    #[test]
-    fn test_build_graph_task_depends_on_itself() {
-        let desc = || task_desc(None, 0, 1);
-        let tasks = vec![task(0, desc(), vec![]), task(1, desc(), vec![1])];
-
-        assert!(build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).is_err());
-    }
-
-    #[test]
-    fn test_build_graph_task_missing_dependency() {
-        let desc = || task_desc(None, 0, 1);
-        let tasks = vec![task(0, desc(), vec![3]), task(1, desc(), vec![])];
-
-        assert!(build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None).is_err());
-    }
-
     fn check_shared_data(msg: &NewTasksMessage, expected: Vec<TaskDescription>) {
         assert_eq!(msg.shared_data.len(), expected.len());
         for (shared, expected) in msg.shared_data.iter().zip(expected) {
diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs
index 79ffaed93..48f8fe336 100644
--- a/crates/hyperqueue/src/server/restore.rs
+++ b/crates/hyperqueue/src/server/restore.rs
@@ -1,5 +1,6 @@
+use crate::common::error::HqError;
 use crate::server::autoalloc::QueueId;
-use crate::server::client::submit_job_desc;
+use crate::server::client::{submit_job_desc, validate_submit};
 use crate::server::event::log::JournalReader;
 use crate::server::event::payload::EventPayload;
 use crate::server::job::{Job, JobTaskState, StartedTaskData};
@@ -51,7 +52,12 @@ impl RestorerJob {
         state.add_job(job);
         let mut result: Vec<NewTasksMessage> = Vec::new();
         for submit_desc in self.submit_descs {
-            let mut new_tasks = submit_job_desc(state, job_id, submit_desc)?;
+            if let Some(e) = validate_submit(state.get_job(job_id), &submit_desc.task_desc) {
+                return Err(HqError::GenericError(format!(
+                    "Job validation failed {e:?}"
+                )));
+            }
+            let mut new_tasks = submit_job_desc(state, job_id, submit_desc);
             let job = state.get_job_mut(job_id).unwrap();
 
             new_tasks.tasks.retain(|t| {
diff --git a/crates/hyperqueue/src/server/state.rs b/crates/hyperqueue/src/server/state.rs
index 127eb1e3f..9e012790b 100644
--- a/crates/hyperqueue/src/server/state.rs
+++ b/crates/hyperqueue/src/server/state.rs
@@ -106,23 +106,29 @@ impl State {
                 job.tasks.reserve(ids.id_count() as usize);
                 ids.iter().enumerate().for_each(|(i, task_id)| {
                     let task_id = JobTaskId::new(task_id);
-                    job.tasks.insert(
-                        task_id,
-                        JobTaskInfo {
-                            state: JobTaskState::Waiting,
-                        },
-                    );
+                    assert!(job
+                        .tasks
+                        .insert(
+                            task_id,
+                            JobTaskInfo {
+                                state: JobTaskState::Waiting,
+                            },
+                        )
+                        .is_none());
                 })
             }
             JobTaskDescription::Graph { tasks } => {
                 job.tasks.reserve(tasks.len());
                 tasks.iter().enumerate().for_each(|(i, task)| {
-                    job.tasks.insert(
-                        task.id,
-                        JobTaskInfo {
-                            state: JobTaskState::Waiting,
-                        },
-                    );
+                    assert!(job
+                        .tasks
+                        .insert(
+                            task.id,
+                            JobTaskInfo {
+                                state: JobTaskState::Waiting,
+                            },
+                        )
+                        .is_none());
                 })
             }
         };
diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs
index 95356b522..1e1c75a55 100644
--- a/crates/hyperqueue/src/transfer/messages.rs
+++ b/crates/hyperqueue/src/transfer/messages.rs
@@ -380,6 +380,8 @@ pub enum SubmitResponse {
     JobNotOpened,
     JobNotFound,
     TaskIdAlreadyExists(JobTaskId),
+    NonUniqueTaskId(JobTaskId),
+    InvalidDependencies(JobTaskId),
 }
 
 #[derive(Serialize, Deserialize, Debug)]

From b1b10c9e249deb0bccbb7adfebe94b86d7746877 Mon Sep 17 00:00:00 2001
From: Ada Bohm <ada@kreatrix.org>
Date: Thu, 24 Oct 2024 10:12:24 +0200
Subject: [PATCH 4/9] Refactoring & tests & better validation

---
 .../src/server/autoalloc/process.rs           | 51 +++++-----
 crates/hyperqueue/src/server/client/submit.rs | 95 ++++++++++++++++++-
 crates/hyperqueue/src/server/job.rs           | 39 +++++++-
 crates/hyperqueue/src/server/state.rs         | 51 ++--------
 4 files changed, 158 insertions(+), 78 deletions(-)

diff --git a/crates/hyperqueue/src/server/autoalloc/process.rs b/crates/hyperqueue/src/server/autoalloc/process.rs
index 08c1d5adb..2d78db0c4 100644
--- a/crates/hyperqueue/src/server/autoalloc/process.rs
+++ b/crates/hyperqueue/src/server/autoalloc/process.rs
@@ -1747,11 +1747,7 @@ mod tests {
         allocations
     }
 
-    fn create_job(
-        job_id: u32,
-        tasks: u32,
-        min_time: TimeRequest,
-    ) -> (Job, Arc<JobSubmitDescription>) {
+    fn create_job(job_id: u32, tasks: u32, min_time: TimeRequest) -> Job {
         let def = ProgramDefinition {
             args: vec![],
             env: Default::default(),
@@ -1769,7 +1765,7 @@ mod tests {
             }],
         });
 
-        let job = Job::new(
+        let mut job = Job::new(
             job_id.into(),
             JobDescription {
                 name: "job".to_string(),
@@ -1777,34 +1773,31 @@ mod tests {
             },
             false,
         );
-        (
-            job,
-            Arc::new(JobSubmitDescription {
-                task_desc: JobTaskDescription::Array {
-                    ids: IntArray::from_range(0, tasks),
-                    entries: None,
-                    task_desc: TaskDescription {
-                        kind: TaskKind::ExternalProgram(TaskKindProgram {
-                            program: def,
-                            pin_mode: PinMode::None,
-                            task_dir: false,
-                        }),
-                        resources,
-                        time_limit: None,
-                        priority: 0,
-                        crash_limit: 5,
-                    },
+        job.attach_submit(Arc::new(JobSubmitDescription {
+            task_desc: JobTaskDescription::Array {
+                ids: IntArray::from_range(0, tasks),
+                entries: None,
+                task_desc: TaskDescription {
+                    kind: TaskKind::ExternalProgram(TaskKindProgram {
+                        program: def,
+                        pin_mode: PinMode::None,
+                        task_dir: false,
+                    }),
+                    resources,
+                    time_limit: None,
+                    priority: 0,
+                    crash_limit: 5,
                 },
-                submit_dir: Default::default(),
-                stream_path: None,
-            }),
-        )
+            },
+            submit_dir: Default::default(),
+            stream_path: None,
+        }));
+        job
     }
 
     fn attach_job(state: &mut State, job_id: u32, tasks: u32, min_time: TimeRequest) {
-        let (job, submit) = create_job(job_id, tasks, min_time);
+        let job = create_job(job_id, tasks, min_time);
         state.add_job(job);
-        state.attach_submit(job_id.into(), submit);
     }
 
     fn create_worker(allocation_id: &str) -> ManagerInfo {
diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs
index 17156dadd..8269d4e69 100644
--- a/crates/hyperqueue/src/server/client/submit.rs
+++ b/crates/hyperqueue/src/server/client/submit.rs
@@ -62,7 +62,10 @@ pub(crate) fn submit_job_desc(
     prepare_job(job_id, &mut submit_desc, state);
     let new_tasks = create_new_task_message(job_id, &mut submit_desc);
     submit_desc.strip_large_data();
-    state.attach_submit(job_id, Arc::new(submit_desc));
+    state
+        .get_job_mut(job_id)
+        .unwrap()
+        .attach_submit(Arc::new(submit_desc));
     new_tasks
 }
 
@@ -410,13 +413,17 @@ pub(crate) fn handle_open_job(
 
 #[cfg(test)]
 mod tests {
+    use crate::common::arraydef::IntArray;
     use crate::server::client::submit::build_tasks_graph;
     use crate::server::client::validate_submit;
+    use crate::server::job::Job;
     use crate::transfer::messages::{
-        PinMode, TaskDescription, TaskKind, TaskKindProgram, TaskWithDependencies,
+        JobDescription, JobSubmitDescription, JobTaskDescription, PinMode, SubmitResponse,
+        TaskDescription, TaskKind, TaskKindProgram, TaskWithDependencies,
     };
     use smallvec::smallvec;
     use std::path::PathBuf;
+    use std::sync::Arc;
     use std::time::Duration;
     use tako::gateway::{
         NewTasksMessage, ResourceRequest, ResourceRequestEntry, ResourceRequestVariants,
@@ -451,6 +458,90 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_validate_submit() {
+        let mut job = Job::new(
+            10.into(),
+            JobDescription {
+                name: "".to_string(),
+                max_fails: None,
+            },
+            true,
+        );
+        job.attach_submit(Arc::new(JobSubmitDescription {
+            task_desc: JobTaskDescription::Array {
+                ids: IntArray::from_range(100, 10),
+                entries: None,
+                task_desc: task_desc(None, 0, 1),
+            },
+            submit_dir: Default::default(),
+            stream_path: None,
+        }));
+
+        let job_task_desc = JobTaskDescription::Array {
+            ids: IntArray::from_range(109, 2),
+            entries: None,
+            task_desc: task_desc(None, 0, 1),
+        };
+        assert!(validate_submit(None, &job_task_desc).is_none());
+        assert!(matches!(
+            validate_submit(Some(&job), &job_task_desc),
+            Some(SubmitResponse::TaskIdAlreadyExists(x)) if x.as_num() == 109
+        ));
+        let job_task_desc = JobTaskDescription::Graph {
+            tasks: vec![TaskWithDependencies {
+                id: 102.into(),
+                task_desc: task_desc(None, 0, 1),
+                dependencies: vec![],
+            }],
+        };
+        assert!(validate_submit(None, &job_task_desc).is_none());
+        assert!(matches!(
+            validate_submit(Some(&job), &job_task_desc),
+            Some(SubmitResponse::TaskIdAlreadyExists(x)) if x.as_num() == 102
+        ));
+        let job_task_desc = JobTaskDescription::Graph {
+            tasks: vec![
+                TaskWithDependencies {
+                    id: 2.into(),
+                    task_desc: task_desc(None, 0, 1),
+                    dependencies: vec![],
+                },
+                TaskWithDependencies {
+                    id: 2.into(),
+                    task_desc: task_desc(None, 0, 1),
+                    dependencies: vec![],
+                },
+            ],
+        };
+        assert!(matches!(
+            validate_submit(None, &job_task_desc),
+            Some(SubmitResponse::NonUniqueTaskId(x)) if x.as_num() == 2
+        ));
+        let job_task_desc = JobTaskDescription::Graph {
+            tasks: vec![TaskWithDependencies {
+                id: 2.into(),
+                task_desc: task_desc(None, 0, 1),
+                dependencies: vec![3.into()],
+            }],
+        };
+        assert!(matches!(
+            validate_submit(None, &job_task_desc),
+            Some(SubmitResponse::InvalidDependencies(x)) if x.as_num() == 3
+        ));
+        let job_task_desc = JobTaskDescription::Graph {
+            tasks: vec![TaskWithDependencies {
+                id: 2.into(),
+                task_desc: task_desc(None, 0, 1),
+                dependencies: vec![2.into()],
+            }],
+        };
+        assert!(matches!(
+            validate_submit(None, &job_task_desc),
+            Some(SubmitResponse::InvalidDependencies(x)) if x.as_num() == 2
+        ));
+    }
+
     #[test]
     fn test_build_graph_with_dependencies() {
         let desc = || task_desc(None, 0, 1);
diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs
index a54fd6958..44fe61adc 100644
--- a/crates/hyperqueue/src/server/job.rs
+++ b/crates/hyperqueue/src/server/job.rs
@@ -3,8 +3,8 @@ use serde::{Deserialize, Serialize};
 use crate::client::status::get_task_status;
 use crate::server::Senders;
 use crate::transfer::messages::{
-    JobDescription, JobDetail, JobInfo, JobSubmitDescription, TaskIdSelector, TaskSelector,
-    TaskStatusSelector,
+    JobDescription, JobDetail, JobInfo, JobSubmitDescription, JobTaskDescription, TaskIdSelector,
+    TaskSelector, TaskStatusSelector,
 };
 use crate::worker::start::RunningTaskContext;
 use crate::{make_tako_id, JobId, JobTaskCount, JobTaskId, Map, TakoTaskId, WorkerId};
@@ -397,4 +397,39 @@ impl Job {
         });
         rx
     }
+
+    pub fn attach_submit(&mut self, submit_desc: Arc<JobSubmitDescription>) {
+        match &submit_desc.task_desc {
+            JobTaskDescription::Array { ids, .. } => {
+                self.tasks.reserve(ids.id_count() as usize);
+                ids.iter().enumerate().for_each(|(i, task_id)| {
+                    let task_id = JobTaskId::new(task_id);
+                    assert!(self
+                        .tasks
+                        .insert(
+                            task_id,
+                            JobTaskInfo {
+                                state: JobTaskState::Waiting,
+                            },
+                        )
+                        .is_none());
+                })
+            }
+            JobTaskDescription::Graph { tasks } => {
+                self.tasks.reserve(tasks.len());
+                tasks.iter().enumerate().for_each(|(i, task)| {
+                    assert!(self
+                        .tasks
+                        .insert(
+                            task.id,
+                            JobTaskInfo {
+                                state: JobTaskState::Waiting,
+                            },
+                        )
+                        .is_none());
+                })
+            }
+        };
+        self.submit_descs.push(submit_desc);
+    }
 }
diff --git a/crates/hyperqueue/src/server/state.rs b/crates/hyperqueue/src/server/state.rs
index 9e012790b..3f4b6e3e3 100644
--- a/crates/hyperqueue/src/server/state.rs
+++ b/crates/hyperqueue/src/server/state.rs
@@ -99,42 +99,6 @@ impl State {
         assert!(self.jobs.insert(job_id, job).is_none());
     }
 
-    pub fn attach_submit(&mut self, job_id: JobId, submit_desc: Arc<JobSubmitDescription>) {
-        let job = self.get_job_mut(job_id).unwrap();
-        match &submit_desc.task_desc {
-            JobTaskDescription::Array { ids, .. } => {
-                job.tasks.reserve(ids.id_count() as usize);
-                ids.iter().enumerate().for_each(|(i, task_id)| {
-                    let task_id = JobTaskId::new(task_id);
-                    assert!(job
-                        .tasks
-                        .insert(
-                            task_id,
-                            JobTaskInfo {
-                                state: JobTaskState::Waiting,
-                            },
-                        )
-                        .is_none());
-                })
-            }
-            JobTaskDescription::Graph { tasks } => {
-                job.tasks.reserve(tasks.len());
-                tasks.iter().enumerate().for_each(|(i, task)| {
-                    assert!(job
-                        .tasks
-                        .insert(
-                            task.id,
-                            JobTaskInfo {
-                                state: JobTaskState::Waiting,
-                            },
-                        )
-                        .is_none());
-                })
-            }
-        };
-        job.submit_descs.push(submit_desc);
-    }
-
     /// Completely forgets this job, in order to reduce memory usage.
     pub(crate) fn forget_job(&mut self, job_id: JobId) -> Option<Job> {
         let job = match self.jobs.remove(&job_id) {
@@ -342,7 +306,7 @@ mod tests {
                 crash_limit: 5,
             },
         };
-        let job = Job::new(
+        let mut job = Job::new(
             job_id,
             JobDescription {
                 name: "".to_string(),
@@ -350,14 +314,11 @@ mod tests {
             },
             false,
         );
+        job.attach_submit(Arc::new(JobSubmitDescription {
+            task_desc,
+            submit_dir: Default::default(),
+            stream_path: None,
+        }));
         state.add_job(job);
-        state.attach_submit(
-            job_id,
-            Arc::new(JobSubmitDescription {
-                task_desc,
-                submit_dir: Default::default(),
-                stream_path: None,
-            }),
-        )
     }
 }

From 608f93452cb4f16ccf32a033c993c75bae9d4f83 Mon Sep 17 00:00:00 2001
From: Ada Bohm <ada@kreatrix.org>
Date: Thu, 24 Oct 2024 10:25:03 +0200
Subject: [PATCH 5/9] Clippy issues fixed

---
 crates/hyperqueue/src/client/output/cli.rs    | 68 ++++++++--------
 crates/hyperqueue/src/client/output/json.rs   |  2 +-
 .../hyperqueue/src/client/output/outputs.rs   |  1 -
 crates/hyperqueue/src/server/client/submit.rs |  4 +-
 crates/hyperqueue/src/server/job.rs           | 12 ++-
 crates/hyperqueue/src/server/state.rs         | 78 ++-----------------
 crates/tako/src/internal/worker/test_util.rs  |  4 +-
 7 files changed, 46 insertions(+), 123 deletions(-)

diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs
index 00b2cbf2b..019379bfd 100644
--- a/crates/hyperqueue/src/client/output/cli.rs
+++ b/crates/hyperqueue/src/client/output/cli.rs
@@ -1155,42 +1155,41 @@ pub fn print_job_output(
     task_header: bool,
     task_paths: TaskToPathsMap,
 ) -> anyhow::Result<()> {
-    let read_stream =
-        |task_id: JobTaskId, task_info: &JobTaskInfo, output_stream: &OutputStream| {
-            let stdout = std::io::stdout();
-            let mut stdout = stdout.lock();
-
-            let (_, stdout_path, stderr_path) = get_task_paths(&task_paths, task_id);
-            let (opt_path, stream_name) = match output_stream {
-                OutputStream::Stdout => (stdout_path, "stdout"),
-                OutputStream::Stderr => (stderr_path, "stderr"),
-            };
+    let read_stream = |task_id: JobTaskId, output_stream: &OutputStream| {
+        let stdout = std::io::stdout();
+        let mut stdout = stdout.lock();
+
+        let (_, stdout_path, stderr_path) = get_task_paths(&task_paths, task_id);
+        let (opt_path, stream_name) = match output_stream {
+            OutputStream::Stdout => (stdout_path, "stdout"),
+            OutputStream::Stderr => (stderr_path, "stderr"),
+        };
 
-            if task_header {
-                writeln!(stdout, "# Job {}, task {}", job_detail.info.id, task_id)
-                    .expect("Could not write output");
-            }
+        if task_header {
+            writeln!(stdout, "# Job {}, task {}", job_detail.info.id, task_id)
+                .expect("Could not write output");
+        }
 
-            if let Some(path) = opt_path {
-                match File::open(path) {
-                    Ok(mut file) => {
-                        let copy = std::io::copy(&mut file, &mut stdout);
-                        if let Err(error) = copy {
-                            log::warn!("Could not output contents of `{path}`: {error:?}");
-                        }
+        if let Some(path) = opt_path {
+            match File::open(path) {
+                Ok(mut file) => {
+                    let copy = std::io::copy(&mut file, &mut stdout);
+                    if let Err(error) = copy {
+                        log::warn!("Could not output contents of `{path}`: {error:?}");
                     }
-                    Err(error) => log::warn!("File `{path}` cannot be opened: {error:?}"),
-                };
-            } else {
-                log::warn!(
-                    "Task {} has no `{stream_name}` stream associated with it",
-                    task_id
-                );
-            }
-        };
+                }
+                Err(error) => log::warn!("File `{path}` cannot be opened: {error:?}"),
+            };
+        } else {
+            log::warn!(
+                "Task {} has no `{stream_name}` stream associated with it",
+                task_id
+            );
+        }
+    };
 
-    for (task_id, task) in &job_detail.tasks {
-        read_stream(*task_id, task, &output_stream);
+    for (task_id, _task) in &job_detail.tasks {
+        read_stream(*task_id, &output_stream);
     }
 
     Ok(())
@@ -1359,10 +1358,7 @@ fn get_task_paths(
 }
 
 /// Returns (working directory, stdout, stderr)
-fn format_task_paths<'a>(
-    task_map: &'a TaskToPathsMap,
-    task_id: JobTaskId,
-) -> (&'a str, &'a str, &'a str) {
+fn format_task_paths(task_map: &TaskToPathsMap, task_id: JobTaskId) -> (&str, &str, &str) {
     match task_map[&task_id] {
         Some(ref paths) => (
             paths.cwd.to_str().unwrap(),
diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs
index bbff71445..a76b6e684 100644
--- a/crates/hyperqueue/src/client/output/json.rs
+++ b/crates/hyperqueue/src/client/output/json.rs
@@ -362,7 +362,7 @@ fn format_job_info(info: &JobInfo) -> Value {
 
 fn format_tasks(tasks: &[(JobTaskId, JobTaskInfo)], map: TaskToPathsMap) -> Value {
     tasks
-        .into_iter()
+        .iter()
         .map(|(task_id, task)| {
             let state = &match task.state {
                 JobTaskState::Waiting => "waiting",
diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs
index b5d26e26a..53d1bd5ce 100644
--- a/crates/hyperqueue/src/client/output/outputs.rs
+++ b/crates/hyperqueue/src/client/output/outputs.rs
@@ -14,7 +14,6 @@ use crate::server::job::JobTaskInfo;
 use crate::{JobId, JobTaskId};
 use core::time::Duration;
 use tako::resources::ResourceDescriptor;
-use tako::Map;
 
 pub const MAX_DISPLAYED_WORKERS: usize = 2;
 
diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs
index 8269d4e69..ad5519453 100644
--- a/crates/hyperqueue/src/server/client/submit.rs
+++ b/crates/hyperqueue/src/server/client/submit.rs
@@ -5,16 +5,14 @@ use std::time::Duration;
 
 use bstr::BString;
 use tako::Map;
-use tako::{ItemId, Set};
+use tako::Set;
 
 use tako::gateway::{
     FromGatewayMessage, NewTasksMessage, ResourceRequestVariants, SharedTaskConfiguration,
     TaskConfiguration, ToGatewayMessage,
 };
-use tako::TaskId;
 
 use crate::common::arraydef::IntArray;
-use crate::common::error::HqError;
 use crate::common::placeholders::{
     fill_placeholders_after_submit, fill_placeholders_log, normalize_path,
 };
diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs
index 44fe61adc..59d703fc2 100644
--- a/crates/hyperqueue/src/server/job.rs
+++ b/crates/hyperqueue/src/server/job.rs
@@ -1,10 +1,9 @@
 use serde::{Deserialize, Serialize};
 
-use crate::client::status::get_task_status;
 use crate::server::Senders;
 use crate::transfer::messages::{
     JobDescription, JobDetail, JobInfo, JobSubmitDescription, JobTaskDescription, TaskIdSelector,
-    TaskSelector, TaskStatusSelector,
+    TaskSelector,
 };
 use crate::worker::start::RunningTaskContext;
 use crate::{make_tako_id, JobId, JobTaskCount, JobTaskId, Map, TakoTaskId, WorkerId};
@@ -13,7 +12,6 @@ use smallvec::SmallVec;
 use std::sync::Arc;
 use tako::comm::deserialize;
 use tako::task::SerializedTaskContext;
-use tako::Set;
 use tokio::sync::oneshot;
 
 /// State of a task that has been started at least once.
@@ -166,7 +164,7 @@ impl Job {
                 TaskIdSelector::All => (
                     self.tasks
                         .iter()
-                        .map(|(task_id, info)| (task_id.clone(), info.clone()))
+                        .map(|(task_id, info)| (*task_id, info.clone()))
                         .collect(),
                     Vec::new(),
                 ),
@@ -192,7 +190,7 @@ impl Job {
         JobDetail {
             info: self.make_job_info(),
             job_desc: self.job_desc.clone(),
-            submit_descs: self.submit_descs.iter().map(|x| x.clone()).collect(),
+            submit_descs: self.submit_descs.iter().cloned().collect(),
             tasks,
             tasks_not_found,
             submission_date: self.submission_date,
@@ -402,7 +400,7 @@ impl Job {
         match &submit_desc.task_desc {
             JobTaskDescription::Array { ids, .. } => {
                 self.tasks.reserve(ids.id_count() as usize);
-                ids.iter().enumerate().for_each(|(i, task_id)| {
+                ids.iter().for_each(|task_id| {
                     let task_id = JobTaskId::new(task_id);
                     assert!(self
                         .tasks
@@ -417,7 +415,7 @@ impl Job {
             }
             JobTaskDescription::Graph { tasks } => {
                 self.tasks.reserve(tasks.len());
-                tasks.iter().enumerate().for_each(|(i, task)| {
+                tasks.iter().for_each(|task| {
                     assert!(self
                         .tasks
                         .insert(
diff --git a/crates/hyperqueue/src/server/state.rs b/crates/hyperqueue/src/server/state.rs
index 3f4b6e3e3..ab91c059c 100644
--- a/crates/hyperqueue/src/server/state.rs
+++ b/crates/hyperqueue/src/server/state.rs
@@ -1,23 +1,21 @@
 use std::cmp::min;
-use std::collections::BTreeMap;
-use std::sync::Arc;
 
 use chrono::Utc;
+use tako::define_wrapped_type;
 use tako::gateway::{
     CancelTasks, FromGatewayMessage, LostWorkerMessage, NewWorkerMessage, TaskFailedMessage,
     TaskState, TaskUpdate, ToGatewayMessage,
 };
 use tako::ItemId;
-use tako::{define_wrapped_type, TaskId};
 
 use crate::server::autoalloc::LostWorkerDetails;
-use crate::server::job::{Job, JobTaskInfo, JobTaskState};
+use crate::server::job::Job;
 use crate::server::restore::StateRestorer;
 use crate::server::worker::Worker;
 use crate::server::Senders;
-use crate::transfer::messages::{JobSubmitDescription, JobTaskDescription, ServerInfo};
-use crate::{make_tako_id, unwrap_tako_id, JobTaskId, WrappedRcRefCell};
-use crate::{JobId, JobTaskCount, Map, TakoTaskId, WorkerId};
+use crate::transfer::messages::ServerInfo;
+use crate::{unwrap_tako_id, WrappedRcRefCell};
+use crate::{JobId, Map, TakoTaskId, WorkerId};
 
 pub struct State {
     jobs: Map<JobId, Job>,
@@ -256,69 +254,3 @@ impl StateRef {
         }))
     }
 }
-
-#[cfg(test)]
-mod tests {
-
-    use std::sync::Arc;
-    use tako::program::{ProgramDefinition, StdioDef};
-
-    use crate::common::arraydef::IntArray;
-    use crate::server::job::Job;
-    use crate::server::state::State;
-    use crate::tests::utils::create_hq_state;
-    use crate::transfer::messages::{
-        JobDescription, JobSubmitDescription, JobTaskDescription, PinMode, TaskDescription,
-        TaskKind, TaskKindProgram,
-    };
-    use crate::{unwrap_tako_id, JobId, TakoTaskId};
-
-    fn dummy_program_definition() -> ProgramDefinition {
-        ProgramDefinition {
-            args: vec![],
-            env: Default::default(),
-            stdout: StdioDef::Null,
-            stderr: StdioDef::Null,
-            stdin: vec![],
-            cwd: Default::default(),
-        }
-    }
-
-    fn add_test_job<J: Into<JobId>, T: Into<TakoTaskId>>(
-        state: &mut State,
-        ids: IntArray,
-        job_id: J,
-        base_task_id: T,
-    ) {
-        let job_id: JobId = job_id.into();
-        let task_desc = JobTaskDescription::Array {
-            ids,
-            entries: None,
-            task_desc: TaskDescription {
-                kind: TaskKind::ExternalProgram(TaskKindProgram {
-                    program: dummy_program_definition(),
-                    pin_mode: PinMode::None,
-                    task_dir: false,
-                }),
-                resources: Default::default(),
-                time_limit: None,
-                priority: 0,
-                crash_limit: 5,
-            },
-        };
-        let mut job = Job::new(
-            job_id,
-            JobDescription {
-                name: "".to_string(),
-                max_fails: None,
-            },
-            false,
-        );
-        job.attach_submit(Arc::new(JobSubmitDescription {
-            task_desc,
-            submit_dir: Default::default(),
-            stream_path: None,
-        }));
-        state.add_job(job);
-    }
-}
diff --git a/crates/tako/src/internal/worker/test_util.rs b/crates/tako/src/internal/worker/test_util.rs
index d8670a0d2..a7f25b82a 100644
--- a/crates/tako/src/internal/worker/test_util.rs
+++ b/crates/tako/src/internal/worker/test_util.rs
@@ -102,7 +102,7 @@ impl ResourceQueueBuilder {
         self.queue
             .try_start_tasks(&self.task_map, None)
             .into_iter()
-            .map(|(t, a, _)| (t.as_num() as u64, a))
+            .map(|(t, a, _)| (t.as_num(), a))
             .collect()
     }
 
@@ -110,7 +110,7 @@ impl ResourceQueueBuilder {
         self.queue
             .try_start_tasks(&self.task_map, Some(duration))
             .into_iter()
-            .map(|(t, a, _)| (t.as_num() as u64, a))
+            .map(|(t, a, _)| (t.as_num(), a))
             .collect()
     }
 }

From f5ea7ca1c90e2cfa9486bfcb5cc45a15c00fba04 Mon Sep 17 00:00:00 2001
From: Ada Bohm <ada@kreatrix.org>
Date: Thu, 24 Oct 2024 10:33:38 +0200
Subject: [PATCH 6/9] Update Python binding for u64 TaskId and new SubmitResponse variants

---
 crates/pyhq/src/client/job.rs | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs
index d2c27e73a..1f08eb9c1 100644
--- a/crates/pyhq/src/client/job.rs
+++ b/crates/pyhq/src/client/job.rs
@@ -14,7 +14,8 @@ use hyperqueue::transfer::messages::{
     TaskKind, TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies,
     ToClientMessage,
 };
-use hyperqueue::{rpc_call, tako, JobTaskCount, JobTaskId, Set};
+use hyperqueue::{rpc_call, tako, JobTaskCount, Set};
+use pyo3::exceptions::PyException;
 use pyo3::types::PyTuple;
 use pyo3::{IntoPy, PyAny, PyResult, Python};
 use std::collections::HashMap;
@@ -101,6 +102,12 @@ pub fn submit_job_impl(py: Python, ctx: ClientContextPtr, job: PyJobDescription)
                 // This code is unreachable as long Python cannot submit into open jobs
                 unreachable!()
             }
+            SubmitResponse::NonUniqueTaskId(_) => Err(PyException::new_err(
+                "Non unique IDs in submitted task graph",
+            )),
+            SubmitResponse::InvalidDependencies(_) => {
+                Err(PyException::new_err("Invalid dependency id"))
+            }
         }
     })
 }
@@ -344,11 +351,10 @@ pub fn get_failed_tasks_impl(
             if let Some(job_detail) = job_detail {
                 let mut task_path_map = resolve_task_paths(&job_detail, &response.server_uid);
                 let mut tasks = HashMap::with_capacity(job_detail.tasks.len());
-                for task in job_detail.tasks {
+                for (task_id, task) in job_detail.tasks {
                     match task.state {
                         JobTaskState::Failed { error, .. } => {
-                            let id = task.task_id.as_num();
-                            let path_ctx = task_path_map.remove(&JobTaskId::from(id)).flatten();
+                            let path_ctx = task_path_map.remove(&task_id).flatten();
                             let (stdout, stderr, cwd) = match path_ctx {
                                 Some(paths) => (
                                     stdio_to_string(paths.stdout),
@@ -359,7 +365,7 @@ pub fn get_failed_tasks_impl(
                             };
 
                             tasks.insert(
-                                id,
+                                task_id.as_num(),
                                 FailedTaskContext {
                                     stdout,
                                     stderr,

From 4713c3890649022362851d88e287a9de50c70907 Mon Sep 17 00:00:00 2001
From: Ada Bohm <ada@kreatrix.org>
Date: Thu, 24 Oct 2024 10:43:51 +0200
Subject: [PATCH 7/9] Fixed test: test_build_graph_with_dependencies

---
 crates/hyperqueue/src/server/client/submit.rs | 27 ++++++++++++++++---
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs
index ad5519453..405fc2adc 100644
--- a/crates/hyperqueue/src/server/client/submit.rs
+++ b/crates/hyperqueue/src/server/client/submit.rs
@@ -412,6 +412,7 @@ pub(crate) fn handle_open_job(
 #[cfg(test)]
 mod tests {
     use crate::common::arraydef::IntArray;
+    use crate::make_tako_id;
     use crate::server::client::submit::build_tasks_graph;
     use crate::server::client::validate_submit;
     use crate::server::job::Job;
@@ -552,11 +553,29 @@ mod tests {
         ];
 
         let msg = build_tasks_graph(1.into(), &tasks, &PathBuf::from("foo"), None);
-        assert_eq!(msg.tasks[0].task_deps, vec![4.into(), 3.into()]);
-        assert_eq!(msg.tasks[1].task_deps, vec![2.into()]);
-        assert_eq!(msg.tasks[2].task_deps, vec![5.into(), 6.into()]);
+        assert_eq!(
+            msg.tasks[0].task_deps,
+            vec![
+                make_tako_id(1.into(), 2.into()),
+                make_tako_id(1.into(), 1.into())
+            ]
+        );
+        assert_eq!(
+            msg.tasks[1].task_deps,
+            vec![make_tako_id(1.into(), 0.into())]
+        );
+        assert_eq!(
+            msg.tasks[2].task_deps,
+            vec![
+                make_tako_id(1.into(), 3.into()),
+                make_tako_id(1.into(), 4.into())
+            ]
+        );
         assert_eq!(msg.tasks[3].task_deps, vec![]);
-        assert_eq!(msg.tasks[4].task_deps, vec![2.into()]);
+        assert_eq!(
+            msg.tasks[4].task_deps,
+            vec![make_tako_id(1.into(), 0.into()),]
+        );
     }
 
     fn check_shared_data(msg: &NewTasksMessage, expected: Vec<TaskDescription>) {

From 5cb1695e1b92b27ad6d63877fab728762c2e9302 Mon Sep 17 00:00:00 2001
From: Ada Bohm <ada@kreatrix.org>
Date: Thu, 24 Oct 2024 11:56:48 +0200
Subject: [PATCH 8/9] Fixed task selector

---
 crates/hyperqueue/src/server/job.rs | 33 +++++++++++++++++++++++------
 tests/job/test_job_cat.py           |  1 -
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs
index 59d703fc2..906575bc1 100644
--- a/crates/hyperqueue/src/server/job.rs
+++ b/crates/hyperqueue/src/server/job.rs
@@ -1,9 +1,10 @@
 use serde::{Deserialize, Serialize};
 
+use crate::client::status::get_task_status;
 use crate::server::Senders;
 use crate::transfer::messages::{
     JobDescription, JobDetail, JobInfo, JobSubmitDescription, JobTaskDescription, TaskIdSelector,
-    TaskSelector,
+    TaskSelector, TaskStatusSelector,
 };
 use crate::worker::start::RunningTaskContext;
 use crate::{make_tako_id, JobId, JobTaskCount, JobTaskId, Map, TakoTaskId, WorkerId};
@@ -160,20 +161,40 @@ impl Job {
 
     pub fn make_job_detail(&self, task_selector: Option<&TaskSelector>) -> JobDetail {
         let (mut tasks, tasks_not_found) = if let Some(selector) = task_selector {
-            match &selector.id_selector {
-                TaskIdSelector::All => (
+            match (&selector.id_selector, &selector.status_selector) {
+                (TaskIdSelector::All, TaskStatusSelector::All) => (
                     self.tasks
                         .iter()
                         .map(|(task_id, info)| (*task_id, info.clone()))
                         .collect(),
                     Vec::new(),
                 ),
-                TaskIdSelector::Specific(ids) => {
+                (TaskIdSelector::All, TaskStatusSelector::Specific(status)) => (
+                    self.tasks
+                        .iter()
+                        .filter_map(|(task_id, info)| {
+                            if status.contains(&get_task_status(&info.state)) {
+                                Some((*task_id, info.clone()))
+                            } else {
+                                None
+                            }
+                        })
+                        .collect(),
+                    Vec::new(),
+                ),
+                (TaskIdSelector::Specific(ids), status) => {
                     let mut not_found = Vec::new();
                     let mut tasks = Vec::with_capacity(ids.id_count() as usize);
                     for task_id in ids.iter() {
-                        if let Some(task) = self.tasks.get(&JobTaskId::new(task_id)) {
-                            tasks.push((JobTaskId::new(task_id), task.clone()));
+                        if let Some(info) = self.tasks.get(&JobTaskId::new(task_id)) {
+                            if match status {
+                                TaskStatusSelector::All => true,
+                                TaskStatusSelector::Specific(s) => {
+                                    s.contains(&get_task_status(&info.state))
+                                }
+                            } {
+                                tasks.push((JobTaskId::new(task_id), info.clone()));
+                            }
                         } else {
                             not_found.push(JobTaskId::new(task_id));
                         }
diff --git a/tests/job/test_job_cat.py b/tests/job/test_job_cat.py
index 8ba6c018e..02aa2d06c 100644
--- a/tests/job/test_job_cat.py
+++ b/tests/job/test_job_cat.py
@@ -146,7 +146,6 @@ def test_job_cat_status(hq_env: HqEnv):
         ]
     )
     wait_for_job_state(hq_env, 1, "FAILED")
-
     output = hq_env.command(["job", "cat", "--task-status=finished", "1", "stdout", "--print-task-header"])
     assert (
         output

From c8103195fc7f20a20a8910efe840088cc4a26fce Mon Sep 17 00:00:00 2001
From: Ada Bohm <ada@kreatrix.org>
Date: Thu, 24 Oct 2024 12:25:50 +0200
Subject: [PATCH 9/9] Removed no longer valid assert

---
 crates/hyperqueue/src/server/state.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/crates/hyperqueue/src/server/state.rs b/crates/hyperqueue/src/server/state.rs
index ab91c059c..b1812324e 100644
--- a/crates/hyperqueue/src/server/state.rs
+++ b/crates/hyperqueue/src/server/state.rs
@@ -92,7 +92,6 @@ impl State {
     }
 
     pub fn add_job(&mut self, job: Job) {
-        assert!(job.submit_descs.is_empty());
         let job_id = job.job_id;
         assert!(self.jobs.insert(job_id, job).is_none());
     }