Skip to content

Commit

Permalink
HQ Job Cat: Task status
Browse files Browse the repository at this point in the history
  • Loading branch information
User3574 authored and Kobzol committed Jan 28, 2022
1 parent 65ba640 commit 53f0424
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 75 deletions.
1 change: 1 addition & 0 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ async fn command_job_cat(gsettings: &GlobalSettings, opts: JobCatOpts) -> anyhow
opts.tasks.map(|selector| selector.into()),
opts.stream,
opts.print_task_header,
opts.task_status,
)
.await
}
Expand Down
79 changes: 69 additions & 10 deletions crates/hyperqueue/src/client/commands/job.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
use crate::client::globalsettings::GlobalSettings;
use crate::client::job::get_worker_map;
use crate::client::output::outputs::OutputStream;
use crate::client::status::{job_status, Status};
use crate::client::output::resolve_task_paths;
use crate::client::status::{get_task_status, job_status, Status};
use crate::common::arraydef::IntArray;
use crate::common::cli::SelectorArg;
use crate::rpc_call;
use crate::server::job::JobTaskInfo;
use crate::transfer::connection::ClientConnection;
use crate::transfer::messages::{
CancelJobResponse, CancelRequest, FromClientMessage, JobDetailRequest, JobInfoRequest,
Selector, ToClientMessage,
};
use crate::JobId;
use crate::{rpc_call, JobTaskId};
use crate::{JobId, Set};
use clap::Parser;
use std::collections::HashMap;

#[derive(Parser)]
pub struct JobListOpts {
Expand Down Expand Up @@ -57,6 +60,16 @@ pub struct JobCatOpts {
#[clap(long)]
pub tasks: Option<SelectorArg>,

/// Display only task(s) with the given states.
/// You can use multiple states separated by a comma.
#[clap(
long,
multiple_occurrences(false),
use_delimiter(true),
possible_values = &["waiting", "running", "finished", "failed", "canceled"]
)]
pub task_status: Vec<Status>,

/// Prepend the output of each task with a header line that identifies the task
/// which produced that output.
#[clap(long)]
Expand Down Expand Up @@ -166,6 +179,7 @@ pub async fn output_job_cat(
task_selector: Option<Selector>,
output_stream: OutputStream,
task_header: bool,
task_status: Vec<Status>,
) -> anyhow::Result<()> {
let message = FromClientMessage::JobDetail(JobDetailRequest {
selector: Selector::Specific(IntArray::from_id(job_id)),
Expand All @@ -174,13 +188,58 @@ pub async fn output_job_cat(
let mut responses =
rpc_call!(connection, message, ToClientMessage::JobDetailResponse(r) => r).await?;

if let Some(job) = responses.pop().and_then(|v| v.1) {
return gsettings.printer().print_job_output(
job,
task_selector,
output_stream,
task_header,
);
if let Some(mut job) = responses.pop().and_then(|v| v.1) {
let all_ids: Set<JobTaskId> = job.tasks.iter().map(|task| task.task_id).collect();
if !task_status.is_empty() {
job.tasks
.retain(|task_info| task_status.contains(&get_task_status(&task_info.state)));
}

let task_paths = resolve_task_paths(&job);

let tasks: Vec<JobTaskInfo> = match task_selector {
None | Some(Selector::All) => {
job.tasks.sort_unstable_by_key(|task| task.task_id);
job.tasks
}

Some(Selector::Specific(arr)) => {
let tasks_map: HashMap<JobTaskId, JobTaskInfo> = job
.tasks
.into_iter()
.map(|info| (info.task_id, info))
.collect();

arr.iter()
.filter_map(|task_id| {
let task_id: JobTaskId = task_id.into();
let task_opt = tasks_map.get(&task_id);

if task_opt.is_none() && !all_ids.contains(&task_id) {
log::warn!("Task {task_id} not found");
}
task_opt.cloned()
})
.collect()
}

Some(Selector::LastN(_)) => {
let last_task = job.tasks.iter().max_by_key(|&x| x.task_id);
match last_task {
None => vec![],
Some(task) => vec![task.clone()],
}
}
};

if tasks.is_empty() {
log::warn!("No tasks were selected, there is nothing to print");
return Ok(());
}

return gsettings
.printer()
.print_job_output(tasks, output_stream, task_header, task_paths);
} else {
log::error!("Job {job_id} not found");
}
Expand Down
64 changes: 15 additions & 49 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::io::Write as write;

use crate::client::job::WorkerMap;
use crate::client::output::outputs::{Output, OutputStream, MAX_DISPLAYED_WORKERS};
use crate::client::status::{job_status, task_status, Status};
use crate::client::status::{get_task_status, job_status, Status};
use crate::common::env::is_hq_env;
use crate::common::format::{human_duration, human_size};
use crate::common::manager::info::GetManagerInfo;
Expand All @@ -17,10 +17,10 @@ use crate::server::autoalloc::{
use crate::server::job::{JobTaskCounters, JobTaskInfo, JobTaskState, StartedTaskData};
use crate::stream::reader::logfile::Summary;
use crate::transfer::messages::{
AutoAllocListResponse, JobDescription, JobDetail, JobInfo, Selector, StatsResponse,
TaskDescription, WaitForJobsResponse, WorkerExitInfo, WorkerInfo,
AutoAllocListResponse, JobDescription, JobDetail, JobInfo, StatsResponse, TaskDescription,
WaitForJobsResponse, WorkerExitInfo, WorkerInfo,
};
use crate::{JobTaskCount, JobTaskId, WorkerId};
use crate::{JobTaskCount, WorkerId};

use chrono::{DateTime, Local, SubsecRound, Utc};
use core::time::Duration;
Expand All @@ -38,7 +38,7 @@ use crate::worker::start::WORKER_EXTRA_PROCESS_PID;
use anyhow::Error;
use colored::Color as Colorization;
use colored::Colorize;
use std::collections::{BTreeSet, HashMap};
use std::collections::BTreeSet;
use std::fs::File;
use tako::messages::gateway::{LostWorkerReason, ResourceRequest};

Expand Down Expand Up @@ -500,7 +500,7 @@ impl Output for CliOutput {

vec![
task.task_id.cell().justify(Justify::Right),
task_status_to_cell(task_status(&task.state)),
task_status_to_cell(get_task_status(&task.state)),
match task.state.get_worker() {
Some(worker) => format_worker(worker, &worker_map),
_ => "",
Expand Down Expand Up @@ -573,12 +573,12 @@ impl Output for CliOutput {

fn print_job_output(
&self,
job: JobDetail,
task_selector: Option<Selector>,
tasks: Vec<JobTaskInfo>,
output_stream: OutputStream,
task_header: bool,
task_paths: TaskToPathsMap,
) -> anyhow::Result<()> {
print_job_output(job, task_selector, output_stream, task_header)
print_job_output(tasks, output_stream, task_header, task_paths)
}

fn print_summary(&self, filename: &Path, summary: Summary) {
Expand Down Expand Up @@ -917,18 +917,16 @@ pub(crate) fn job_progress_bar(
}

pub fn print_job_output(
mut job: JobDetail,
task_selector: Option<Selector>,
tasks: Vec<JobTaskInfo>,
output_stream: OutputStream,
task_header: bool,
task_paths: TaskToPathsMap,
) -> anyhow::Result<()> {
let task_to_paths = resolve_task_paths(&job);

let read_stream = |task_info: &JobTaskInfo, output_stream: &OutputStream| {
let stdout = std::io::stdout();
let mut stdout = stdout.lock();

let (_, stdout_path, stderr_path) = get_task_paths(&task_to_paths, task_info);
let (_, stdout_path, stderr_path) = get_task_paths(&task_paths, task_info);
let (opt_path, stream_name) = match output_stream {
OutputStream::Stdout => (stdout_path, "stdout"),
OutputStream::Stderr => (stderr_path, "stderr"),
Expand Down Expand Up @@ -956,42 +954,10 @@ pub fn print_job_output(
}
};

match task_selector {
None | Some(Selector::All) => {
job.tasks.sort_unstable_by_key(|task| task.task_id);
job.tasks.iter().for_each(|task| {
read_stream(task, &output_stream);
});
}
Some(Selector::Specific(arr)) => {
let tasks_map: HashMap<JobTaskId, JobTaskInfo> = job
.tasks
.into_iter()
.map(|info| (info.task_id, info))
.collect();

let tasks: Vec<(JobTaskId, Option<&JobTaskInfo>)> = arr
.iter()
.map(|task_id| {
let task_id: JobTaskId = task_id.into();
(task_id, tasks_map.get(&task_id))
})
.collect();

for (id, task) in tasks {
match task {
None => log::warn!("Task {id} not found"),
Some(info) => read_stream(info, &output_stream),
}
}
}
Some(Selector::LastN(_)) => {
let last_task = job.tasks.iter().max_by_key(|&x| x.task_id);
if let Some(task) = last_task {
read_stream(task, &output_stream);
}
}
for task in tasks {
read_stream(&task, &output_stream);
}

Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::server::autoalloc::{
use crate::server::job::{JobTaskInfo, JobTaskState, StartedTaskData};
use crate::stream::reader::logfile::Summary;
use crate::transfer::messages::{
AutoAllocListResponse, JobDescription, JobDetail, JobInfo, QueueDescriptorData, Selector,
StatsResponse, TaskDescription, WaitForJobsResponse, WorkerInfo,
AutoAllocListResponse, JobDescription, JobDetail, JobInfo, QueueDescriptorData, StatsResponse,
TaskDescription, WaitForJobsResponse, WorkerInfo,
};
use crate::{JobTaskId, Map};

Expand Down Expand Up @@ -169,10 +169,10 @@ impl Output for JsonOutput {

fn print_job_output(
&self,
_job: JobDetail,
_task_selector: Option<Selector>,
_tasks: Vec<JobTaskInfo>,
_output_stream: OutputStream,
_task_header: bool,
_task_paths: TaskToPathsMap,
) -> anyhow::Result<()> {
anyhow::bail!("JSON output mode doesn't support job output");
}
Expand Down
1 change: 1 addition & 0 deletions crates/hyperqueue/src/client/output/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod cli;
mod common;
pub use common::resolve_task_paths;
pub mod json;
pub mod outputs;
pub mod quiet;
9 changes: 5 additions & 4 deletions crates/hyperqueue/src/client/output/outputs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::common::serverdir::AccessRecord;
use crate::transfer::messages::{
AutoAllocListResponse, JobDetail, JobInfo, Selector, StatsResponse, WaitForJobsResponse,
WorkerInfo,
AutoAllocListResponse, JobDetail, JobInfo, StatsResponse, WaitForJobsResponse, WorkerInfo,
};

use crate::client::job::WorkerMap;
Expand All @@ -10,6 +9,8 @@ use crate::stream::reader::logfile::Summary;
use std::path::Path;
use std::str::FromStr;

use crate::client::output::common::TaskToPathsMap;
use crate::server::job::JobTaskInfo;
use clap::Parser;
use core::time::Duration;
use tako::common::resources::ResourceDescriptor;
Expand Down Expand Up @@ -74,10 +75,10 @@ pub trait Output {
fn print_job_wait(&self, duration: Duration, response: &WaitForJobsResponse);
fn print_job_output(
&self,
job: JobDetail,
task_selector: Option<Selector>,
tasks: Vec<JobTaskInfo>,
output_stream: OutputStream,
task_header: bool,
task_paths: TaskToPathsMap,
) -> anyhow::Result<()>;

// Log
Expand Down
12 changes: 7 additions & 5 deletions crates/hyperqueue/src/client/output/quiet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ use tako::messages::gateway::LostWorkerReason;

use crate::client::job::WorkerMap;
use crate::client::output::cli::print_job_output;
use crate::client::output::common::TaskToPathsMap;
use crate::client::output::outputs::{Output, OutputStream};
use crate::client::status::{job_status, Status};
use crate::common::serverdir::AccessRecord;
use crate::server::autoalloc::{Allocation, AllocationEventHolder};
use crate::server::job::JobTaskInfo;
use crate::stream::reader::logfile::Summary;
use crate::transfer::messages::{
AutoAllocListResponse, JobDetail, JobInfo, Selector, StatsResponse, WaitForJobsResponse,
WorkerExitInfo, WorkerInfo,
AutoAllocListResponse, JobDetail, JobInfo, StatsResponse, WaitForJobsResponse, WorkerExitInfo,
WorkerInfo,
};

#[derive(Default)]
Expand Down Expand Up @@ -71,12 +73,12 @@ impl Output for Quiet {

fn print_job_output(
&self,
job: JobDetail,
task_selector: Option<Selector>,
tasks: Vec<JobTaskInfo>,
output_stream: OutputStream,
task_header: bool,
task_paths: TaskToPathsMap,
) -> anyhow::Result<()> {
print_job_output(job, task_selector, output_stream, task_header)
print_job_output(tasks, output_stream, task_header, task_paths)
}

// Log
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn is_terminated(info: &JobInfo) -> bool {
info.counters.n_running_tasks == 0 && info.counters.n_waiting_tasks(info.n_tasks) == 0
}

pub fn task_status(status: &JobTaskState) -> Status {
pub fn get_task_status(status: &JobTaskState) -> Status {
match status {
JobTaskState::Waiting => Status::Waiting,
JobTaskState::Running { .. } => Status::Running,
Expand Down
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tako::messages::gateway::{
};
use tako::TaskId;

use crate::client::status::task_status;
use crate::client::status::get_task_status;
use crate::common::arraydef::IntArray;
use crate::common::env::{HQ_ENTRY, HQ_JOB_ID, HQ_SUBMIT_DIR, HQ_TASK_ID};
use crate::common::placeholders::{
Expand Down Expand Up @@ -153,7 +153,7 @@ pub async fn handle_resubmit(
.tasks
.values()
.filter_map(|v| {
if message.filter.contains(&task_status(&v.state)) {
if message.filter.contains(&get_task_status(&v.state)) {
Some(v.task_id.as_num())
} else {
None
Expand Down
Loading

0 comments on commit 53f0424

Please sign in to comment.