Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve panics in distributed builds #632

Merged
merged 5 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.70.0
toolchain: 1.72.0
override: true

- uses: Swatinem/rust-cache@v1
Expand All @@ -61,6 +61,9 @@ jobs:
command: build
args: --profile dist

- name: Compress debuginfo
run: objcopy --compress-debug-sections=zlib-gnu target/dist/hq

- name: Prepare archive
id: archive
run: |
Expand Down Expand Up @@ -92,7 +95,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.70.0
toolchain: 1.72.0
override: true

- uses: Swatinem/rust-cache@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.70.0
toolchain: 1.72.0
override: true
components: clippy, rustfmt

Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ default-members = [
resolver = "2"

[workspace.package]
rust-version = "1.64.0"
rust-version = "1.72.0"
edition = "2021"
authors = ["Ada Böhm <[email protected]>", "Jakub Beránek <[email protected]>"]

Expand Down Expand Up @@ -52,4 +52,4 @@ panic = "abort"
inherits = "release"
lto = true
codegen-units = 1
strip = "debuginfo"
debug = "line-tables-only"
36 changes: 36 additions & 0 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::io;
use std::io::IsTerminal;
use std::panic::PanicInfo;

use clap::{CommandFactory, FromArgMatches};
use clap_complete::generate;
use cli_table::ColorChoice;
use colored::Colorize;

use hyperqueue::client::commands::autoalloc::command_autoalloc;
use hyperqueue::client::commands::event::command_event_log;
Expand Down Expand Up @@ -46,6 +49,7 @@ use hyperqueue::transfer::messages::{
use hyperqueue::worker::hwdetect::{
detect_additional_resources, detect_cpus, prune_hyper_threading,
};
use hyperqueue::HQ_VERSION;
use tako::resources::{ResourceDescriptor, ResourceDescriptorItem, CPU_RESOURCE_NAME};

#[cfg(feature = "jemalloc")]
Expand Down Expand Up @@ -326,8 +330,40 @@ fn generate_completion(opts: GenerateCompletionOpts) -> anyhow::Result<()> {
Ok(())
}

/// Custom panic hook that prints a user-facing crash report.
///
/// Installed in `main` to run *after* the default Rust panic hook, so the
/// standard "thread ... panicked ..." message and backtrace appear first,
/// followed by this report asking the user to file an issue (including the
/// HQ version, which is essential for triage).
fn hq_panic_hook(_info: &PanicInfo) {
    let message = format!(
        r#"Oops, HyperQueue has crashed. This is a bug, sorry for that.
If you would be so kind, please report this issue at the HQ issue tracker: https://github.com/It4innovations/hyperqueue/issues/new?title=HQ%20crashes
Please include the above error (starting from "thread ... panicked ...") and the stack backtrace in the issue contents, along with the following information:

HyperQueue version: {version}

You can also re-run HyperQueue server (and its workers) with the `RUST_LOG=hq=debug,tako=debug`
environment variable, and attach the logs to the issue, to provide us more information.
"#,
        version = HQ_VERSION
    );

    // Colorize only when the stream we actually write to (stderr) is a
    // terminal. Checking stdout here would be wrong: stdout and stderr can
    // be redirected independently, and emitting ANSI escapes into a
    // redirected stderr would garble captured logs.
    if io::stderr().is_terminal() {
        eprintln!("{}", message.red());
    } else {
        eprintln!("{message}");
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> hyperqueue::Result<()> {
// Augment panics - first print the error and backtrace like normally,
// and then print our own custom error message.
let std_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info: &PanicInfo| {
std_panic(info);
hq_panic_hook(info);
}));

// Also enable backtraces by default.
std::env::set_var("RUST_BACKTRACE", "full");

let matches = RootOptions::command().get_matches();
let top_opts = match RootOptions::from_arg_matches(&matches) {
Ok(opts) => opts,
Expand Down
11 changes: 8 additions & 3 deletions crates/hyperqueue/src/server/autoalloc/queue/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,15 @@ pub fn build_worker_args(
.unwrap_or_else(get_default_worker_idle_time);
let idle_timeout = humantime::format_duration(idle_timeout);

let mut env = String::new();
if let Ok(log_env) = std::env::var("RUST_LOG") {
write!(env, "RUST_LOG={log_env} ").unwrap();
}

let mut args = format!(
"{} worker start --idle-timeout \"{idle_timeout}\" --manager \"{manager}\" --server-dir \"{}\"",
hq_path.display(),
server_dir.display()
"{env}{hq} worker start --idle-timeout \"{idle_timeout}\" --manager \"{manager}\" --server-dir \"{server_dir}\"",
hq = hq_path.display(),
server_dir = server_dir.display()
);

if !queue_info.worker_args.is_empty() {
Expand Down
4 changes: 4 additions & 0 deletions tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Bless tests

To regenerate ("bless") the inline snapshots that the tests compare against, run:

```bash
$ python -m pytest --inline-snapshot=create
```
105 changes: 80 additions & 25 deletions tests/autoalloc/test_autoalloc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import dataclasses
import time
from os.path import dirname, join
from pathlib import Path
from typing import List, Literal
from typing import Dict, List, Literal, Optional, Tuple

import pytest
from inline_snapshot import snapshot
Expand Down Expand Up @@ -297,8 +298,9 @@ def test_pbs_multinode_allocation(hq_env: HqEnv):
with open(qsub_script_path) as f:
commands = normalize_output(hq_env, "pbs", extract_script_commands(f.read()))
assert commands == snapshot(
'pbsdsh -- bash -l -c \'<hq-binary> worker start --idle-timeout "5m" --manager "<manager>" --server-dir'
' "<server-dir>/001" --on-server-lost "finish-running" --time-limit "1h"\''
"pbsdsh -- bash -l -c 'RUST_LOG=tako=trace,hyperqueue=trace <hq-binary> worker start --idle-timeout"
' "5m" --manager "<manager>" --server-dir "<server-dir>/001" --on-server-lost "finish-running"'
' --time-limit "1h"\''
)


Expand All @@ -315,8 +317,9 @@ def test_slurm_multinode_allocation(hq_env: HqEnv):
with open(sbatch_script_path) as f:
commands = normalize_output(hq_env, "slurm", extract_script_commands(f.read()))
assert commands == snapshot(
'srun --overlap <hq-binary> worker start --idle-timeout "5m" --manager "<manager>" --server-dir'
' "<server-dir>/001" --on-server-lost "finish-running" --time-limit "1h"'
'srun --overlap RUST_LOG=tako=trace,hyperqueue=trace <hq-binary> worker start --idle-timeout "5m"'
' --manager "<manager>" --server-dir "<server-dir>/001" --on-server-lost "finish-running" --time-limit'
' "1h"'
)


Expand Down Expand Up @@ -591,7 +594,7 @@ def test_too_high_time_request(hq_env: HqEnv, spec: ManagerSpec):
assert len(table) == 0


def get_exec_script(script_path: str):
def get_exec_line(script_path: str):
"""
`script_path` should be a path to qsub or sbatch submit script.
"""
Expand All @@ -600,11 +603,39 @@ def get_exec_script(script_path: str):
return [line for line in data.splitlines(keepends=False) if line and not line.startswith("#")][0]


def get_worker_args(script_path: str):
@dataclasses.dataclass(frozen=True)
class WorkerExecLine:
    """A worker-spawning exec line from a qsub/sbatch submit script, split into
    the command itself and the NAME=value environment assignments that prefixed it."""

    # The command (binary and its arguments), with env assignments stripped.
    cmd: str
    # Environment variables passed as NAME=value prefixes before the command.
    env: Dict[str, str]


def parse_exec_line(script_path: str) -> WorkerExecLine:
    """
    Parse the worker exec line of a submit script into its command and env prefix.

    `script_path` should be a path to qsub or sbatch submit script.
    """
    line = get_exec_line(script_path)

    def try_parse_env(token: str) -> Optional[Tuple[str, str]]:
        # An env assignment looks like NAME=value, with an all-uppercase name.
        if "=" not in token:
            return None
        name, _, value = token.partition("=")
        return (name, value) if name.isupper() else None

    tokens = line.split(" ")
    env: Dict[str, str] = {}
    cmd = None
    for position, token in enumerate(tokens):
        parsed = try_parse_env(token)
        if parsed is None:
            # First non-assignment token starts the command proper.
            cmd = " ".join(tokens[position:])
            break
        name, value = parsed
        assert name not in env
        env[name] = value

    assert cmd is not None
    return WorkerExecLine(cmd=cmd, env=env)


@all_managers
Expand Down Expand Up @@ -634,13 +665,33 @@ def test_pass_cpu_and_resources_to_worker(hq_env: HqEnv, spec: ManagerSpec):
)

script = queue.get()
assert normalize_output(hq_env, spec.manager_type(), " ".join(get_worker_args(script))) == snapshot(
'worker start --idle-timeout "5m" --manager "<manager>" --server-dir "<server-dir>/001" --cpus "2x8"'
' --resource "x=sum(100)" --resource "y=range(1-4)" --resource "z=[1,2,4]" --no-hyper-threading'
line = parse_exec_line(script)
assert normalize_output(hq_env, spec.manager_type(), line.cmd) == snapshot(
'<hq-binary> worker start --idle-timeout "5m" --manager "<manager>" --server-dir "<server-dir>/001" --cpus'
' "2x8" --resource "x=sum(100)" --resource "y=range(1-4)" --resource "z=[1,2,4]" --no-hyper-threading'
' --no-detect-resources --on-server-lost "finish-running" --time-limit "1h"'
)


@all_managers
def test_propagate_rust_log_env(hq_env: HqEnv, spec: ManagerSpec):
    # The RUST_LOG value configured on the server should be propagated into
    # the worker command line generated for the allocation manager.
    script_queue = ManagerQueue()
    mock_manager = ExtractSubmitScriptPath(script_queue, spec.manager)

    with MockJobManager(hq_env, spec.adapt(mock_manager)):
        hq_env.start_server(env={"RUST_LOG": "foo"})
        prepare_tasks(hq_env)

        add_queue(hq_env, manager=spec.manager_type())

        submit_script = script_queue.get()
        exec_line = parse_exec_line(submit_script)
        assert exec_line.env["RUST_LOG"] == "foo"


@all_managers
def test_pass_idle_timeout_to_worker(hq_env: HqEnv, spec: ManagerSpec):
queue = ManagerQueue()
Expand All @@ -660,9 +711,10 @@ def test_pass_idle_timeout_to_worker(hq_env: HqEnv, spec: ManagerSpec):
)

script_path = queue.get()
assert normalize_output(hq_env, spec.manager_type(), " ".join(get_worker_args(script_path))) == snapshot(
'worker start --idle-timeout "30m" --manager "<manager>" --server-dir "<server-dir>/001" --on-server-lost'
' "finish-running" --time-limit "1h"'
line = parse_exec_line(script_path)
assert normalize_output(hq_env, spec.manager_type(), line.cmd) == snapshot(
'<hq-binary> worker start --idle-timeout "30m" --manager "<manager>" --server-dir "<server-dir>/001"'
' --on-server-lost "finish-running" --time-limit "1h"'
)


Expand All @@ -680,10 +732,11 @@ def test_pass_on_server_lost(hq_env: HqEnv, spec: ManagerSpec):
manager=spec.manager_type(),
additional_worker_args=["--on-server-lost=stop"],
)
qsub_script_path = queue.get()
assert normalize_output(hq_env, spec.manager_type(), " ".join(get_worker_args(qsub_script_path))) == snapshot(
'worker start --idle-timeout "5m" --manager "<manager>" --server-dir "<server-dir>/001" --on-server-lost'
' "stop" --time-limit "1h"'
script_path = queue.get()
line = parse_exec_line(script_path)
assert normalize_output(hq_env, spec.manager_type(), line.cmd) == snapshot(
'<hq-binary> worker start --idle-timeout "5m" --manager "<manager>" --server-dir "<server-dir>/001"'
' --on-server-lost "stop" --time-limit "1h"'
)


Expand All @@ -697,10 +750,11 @@ def test_pass_worker_time_limit(hq_env: HqEnv, spec: ManagerSpec):
prepare_tasks(hq_env)

add_queue(hq_env, manager=spec.manager_type(), worker_time_limit="30m")
qsub_script_path = queue.get()
assert normalize_output(hq_env, spec.manager_type(), " ".join(get_worker_args(qsub_script_path))) == snapshot(
'worker start --idle-timeout "5m" --manager "<manager>" --server-dir "<server-dir>/001" --on-server-lost'
' "finish-running" --time-limit "30m"'
script_path = queue.get()
line = parse_exec_line(script_path)
assert normalize_output(hq_env, spec.manager_type(), line.cmd) == snapshot(
'<hq-binary> worker start --idle-timeout "5m" --manager "<manager>" --server-dir "<server-dir>/001"'
' --on-server-lost "finish-running" --time-limit "30m"'
)


Expand All @@ -721,9 +775,10 @@ def test_start_stop_cmd(hq_env: HqEnv, spec: ManagerSpec):
)

script = queue.get()
assert normalize_output(hq_env, spec.manager_type(), get_exec_script(script)) == snapshot(
'init.sh && <hq-binary> worker start --idle-timeout "5m" --manager "<manager>" --server-dir'
' "<server-dir>/001" --on-server-lost "finish-running" --time-limit "1h"; unload.sh'
assert normalize_output(hq_env, spec.manager_type(), get_exec_line(script)) == snapshot(
'init.sh && RUST_LOG=tako=trace,hyperqueue=trace <hq-binary> worker start --idle-timeout "5m" --manager'
' "<manager>" --server-dir "<server-dir>/001" --on-server-lost "finish-running" --time-limit "1h";'
" unload.sh"
)


Expand Down