Task task id refactoring #778

Merged · 9 commits · Oct 25, 2024
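Taken together, the hunks below move the task identifier out of `JobTaskInfo` and into the surrounding collections: APIs that previously took `Vec<JobTaskInfo>` or `&[JobTaskInfo]` (each record carrying its own `task_id`) now take `&[(JobTaskId, JobTaskInfo)]` pairs. A minimal sketch of that shape change, using stand-in types rather than the real HyperQueue definitions:

```rust
// Stand-ins for JobTaskId / JobTaskState; the real types live in the crate.
type JobTaskId = u32;

struct JobTaskInfo {
    // Before this PR the struct also held `task_id: JobTaskId`; after the
    // refactoring the id is kept next to the record instead of inside it.
    state: &'static str,
}

fn main() {
    let mut tasks: Vec<(JobTaskId, JobTaskInfo)> = vec![
        (2, JobTaskInfo { state: "finished" }),
        (1, JobTaskInfo { state: "waiting" }),
    ];

    // Sorting keys on the tuple's first element, as in the cli.rs hunks
    // (`sort_unstable_by_key(|t| t.0)` instead of `|t| t.task_id`).
    tasks.sort_unstable_by_key(|t| t.0);

    // Iteration destructures the pair, as in `for (task_id, task) in tasks`.
    for (task_id, task) in &tasks {
        println!("task {task_id}: {}", task.state);
    }
}
```

Keeping the id in the tuple lets helper functions accept a plain `JobTaskId` by value and avoids storing the id redundantly in every record.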
6 changes: 6 additions & 0 deletions crates/hyperqueue/src/client/commands/submit/command.rs
@@ -741,6 +741,12 @@ pub(crate) async fn send_submit_request(
SubmitResponse::TaskIdAlreadyExists(task_id) => {
bail!("Task {task_id} already exists in job {job_id}.")
}
SubmitResponse::NonUniqueTaskId(task_id) => {
bail!("Task {task_id} is defined more than once.")
}
SubmitResponse::InvalidDependencies(task_id) => {
bail!("Invalid dependancy on {task_id}")
}
}
Ok(())
}
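The two new arms handle `SubmitResponse` variants that the server apparently returns when a submitted task graph reuses an id or depends on an unknown task. A hypothetical sketch of the kind of submit-time check that could yield these errors (the helper, its argument types, and the edge representation are illustrative, not taken from this PR):

```rust
use std::collections::HashSet;

// Illustrative counterparts of the new SubmitResponse error variants.
enum SubmitError {
    TaskIdAlreadyExists(u32),
    NonUniqueTaskId(u32),
    InvalidDependencies(u32),
}

fn validate_submit(
    new_ids: &[u32],
    deps: &[(u32, u32)],     // (task, task it depends on)
    existing: &HashSet<u32>, // ids already present in the job
) -> Result<(), SubmitError> {
    let mut seen = HashSet::new();
    for &id in new_ids {
        if existing.contains(&id) {
            return Err(SubmitError::TaskIdAlreadyExists(id));
        }
        if !seen.insert(id) {
            return Err(SubmitError::NonUniqueTaskId(id));
        }
    }
    for &(_task, dep) in deps {
        // A dependency must point at a task that exists in the job.
        if !seen.contains(&dep) && !existing.contains(&dep) {
            return Err(SubmitError::InvalidDependencies(dep));
        }
    }
    Ok(())
}

fn main() {
    let existing = HashSet::from([1]);
    // Task 3 is declared twice, so validation should reject the submit.
    let result = validate_submit(&[2, 3, 3], &[(3, 2)], &existing);
    assert!(matches!(result, Err(SubmitError::NonUniqueTaskId(3))));
}
```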
69 changes: 33 additions & 36 deletions crates/hyperqueue/src/client/output/cli.rs
@@ -18,7 +18,7 @@ use crate::transfer::messages::{
ServerInfo, TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerExitInfo,
WorkerInfo,
};
use crate::{JobId, JobTaskCount, WorkerId};
use crate::{JobId, JobTaskCount, JobTaskId, WorkerId};

use chrono::{DateTime, Local, SubsecRound, Utc};
use core::time::Duration;
@@ -178,12 +178,17 @@ impl CliOutput {
}
}

fn print_task_summary(&self, tasks: &[JobTaskInfo], info: &JobInfo, worker_map: &WorkerMap) {
fn print_task_summary(
&self,
tasks: &[(JobTaskId, JobTaskInfo)],
info: &JobInfo,
worker_map: &WorkerMap,
) {
const SHOWN_TASKS: usize = 5;

let fail_rows: Vec<_> = tasks
.iter()
.filter_map(|t: &JobTaskInfo| match &t.state {
.filter_map(|(task_id, t): &(JobTaskId, JobTaskInfo)| match &t.state {
JobTaskState::Failed {
started_data,
error,
@@ -192,7 +197,7 @@ impl CliOutput {
let worker_ids = started_data.as_ref().map(|data| data.worker_ids.as_slice());

Some(vec![
t.task_id.cell(),
task_id.cell(),
format_workers(worker_ids, worker_map).cell(),
error.to_owned().cell().foreground_color(Some(Color::Red)),
])
@@ -558,7 +563,7 @@ impl Output for CliOutput {
]);
self.print_vertical_table(rows);

tasks.sort_unstable_by_key(|t| t.task_id);
tasks.sort_unstable_by_key(|t| t.0);
self.print_task_summary(&tasks, &info, &worker_map);
}
}
@@ -624,20 +629,20 @@ impl Output for CliOutput {

for (id, job) in jobs {
let mut tasks = job.tasks;
tasks.sort_unstable_by_key(|t| t.task_id);
tasks.sort_unstable_by_key(|t| t.0);

rows.append(
&mut tasks
.iter()
.map(|task| {
.map(|(task_id, task)| {
let (start, end) = get_task_time(&task.state);
let mut job_rows = match jobs_len {
1 => vec![],
_ => vec![id.cell().justify(Justify::Right)],
};

job_rows.append(&mut vec![
task.task_id.cell().justify(Justify::Right),
task_id.cell().justify(Justify::Right),
status_to_cell(&get_task_status(&task.state)),
format_workers(task.state.get_workers(), &worker_map).cell(),
format_task_duration(start, end).cell(),
@@ -686,7 +691,7 @@ impl Output for CliOutput {
fn print_task_info(
&self,
job: (JobId, JobDetail),
tasks: Vec<JobTaskInfo>,
tasks: &[(JobTaskId, JobTaskInfo)],
worker_map: WorkerMap,
server_uid: &str,
verbosity: Verbosity,
@@ -695,10 +700,9 @@ impl Output for CliOutput {
let task_to_paths = resolve_task_paths(&job, server_uid);
let mut is_truncated = false;

for task in tasks {
for (task_id, task) in tasks {
let (start, end) = get_task_time(&task.state);
let (cwd, stdout, stderr) = format_task_paths(&task_to_paths, &task);
let task_id = task.task_id;
let (cwd, stdout, stderr) = format_task_paths(&task_to_paths, *task_id);

let (task_desc, task_deps) = if let Some(x) =
job.submit_descs
@@ -711,7 +715,7 @@ impl Output for CliOutput {
} if ids.contains(task_id.as_num()) => Some((task_desc, [].as_slice())),
JobTaskDescription::Array { .. } => None,
JobTaskDescription::Graph { tasks } => {
tasks.iter().find(|t| t.id == task_id).map(|task_dep| {
tasks.iter().find(|t| t.id == *task_id).map(|task_dep| {
(&task_dep.task_desc, task_dep.dependencies.as_slice())
})
}
@@ -1151,23 +1155,19 @@ pub fn print_job_output(
task_header: bool,
task_paths: TaskToPathsMap,
) -> anyhow::Result<()> {
let read_stream = |task_info: &JobTaskInfo, output_stream: &OutputStream| {
let read_stream = |task_id: JobTaskId, output_stream: &OutputStream| {
let stdout = std::io::stdout();
let mut stdout = stdout.lock();

let (_, stdout_path, stderr_path) = get_task_paths(&task_paths, task_info);
let (_, stdout_path, stderr_path) = get_task_paths(&task_paths, task_id);
let (opt_path, stream_name) = match output_stream {
OutputStream::Stdout => (stdout_path, "stdout"),
OutputStream::Stderr => (stderr_path, "stderr"),
};

if task_header {
writeln!(
stdout,
"# Job {}, task {}",
job_detail.info.id, task_info.task_id
)
.expect("Could not write output");
writeln!(stdout, "# Job {}, task {}", job_detail.info.id, task_id)
.expect("Could not write output");
}

if let Some(path) = opt_path {
@@ -1183,13 +1183,13 @@
} else {
log::warn!(
"Task {} has no `{stream_name}` stream associated with it",
task_info.task_id
task_id
);
}
};

for task in &job_detail.tasks {
read_stream(task, &output_stream);
for (task_id, _task) in &job_detail.tasks {
read_stream(*task_id, &output_stream);
}

Ok(())
@@ -1259,11 +1259,11 @@ fn format_resource_variants(rqv: &ResourceRequestVariants) -> String {
}

/// Formatting
pub fn format_job_workers(tasks: &[JobTaskInfo], worker_map: &WorkerMap) -> String {
pub fn format_job_workers(tasks: &[(JobTaskId, JobTaskInfo)], worker_map: &WorkerMap) -> String {
// BTreeSet is used to both filter duplicates and keep a stable order
let worker_set: BTreeSet<_> = tasks
.iter()
.filter_map(|task| task.state.get_workers())
.filter_map(|(_, task)| task.state.get_workers())
.flatten()
.collect();
let worker_count = worker_set.len();
@@ -1337,11 +1337,11 @@ fn get_task_time(state: &JobTaskState) -> (Option<DateTime<Utc>>, Option<DateTim
}

/// Returns Option(working directory, stdout, stderr)
fn get_task_paths<'a>(
task_map: &'a TaskToPathsMap,
state: &JobTaskInfo,
) -> (Option<&'a str>, Option<&'a str>, Option<&'a str>) {
match task_map[&state.task_id] {
fn get_task_paths(
task_map: &TaskToPathsMap,
task_id: JobTaskId,
) -> (Option<&str>, Option<&str>, Option<&str>) {
match task_map[&task_id] {
Some(ref paths) => (
Some(paths.cwd.to_str().unwrap()),
match &paths.stdout {
@@ -1358,11 +1358,8 @@ fn get_task_paths<'a>(
}

/// Returns (working directory, stdout, stderr)
fn format_task_paths<'a>(
task_map: &'a TaskToPathsMap,
state: &JobTaskInfo,
) -> (&'a str, &'a str, &'a str) {
match task_map[&state.task_id] {
fn format_task_paths(task_map: &TaskToPathsMap, task_id: JobTaskId) -> (&str, &str, &str) {
match task_map[&task_id] {
Some(ref paths) => (
paths.cwd.to_str().unwrap(),
stdio_to_str(&paths.stdout),
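One side effect visible in the `get_task_paths` and `format_task_paths` hunks above: once the task id is passed by value, each helper is left with a single reference parameter, so lifetime elision ties the returned `&str`s to the map and the explicit `<'a>` annotations can be dropped. A small standalone illustration of that elision rule (simplified types, not taken from the crate):

```rust
use std::collections::HashMap;

type PathsMap = HashMap<u32, String>; // stand-in for TaskToPathsMap

// Two reference parameters: the compiler cannot tell which one the returned
// reference borrows from, so an explicit lifetime is required.
fn lookup_explicit<'a>(map: &'a PathsMap, key: &u32) -> Option<&'a str> {
    map.get(key).map(|s| s.as_str())
}

// One reference parameter (the key is passed by value): elision applies and
// no named lifetime is needed, mirroring the new get_task_paths signature.
fn lookup_elided(map: &PathsMap, key: u32) -> Option<&str> {
    map.get(&key).map(|s| s.as_str())
}

fn main() {
    let mut map = PathsMap::new();
    map.insert(1, "/tmp/job-1/1".to_string());
    assert_eq!(lookup_explicit(&map, &1), Some("/tmp/job-1/1"));
    assert_eq!(lookup_elided(&map, 1), Some("/tmp/job-1/1"));
}
```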
8 changes: 4 additions & 4 deletions crates/hyperqueue/src/client/output/common.rs
@@ -42,8 +42,8 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {

job.tasks
.iter()
.map(|task| {
let (submit_dir, task_desc) = task_to_desc_map.get(&task.task_id).unwrap();
.map(|(task_id, task)| {
let (submit_dir, task_desc) = task_to_desc_map.get(task_id).unwrap();
let paths = match &task_desc.kind {
TaskKind::ExternalProgram(TaskKindProgram { program, .. }) => match &task.state {
JobTaskState::Canceled {
@@ -58,7 +58,7 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
} => {
let ctx = CompletePlaceholderCtx {
job_id: job.info.id,
task_id: task.task_id,
task_id: *task_id,
instance_id: started_data.context.instance_id,
submit_dir,
server_uid,
@@ -80,7 +80,7 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
_ => None,
},
};
(task.task_id, paths)
(*task_id, paths)
})
.collect()
}
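For context on the `resolve_task_paths` hunks above: the function now maps over the same `(task_id, task)` pairs and collects into a map keyed by the id, which is why the closure ends with `(*task_id, paths)`. A compact, self-contained model of that collect pattern (stand-in types; the real code builds `TaskToPathsMap` entries from the task state):

```rust
use std::collections::HashMap;

fn main() {
    // Stand-ins: (task_id, state) pairs borrowed from a job detail.
    let tasks: Vec<(u32, &str)> = vec![(1, "finished"), (2, "canceled")];

    // Mirror of the resolve_task_paths shape: only tasks that actually
    // started get paths; the rest map to None.
    let paths: HashMap<u32, Option<String>> = tasks
        .iter()
        .map(|(task_id, state)| {
            let entry = match *state {
                "running" | "finished" | "failed" => Some(format!("/job/{task_id}")),
                _ => None,
            };
            (*task_id, entry) // *task_id copies the id out of the borrowed pair
        })
        .collect();

    assert_eq!(paths[&1].as_deref(), Some("/job/1"));
    assert_eq!(paths[&2], None);
}
```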
26 changes: 13 additions & 13 deletions crates/hyperqueue/src/client/output/json.rs
@@ -133,7 +133,7 @@ impl Output for JsonOutput {
}
}
).collect::<Vec<_>>(),
"tasks": format_tasks(tasks, task_paths)
"tasks": format_tasks(&tasks, task_paths)
})
})
.collect();
@@ -182,15 +182,15 @@ impl Output for JsonOutput {
let mut json_obj = json!({});
for (id, job) in jobs {
let map = resolve_task_paths(&job, server_uid);
json_obj[id.to_string()] = format_tasks(job.tasks, map);
json_obj[id.to_string()] = format_tasks(&job.tasks, map);
}
self.print(json_obj);
}

fn print_task_info(
&self,
job: (JobId, JobDetail),
tasks: Vec<JobTaskInfo>,
tasks: &[(JobTaskId, JobTaskInfo)],
_worker_map: WorkerMap,
server_uid: &str,
_verbosity: Verbosity,
@@ -306,7 +306,7 @@ fn format_task_description(task_desc: &TaskDescription) -> Value {
}
}

fn fill_task_started_data(dict: &mut Value, data: StartedTaskData) {
fn fill_task_started_data(dict: &mut Value, data: &StartedTaskData) {
dict["started_at"] = format_datetime(data.start_date);
if data.worker_ids.len() == 1 {
dict["worker"] = data.worker_ids[0].as_num().into();
@@ -360,10 +360,10 @@ fn format_job_info(info: &JobInfo) -> Value {
})
}

fn format_tasks(tasks: Vec<JobTaskInfo>, map: TaskToPathsMap) -> Value {
fn format_tasks(tasks: &[(JobTaskId, JobTaskInfo)], map: TaskToPathsMap) -> Value {
tasks
.into_iter()
.map(|task| {
.iter()
.map(|(task_id, task)| {
let state = &match task.state {
JobTaskState::Waiting => "waiting",
JobTaskState::Running { .. } => "running",
@@ -372,12 +372,12 @@ fn format_tasks(tasks: Vec<JobTaskInfo>, map: TaskToPathsMap) -> Value {
JobTaskState::Canceled { .. } => "canceled",
};
let mut data = json!({
"id": task.task_id,
"id": *task_id,
"state": state,
});
fill_task_paths(&mut data, &map, task.task_id);
fill_task_paths(&mut data, &map, *task_id);

match task.state {
match &task.state {
JobTaskState::Running { started_data } => {
fill_task_started_data(&mut data, started_data);
}
@@ -386,7 +386,7 @@ fn format_tasks(tasks: Vec<JobTaskInfo>, map: TaskToPathsMap) -> Value {
end_date,
} => {
fill_task_started_data(&mut data, started_data);
data["finished_at"] = format_datetime(end_date);
data["finished_at"] = format_datetime(*end_date);
}
JobTaskState::Failed {
started_data,
Expand All @@ -396,8 +396,8 @@ fn format_tasks(tasks: Vec<JobTaskInfo>, map: TaskToPathsMap) -> Value {
if let Some(started_data) = started_data {
fill_task_started_data(&mut data, started_data);
}
data["finished_at"] = format_datetime(end_date);
data["error"] = error.into();
data["finished_at"] = format_datetime(*end_date);
data["error"] = error.clone().into();
}
_ => {}
};
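A pattern worth noting in the `format_tasks` hunks above: because the function now borrows the tasks, it matches on `&task.state`, so the bound fields come out as references; `Copy` data is dereferenced (`*end_date`), owned data is cloned (`error.clone()`), and `fill_task_started_data` accordingly takes `&StartedTaskData`. A small standalone illustration of that borrow-aware matching (simplified types, not the crate's own):

```rust
#[derive(Clone, Copy)]
struct Timestamp(u64); // stand-in for DateTime<Utc>

enum TaskState {
    Finished { end: Timestamp },
    Failed { end: Timestamp, error: String },
}

// Matching on a borrowed enum binds fields by reference: Copy fields can be
// dereferenced, owned fields must be cloned to get an owned value out.
fn summarize(state: &TaskState) -> (Timestamp, Option<String>) {
    match state {
        TaskState::Finished { end } => (*end, None),
        TaskState::Failed { end, error } => (*end, Some(error.clone())),
    }
}

fn main() {
    let states = vec![
        TaskState::Finished { end: Timestamp(41) },
        TaskState::Failed { end: Timestamp(42), error: "boom".to_string() },
    ];
    for state in &states {
        let (end, err) = summarize(state);
        println!("ended at {} with error {:?}", end.0, err);
    }
}
```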
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/client/output/outputs.rs
@@ -11,7 +11,7 @@ use crate::client::output::common::TaskToPathsMap;
use crate::client::output::Verbosity;
use crate::common::arraydef::IntArray;
use crate::server::job::JobTaskInfo;
use crate::JobId;
use crate::{JobId, JobTaskId};
use core::time::Duration;
use tako::resources::ResourceDescriptor;

@@ -73,7 +73,7 @@ pub trait Output {
fn print_task_info(
&self,
job: (JobId, JobDetail),
tasks: Vec<JobTaskInfo>,
tasks: &[(JobTaskId, JobTaskInfo)],
worker_map: WorkerMap,
server_uid: &str,
verbosity: Verbosity,
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/client/output/quiet.rs
@@ -21,7 +21,7 @@ use crate::transfer::messages::{
AutoAllocListResponse, JobDetail, JobInfo, ServerInfo, WaitForJobsResponse, WorkerExitInfo,
WorkerInfo,
};
use crate::JobId;
use crate::{JobId, JobTaskId};

#[derive(Default)]
pub struct Quiet;
@@ -129,7 +129,7 @@ impl Output for Quiet {
fn print_task_info(
&self,
_job: (JobId, JobDetail),
_tasks: Vec<JobTaskInfo>,
_tasks: &[(JobTaskId, JobTaskInfo)],
_worker_map: WorkerMap,
_server_uid: &str,
_verbosity: Verbosity,
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/client/task.rs
@@ -115,7 +115,7 @@ pub async fn output_job_task_info(
Some(job) => {
gsettings.printer().print_task_info(
(*job_id, job.clone()),
job.tasks.clone(),
&job.tasks,
get_worker_map(session).await?,
&response.server_uid,
verbosity,
@@ -161,7 +161,7 @@ pub async fn output_job_task_ids(
.ok_or_else(|| HqError::GenericError("Job Id not found".to_string()))?
.tasks
.iter()
.map(|info| info.task_id.as_num()),
.map(|(task_id, _)| task_id.as_num()),
),
))
})
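Finally, the call-site change above (`&job.tasks` instead of `job.tasks.clone()`) follows from `print_task_info` taking a slice: the printer only reads the tasks, so borrowing avoids cloning the whole vector. A toy illustration of the difference (stand-in types only):

```rust
// Hypothetical minimal model of the call-site change in task.rs: printing
// borrows the task list instead of requiring an owned (cloned) Vec.
fn print_tasks_owned(tasks: Vec<(u32, String)>) {
    for (id, state) in tasks {
        println!("{id}: {state}");
    }
}

fn print_tasks_borrowed(tasks: &[(u32, String)]) {
    for (id, state) in tasks {
        println!("{id}: {state}");
    }
}

fn main() {
    let tasks = vec![(1, "finished".to_string()), (2, "waiting".to_string())];
    print_tasks_owned(tasks.clone()); // old style: clone to keep using `tasks`
    print_tasks_borrowed(&tasks);     // new style: no clone needed
    println!("still usable: {} tasks", tasks.len());
}
```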