Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resuming server state from journal + live event streaming #706

Merged
merged 20 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## New features

* Server resilience. Server state can now be restored from the journal when the server crashes.

* `HQ_NUM_NODES` for multi-node tasks introduced. It contains the number of nodes assigned to the task.
  You no longer need to manually count the lines in `HQ_NODE_FILE`.

Expand Down
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ authors = ["Ada Böhm <[email protected]>", "Jakub Beránek <[email protected]>
[workspace.dependencies]
tokio = "1.36"
log = "0.4"
env_logger = { version = "0.11", features = ["color"] }
env_logger = { version = "0.11", features = ["color"] }
clap = "4"
clap_complete = "4"
serde = "1"
serde = { version = "1", features = ["rc"] }
serde_json = "1"
serde_bytes = "0.11"
bytes = "1"
Expand Down Expand Up @@ -54,4 +54,4 @@ panic = "abort"
inherits = "release"
lto = true
codegen-units = 1
debug = "line-tables-only"
debug = "line-tables-only"
4 changes: 2 additions & 2 deletions crates/hyperqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ nom-supreme = { version = "0.8"}
colored = "2"
const_format = "0.2"
textwrap = "0.16"
async-compression = { version = "0.4", features = ["tokio", "gzip"] }
# async-compression = { version = "0.4", features = ["tokio", "gzip"] }
flate2 = { version = "1", features = ["default"] }
chumsky = "0.9"
toml = "0.8"
Expand All @@ -72,7 +72,7 @@ insta = "1.15.0"
criterion = { version = "0.3", features = ["html_reports"] }

[features]
default = ["jemalloc", "dashboard"]
default = ["jemalloc"]
# Mode that does not execute tasks, useful for benchmarking HQ overhead
zero-worker = []
# Use the jemalloc allocator
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ async fn main() -> hyperqueue::Result<()> {
SubCommand::Dashboard(opts) => command_dashboard_start(&gsettings, opts).await,
SubCommand::Log(opts) => command_log(&gsettings, opts),
SubCommand::AutoAlloc(opts) => command_autoalloc(&gsettings, opts).await,
SubCommand::EventLog(opts) => command_event_log(opts),
SubCommand::EventLog(opts) => command_event_log(&gsettings, opts).await,
SubCommand::GenerateCompletion(opts) => generate_completion(opts),
};

Expand Down
20 changes: 11 additions & 9 deletions crates/hyperqueue/src/client/commands/autoalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ pub async fn command_autoalloc(
Ok(())
}

fn args_to_params(args: SharedQueueOpts) -> anyhow::Result<AllocationQueueParams> {
fn args_to_params(
manager: ManagerType,
args: SharedQueueOpts,
) -> anyhow::Result<AllocationQueueParams> {
let SharedQueueOpts {
backlog,
time_limit,
Expand Down Expand Up @@ -301,6 +304,7 @@ wasted allocation duration."
]);

Ok(AllocationQueueParams {
manager,
workers_per_alloc,
backlog,
timelimit: time_limit,
Expand All @@ -315,12 +319,11 @@ wasted allocation duration."
}

async fn dry_run_command(mut session: ClientSession, opts: DryRunOpts) -> anyhow::Result<()> {
let (manager, parameters) = match opts.subcmd {
DryRunCommand::Pbs(params) => (ManagerType::Pbs, args_to_params(params)),
DryRunCommand::Slurm(params) => (ManagerType::Slurm, args_to_params(params)),
let parameters = match opts.subcmd {
DryRunCommand::Pbs(params) => args_to_params(ManagerType::Pbs, params),
DryRunCommand::Slurm(params) => args_to_params(ManagerType::Slurm, params),
};
let message = FromClientMessage::AutoAlloc(AutoAllocRequest::DryRun {
manager,
parameters: parameters?,
});

Expand All @@ -337,19 +340,18 @@ wasting resources."
}

async fn add_queue(mut session: ClientSession, opts: AddQueueOpts) -> anyhow::Result<()> {
let (manager, parameters, dry_run) = match opts.subcmd {
let (parameters, dry_run) = match opts.subcmd {
AddQueueCommand::Pbs(params) => {
let no_dry_run = params.no_dry_run;
(ManagerType::Pbs, args_to_params(params), !no_dry_run)
(args_to_params(ManagerType::Pbs, params), !no_dry_run)
}
AddQueueCommand::Slurm(params) => {
let no_dry_run = params.no_dry_run;
(ManagerType::Slurm, args_to_params(params), !no_dry_run)
(args_to_params(ManagerType::Slurm, params), !no_dry_run)
}
};

let message = FromClientMessage::AutoAlloc(AutoAllocRequest::AddQueue {
manager,
parameters: parameters?,
dry_run,
});
Expand Down
37 changes: 34 additions & 3 deletions crates/hyperqueue/src/client/commands/event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
mod output;

use crate::client::commands::event::output::format_event;
use crate::client::globalsettings::GlobalSettings;
use crate::common::utils::str::pluralize;
use crate::server::bootstrap::get_client_session;
use crate::server::event::log::EventLogReader;
use crate::transfer::messages::{FromClientMessage, ToClientMessage};
use anyhow::anyhow;
use clap::{Parser, ValueHint};
use std::io::{BufWriter, Write};
Expand All @@ -20,6 +23,9 @@ enum EventCommand {
/// Export events from a log file to NDJSON (line-delimited JSON).
/// Events will be exported to `stdout`, you can redirect it e.g. to a file.
Export(ExportOpts),

/// Live stream events from server
Stream,
}

#[derive(Parser)]
Expand All @@ -30,14 +36,39 @@ struct ExportOpts {
logfile: PathBuf,
}

pub fn command_event_log(opts: EventLogOpts) -> anyhow::Result<()> {
/// Entry point for the `hq event-log` subcommand.
///
/// Dispatches on the parsed subcommand: `export` reads a journal file offline
/// and prints its events as NDJSON, while `stream` connects to a running
/// server and live-streams events (hence the `async` signature and the
/// `gsettings` parameter, which provides the server directory).
pub async fn command_event_log(
    gsettings: &GlobalSettings,
    opts: EventLogOpts,
) -> anyhow::Result<()> {
    match opts.command {
        EventCommand::Export(opts) => export_json(opts),
        EventCommand::Stream => stream_json(gsettings).await,
    }
}

/// Connect to the running server and live-stream its events to stdout as
/// line-delimited JSON.
///
/// Sends `FromClientMessage::StreamEvents`, then prints each incoming
/// `ToClientMessage::Event` as one JSON line. Returns an error if the server
/// sends any other message type, or when the connection fails.
async fn stream_json(gsettings: &GlobalSettings) -> anyhow::Result<()> {
    let mut connection = get_client_session(gsettings.server_directory()).await?;
    connection
        .connection()
        .send(FromClientMessage::StreamEvents)
        .await?;
    let stdout = std::io::stdout();
    let stdout = stdout.lock();
    let mut stdout = BufWriter::new(stdout);
    while let Some(event) = connection.connection().receive().await {
        let event = event?;
        if let ToClientMessage::Event(e) = event {
            writeln!(stdout, "{}", format_event(e))?;
            // Flush after every event: this is a live stream, so letting the
            // BufWriter fill up would delay delivery to the consumer.
            stdout.flush()?;
        } else {
            anyhow::bail!("Invalid message received, expected ToClientMessage::Event");
        }
    }
    Ok(())
}

fn export_json(opts: ExportOpts) -> anyhow::Result<()> {
let file = EventLogReader::open(&opts.logfile).map_err(|error| {
let mut file = EventLogReader::open(&opts.logfile).map_err(|error| {
anyhow!(
"Cannot open event log file at `{}`: {error:?}",
opts.logfile.display()
Expand All @@ -49,7 +80,7 @@ fn export_json(opts: ExportOpts) -> anyhow::Result<()> {
let mut stdout = BufWriter::new(stdout);

let mut count = 0;
for event in file {
for event in &mut file {
match event {
Ok(event) => {
writeln!(stdout, "{}", format_event(event))?;
Expand Down
89 changes: 60 additions & 29 deletions crates/hyperqueue/src/client/commands/event/output.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,50 @@
use crate::client::output::json::format_datetime;
use crate::server::event::events::{JobInfo, MonitoringEventPayload};
use crate::server::event::MonitoringEvent;
use crate::server::event::payload::EventPayload;
use crate::server::event::{bincode_config, Event};
use crate::transfer::messages::JobDescription;
use bincode::Options;
use serde_json::json;
use tako::worker::WorkerOverview;

pub fn format_event(event: MonitoringEvent) -> serde_json::Value {
pub fn format_event(event: Event) -> serde_json::Value {
json!({
"id": event.id,
"time": format_datetime(event.time),
"event": format_payload(event.payload)
})
}

fn format_payload(event: MonitoringEventPayload) -> serde_json::Value {
fn format_payload(event: EventPayload) -> serde_json::Value {
match event {
MonitoringEventPayload::WorkerConnected(id, configuration) => json!({
EventPayload::WorkerConnected(id, configuration) => json!({
"type": "worker-connected",
"id": id,
"extra": configuration.extra
}),
MonitoringEventPayload::WorkerLost(id, reason) => json!({
EventPayload::WorkerLost(id, reason) => json!({
"type": "worker-lost",
"id": id,
"reason": reason
}),
MonitoringEventPayload::WorkerOverviewReceived(WorkerOverview { id, hw_state, .. }) => {
EventPayload::WorkerOverviewReceived(WorkerOverview { id, hw_state, .. }) => {
json!({
"type": "worker-overview",
"id": id,
"hw-state": hw_state
})
}
MonitoringEventPayload::AllocationQueueCreated(id, _params) => {
EventPayload::AllocationQueueCreated(id, _params) => {
json!({
"type": "autoalloc-queue-created",
"queue-id": id
})
}
MonitoringEventPayload::AllocationQueueRemoved(id) => {
EventPayload::AllocationQueueRemoved(id) => {
json!({
"type": "autoalloc-queue-removed",
"queue-id": id
})
}
MonitoringEventPayload::AllocationQueued {
EventPayload::AllocationQueued {
queue_id,
allocation_id,
worker_count,
Expand All @@ -55,48 +56,78 @@ fn format_payload(event: MonitoringEventPayload) -> serde_json::Value {
"worker-count": worker_count
})
}
MonitoringEventPayload::AllocationStarted(queue_id, allocation_id) => {
EventPayload::AllocationStarted(queue_id, allocation_id) => {
json!({
"type": "autoalloc-allocation-started",
"queue-id": queue_id,
"allocation-id": allocation_id,
})
}
MonitoringEventPayload::AllocationFinished(queue_id, allocation_id) => {
EventPayload::AllocationFinished(queue_id, allocation_id) => {
json!({
"type": "autoalloc-allocation-finished",
"queue-id": queue_id,
"allocation-id": allocation_id,
})
}
MonitoringEventPayload::TaskStarted { task_id, worker_id } => json!({
EventPayload::TaskStarted {
job_id, task_id, ..
} => json!({
"type": "task-started",
"id": task_id,
"worker": worker_id
"job": job_id,
"task": task_id,
"worker": -1
}),
MonitoringEventPayload::TaskFinished(task_id) => json!({
EventPayload::TaskFinished { job_id, task_id } => json!({
"type": "task-finished",
"id": task_id
"job": job_id,
"task": task_id
}),
MonitoringEventPayload::TaskFailed(task_id) => json!({
"type": "task-failed",
"id": task_id
EventPayload::TaskCanceled { job_id, task_id } => json!({
"type": "task-canceled",
"job": job_id,
"task": task_id
}),
MonitoringEventPayload::JobCreated(job_id, job_info) => json!({
"type": "job-created",
"job-id": job_id,
"job-info": JobInfoFormatter(&job_info).to_json(),
EventPayload::TaskFailed {
job_id,
task_id,
error,
} => json!({
"type": "task-failed",
"job": job_id,
"task": task_id,
"error": error
}),
MonitoringEventPayload::JobCompleted(job_id, completion_date) => json!({
EventPayload::JobCreated(job_id, data) => {
let job_desc: JobDescription = bincode_config()
.deserialize(&data)
.expect("Invalid job description data");
json!({
"type": "job-created",
"job": job_id,
"desc": JobInfoFormatter(&job_desc).to_json(),
})
}
EventPayload::JobCompleted(job_id) => json!({
"type": "job-completed",
"job-id": job_id,
"completion-date": completion_date
"job": job_id,
}),
EventPayload::ServerStart { server_uid } => {
json!({
"type": "server-start",
"server_uid": server_uid
})
}
EventPayload::ServerStop => {
json!({
"type": "server-stop",
})
}
}
}

// We need a special formatter, since BString cannot be used as a hashmap key for JSON
struct JobInfoFormatter<'a>(&'a JobInfo);
struct JobInfoFormatter<'a>(&'a JobDescription);

impl<'a> JobInfoFormatter<'a> {
fn to_json(&self) -> serde_json::Value {
Expand Down
Loading
Loading