Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 10, 2024
1 parent db84961 commit 420096b
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 89 deletions.
3 changes: 2 additions & 1 deletion .idea/pitchfork.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 14 additions & 5 deletions docs/cli/commands.json
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,12 @@
"full_cmd": [
"logs"
],
"usage": "logs [-n <N>] [-t --tail] [NAME]...",
"usage": "logs [-n <N>] [-t --tail] [ID]...",
"subcommands": {},
"args": [
{
"name": "NAME",
"usage": "[NAME]...",
"name": "ID",
"usage": "[ID]...",
"help": "Show only logs for the specified daemon(s)",
"help_first_line": "Show only logs for the specified daemon(s)",
"required": false,
Expand Down Expand Up @@ -576,9 +576,18 @@
"full_cmd": [
"wait"
],
"usage": "wait",
"usage": "wait <ID>",
"subcommands": {},
"args": [],
"args": [
{
"name": "ID",
"usage": "<ID>",
"help": "The name of the daemon to wait for",
"help_first_line": "The name of the daemon to wait for",
"required": true,
"hide": false
}
],
"flags": [],
"mounts": [],
"hide": false,
Expand Down
4 changes: 2 additions & 2 deletions docs/cli/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
- [`pitchfork disable <ID>`](/cli/disable.md)
- [`pitchfork enable <ID>`](/cli/enable.md)
- [`pitchfork list [--hide-header]`](/cli/list.md)
- [`pitchfork logs [-n <N>] [-t --tail] [NAME]...`](/cli/logs.md)
- [`pitchfork logs [-n <N>] [-t --tail] [ID]...`](/cli/logs.md)
- [`pitchfork run [-f --force] <NAME> [CMD]...`](/cli/run.md)
- [`pitchfork start [NAME]...`](/cli/start.md)
- [`pitchfork status <ID>`](/cli/status.md)
Expand All @@ -25,4 +25,4 @@
- [`pitchfork supervisor start [-f --force]`](/cli/supervisor/start.md)
- [`pitchfork supervisor status`](/cli/supervisor/status.md)
- [`pitchfork supervisor stop`](/cli/supervisor/stop.md)
- [`pitchfork wait`](/cli/wait.md)
- [`pitchfork wait <ID>`](/cli/wait.md)
4 changes: 2 additions & 2 deletions docs/cli/logs.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# `pitchfork logs`

- **Usage**: `pitchfork logs [-n <N>] [-t --tail] [NAME]...`
- **Usage**: `pitchfork logs [-n <N>] [-t --tail] [ID]...`
- **Aliases**: `l`

Displays logs for daemon(s)

## Arguments

### `[NAME]...`
### `[ID]...`

Show only logs for the specified daemon(s)

Expand Down
8 changes: 7 additions & 1 deletion docs/cli/wait.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# `pitchfork wait`

- **Usage**: `pitchfork wait`
- **Usage**: `pitchfork wait <ID>`
- **Aliases**: `w`

Wait for a daemon to stop, tailing the logs along the way

Exits with the same status code as the daemon

## Arguments

### `<ID>`

The name of the daemon to wait for
3 changes: 2 additions & 1 deletion pitchfork.usage.kdl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ cmd "logs" help="Displays logs for daemon(s)" {
arg "<N>"
}
flag "-t --tail" help="Show logs in real-time"
arg "[NAME]..." help="Show only logs for the specified daemon(s)" var=true
arg "[ID]..." help="Show only logs for the specified daemon(s)" var=true
}
cmd "run" help="Runs a one-off daemon" {
alias "r"
Expand Down Expand Up @@ -89,6 +89,7 @@ cmd "wait" help="Wait for a daemon to stop, tailing the logs along the way" {
long_help r"Wait for a daemon to stop, tailing the logs along the way

Exits with the same status code as the daemon"
arg "<ID>" help="The name of the daemon to wait for"
}

complete "id" run="pitchfork ls | awk '{print $1}'"
Expand Down
137 changes: 75 additions & 62 deletions src/cli/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use xx::regex;
#[clap(visible_alias = "l", verbatim_doc_comment)]
pub struct Logs {
/// Show only logs for the specified daemon(s)
name: Vec<String>,
id: Vec<String>,

/// Show N lines of logs
///
Expand All @@ -30,38 +30,16 @@ pub struct Logs {

impl Logs {
pub async fn run(&self) -> Result<()> {
let names = self.name.iter().collect::<HashSet<_>>();
let mut log_files = xx::file::ls(&*env::PITCHFORK_LOGS_DIR)?
.into_iter()
.filter(|d| !d.starts_with("."))
.filter(|d| d.is_dir())
.filter_map(|d| d.file_name().map(|f| f.to_string_lossy().to_string()))
.filter(|n| names.is_empty() || names.contains(n))
.map(|n| {
let path = env::PITCHFORK_LOGS_DIR
.join(&n)
.join(format!("{n}.log"))
.canonicalize()
.into_diagnostic()?;
Ok((
n.clone(),
LogFile {
_name: n,
file: xx::file::open(&path)?,
// TODO: might be better to build the length when reading the file so we don't have gaps
cur: xx::file::metadata(&path).into_diagnostic()?.len(),
path,
},
))
})
.filter_ok(|(_, f)| f.path.exists())
.collect::<Result<BTreeMap<_, _>>>()?;
self.print_existing_logs()?;
if self.tail {
tail_logs(&self.id).await?;
}

let files_to_name = log_files
.iter()
.map(|(n, f)| (f.path.clone(), n.clone()))
.collect::<HashMap<_, _>>();
Ok(())
}

fn print_existing_logs(&self) -> Result<()> {
let log_files = get_log_file_infos(&self.id)?;
let log_lines = log_files.iter().flat_map(|(name, lf)| {
let rev = match xx::file::open(&lf.path) {
Ok(f) => rev_lines::RevLines::new(f),
Expand Down Expand Up @@ -91,37 +69,6 @@ impl Logs {
for (date, name, msg) in log_lines {
println!("{} {} {}", date, name, msg);
}

if self.tail {
let mut wf = WatchFiles::new(Duration::from_millis(10))?;

for lf in log_files.values() {
wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
}

while let Some(paths) = wf.rx.recv().await {
let mut out = vec![];
for path in paths {
let name = files_to_name.get(&path).unwrap().to_string();
let info = log_files.get_mut(&name).unwrap();
info.file
.seek(SeekFrom::Start(info.cur))
.into_diagnostic()?;
let reader = BufReader::new(&info.file);
let lines = reader.lines().map_while(Result::ok).collect_vec();
info.cur += lines.iter().fold(0, |acc, l| acc + l.len() as u64);
out.extend(merge_log_lines(&name, lines));
}
let out = out
.into_iter()
.sorted_by_cached_key(|l| l.0.to_string())
.collect_vec();
for (date, name, msg) in out {
println!("{} {} {}", date, name, msg);
}
}
}

Ok(())
}
}
Expand All @@ -145,6 +92,72 @@ fn merge_log_lines(name: &str, lines: Vec<String>) -> Vec<(String, String, Strin
})
}

fn get_log_file_infos(names: &Vec<String>) -> Result<BTreeMap<String, LogFile>> {
let names = names.iter().collect::<HashSet<_>>();
xx::file::ls(&*env::PITCHFORK_LOGS_DIR)?
.into_iter()
.filter(|d| !d.starts_with("."))
.filter(|d| d.is_dir())
.filter_map(|d| d.file_name().map(|f| f.to_string_lossy().to_string()))
.filter(|n| names.is_empty() || names.contains(n))
.map(|n| {
let path = env::PITCHFORK_LOGS_DIR
.join(&n)
.join(format!("{n}.log"))
.canonicalize()
.into_diagnostic()?;
Ok((
n.clone(),
LogFile {
_name: n,
file: xx::file::open(&path)?,
// TODO: might be better to build the length when reading the file so we don't have gaps
cur: xx::file::metadata(&path).into_diagnostic()?.len(),
path,
},
))
})
.filter_ok(|(_, f)| f.path.exists())
.collect::<Result<BTreeMap<_, _>>>()
}

pub async fn tail_logs(names: &Vec<String>) -> Result<()> {
let mut log_files = get_log_file_infos(names)?;
let mut wf = WatchFiles::new(Duration::from_millis(10))?;

for lf in log_files.values() {
wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
}

let files_to_name = log_files
.iter()
.map(|(n, f)| (f.path.clone(), n.clone()))
.collect::<HashMap<_, _>>();

while let Some(paths) = wf.rx.recv().await {
let mut out = vec![];
for path in paths {
let name = files_to_name.get(&path).unwrap().to_string();
let info = log_files.get_mut(&name).unwrap();
info.file
.seek(SeekFrom::Start(info.cur))
.into_diagnostic()?;
let reader = BufReader::new(&info.file);
let lines = reader.lines().map_while(Result::ok).collect_vec();
info.cur += lines.iter().fold(0, |acc, l| acc + l.len() as u64);
out.extend(merge_log_lines(&name, lines));
}
let out = out
.into_iter()
.sorted_by_cached_key(|l| l.0.to_string())
.collect_vec();
for (date, name, msg) in out {
println!("{} {} {}", date, name, msg);
}
}
Ok(())
}

struct LogFile {
_name: String,
path: PathBuf,
Expand Down
9 changes: 5 additions & 4 deletions src/cli/supervisor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{procs, Result};
use crate::procs::Procs;
use crate::Result;

mod run;
mod start;
Expand Down Expand Up @@ -34,16 +35,16 @@ impl Supervisor {
/// if --force is passed, will kill existing process
/// Returns false if existing pid is running and --force was not passed (so we should cancel starting the daemon)
pub fn kill_or_stop(existing_pid: u32, force: bool) -> Result<bool> {
if let Some(process) = procs::get_process(existing_pid) {
if let Some(process) = Procs::new().get_process(existing_pid) {
if force {
debug!("Killing existing pitchfork supervisor with pid {existing_pid}");
debug!("killing pid {existing_pid}");
if sysinfo::Process::kill_with(process, sysinfo::Signal::Term).is_none() {
sysinfo::Process::kill(process);
}
Ok(true)
} else {
let existing_pid = process.pid();
warn!("Pitchfork is already running with pid {existing_pid}. Kill it with `--force`");
warn!("pitchfork supervisor is already running with pid {existing_pid}. Kill it with `--force`");
Ok(false)
}
} else {
Expand Down
19 changes: 13 additions & 6 deletions src/cli/supervisor/start.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::cli::supervisor::kill_or_stop;
use crate::ipc::client::IpcClient;
use crate::state_file::StateFile;
use crate::{env, Result};
use duct::cmd;
Expand All @@ -16,19 +17,25 @@ pub struct Start {
impl Start {
pub async fn run(&self) -> Result<()> {
let sf = StateFile::get();
let mut running = false;
if let Some(d) = sf.daemons.get("pitchfork") {
if let Some(pid) = d.pid {
if !(kill_or_stop(pid, self.force)?) {
return Ok(());
running = true;
}
}
}

cmd!(&*env::BIN_PATH, "daemon", "run")
.stdout_null()
.stderr_null()
.start()
.into_diagnostic()?;
if !running {
cmd!(&*env::BIN_PATH, "supervisor", "run")
.stdout_null()
.stderr_null()
.start()
.into_diagnostic()?;
}

IpcClient::connect().await?;
println!("Pitchfork daemon is running");

Ok(())
}
Expand Down
32 changes: 31 additions & 1 deletion src/cli/wait.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,44 @@
use crate::cli::logs;
use crate::procs::Procs;
use crate::state_file::StateFile;
use crate::Result;
use tokio::time;

/// Wait for a daemon to stop, tailing the logs along the way
///
/// Exits with the same status code as the daemon
#[derive(Debug, clap::Args)]
#[clap(visible_alias = "w", verbatim_doc_comment)]
pub struct Wait {}
pub struct Wait {
/// The name of the daemon to wait for
id: String,
}

impl Wait {
pub async fn run(&self) -> Result<()> {
let sf = StateFile::get();
let pid = if let Some(pid) = sf.daemons.get(&self.id).and_then(|d| d.pid) {
pid
} else {
warn!("{} is not running", self.id);
return Ok(());
};

let tail_names = vec![self.id.to_string()];
tokio::spawn(async move {
logs::tail_logs(&tail_names).await.unwrap_or_default();
});

let mut procs = Procs::new();
let mut interval = time::interval(time::Duration::from_millis(100));
loop {
if procs.get_process(pid).is_none() {
break;
}
interval.tick().await;
procs.refresh_processes();
}

Ok(())
}
}
Loading

0 comments on commit 420096b

Please sign in to comment.