diff --git a/src/config.rs b/src/config.rs index 9ae4cb5..86fb37c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,7 +6,8 @@ use ts_rs::TS; #[derive(Clone, TS, Serialize, Deserialize, Debug)] #[ts(export, export_to = "web/src/bindings/")] pub struct Config { - pub ytarchive: YtarchiveConfig, + pub ytarchive: RecorderConfig, + pub ytdlp: RecorderConfig, pub scraper: ScraperConfig, pub notifier: Option, pub webserver: Option, @@ -19,7 +20,7 @@ pub struct Config { #[derive(Clone, TS, Serialize, Deserialize, Debug)] #[ts(export, export_to = "web/src/bindings/")] -pub struct YtarchiveConfig { +pub struct RecorderConfig { pub executable_path: String, pub working_directory: String, pub args: Vec, @@ -88,6 +89,7 @@ pub struct ChannelConfig { pub outpath: String, /// If not present, will be fetched during runtime. pub picture_url: Option, + pub recorder: String, // TODO: Check this is a valid recorder on config load. } fn default_false() -> bool { diff --git a/src/main.rs b/src/main.rs index 8596965..10116f7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -116,7 +116,7 @@ async fn main() -> Result<()> { let config = Arc::new(RwLock::new(config)); let h_scraper = run_module!(bus, module::scraper::RSS::new(config.clone())); - let h_recorder = run_module!(bus, module::recorder::YTArchive::new(config.clone())); + let h_recorder = run_module!(bus, module::recorder::RecorderRunner::new(config.clone())); let h_notifier = run_module!(bus, module::notifier::Discord::new(config.clone())); let h_webserver = run_module!(bus, module::web::WebServer::new(config.clone())); diff --git a/src/module/mod.rs b/src/module/mod.rs index 2941b9f..f4f95e4 100644 --- a/src/module/mod.rs +++ b/src/module/mod.rs @@ -1,4 +1,4 @@ -use self::recorder::YTAStatus; +use self::recorder::VideoStatus; use crate::{config::Config, msgbus::BusTx}; use anyhow::Result; use async_trait::async_trait; @@ -30,6 +30,7 @@ pub struct Task { pub channel_id: String, pub channel_picture: Option, pub output_directory: String, + pub recorder: String, } #[derive(Debug, Clone, TS)] @@ -43,7 +44,7 @@ pub struct Notification { #[ts(export, export_to = "web/src/bindings/")] pub struct RecordingStatus { pub task: Task, - pub status: YTAStatus, + pub status: VideoStatus, } #[derive(Debug, Clone, PartialEq, TS)] diff --git a/src/module/recorder/mod.rs b/src/module/recorder/mod.rs new file mode 100644 index 0000000..dcb43ef --- /dev/null +++ b/src/module/recorder/mod.rs @@ -0,0 +1,169 @@ +use crate::module::Module; + +mod ytarchive; +use ytarchive::YTArchive; +pub mod ytdlp; +use ytdlp::YTDlp; +use super::{Message, Notification, Task, TaskStatus}; +use crate::msgbus::BusTx; +use crate::{config::Config, module::RecordingStatus}; +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::{ + fs, + path::Path, + process::Stdio, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; +use tokio::{ + io::{AsyncReadExt, BufReader}, + sync::{mpsc, RwLock}, +}; +use ts_rs::TS; + +/// The current state of video. +#[derive(Debug, Clone, TS, Serialize)] +#[ts(export, export_to = "web/src/bindings/")] +pub struct VideoStatus { + version: Option, + state: RecorderState, + last_output: Option, + last_update: chrono::DateTime, + video_fragments: Option, + audio_fragments: Option, + total_size: Option, + video_quality: Option, + output_file: Option, +} + +impl VideoStatus { + pub fn new() -> Self { + Self { + version: None, + state: RecorderState::Idle, + last_output: None, + last_update: chrono::Utc::now(), + video_fragments: None, + audio_fragments: None, + total_size: None, + video_quality: None, + output_file: None, + } + } +} + +#[derive(Debug, Clone, PartialEq, TS, Serialize, Deserialize)] +#[ts(export, export_to = "web/src/bindings/")] +pub enum RecorderState { + Idle, + Waiting(Option>), + Recording, + Muxing, + Finished, + AlreadyProcessed, + Ended, + Interrupted, + Errored, +} + +struct SpawnTask { + task: Task, + cfg: Config, + tx: BusTx, +} + +pub struct RecorderRunner { + config: Arc>, + active_ids: Arc>>, +} + +#[async_trait] +impl Module for RecorderRunner { + fn new(config: Arc>) -> Self { + let active_ids = Arc::new(RwLock::new(HashSet::new())); + Self { config, active_ids } + } + + async fn run(&self, tx: &BusTx, rx: &mut mpsc::Receiver) -> Result<()> { + // Create a spawn queue + let (spawn_tx, mut spawn_rx) = mpsc::unbounded_channel::(); + + // Future to handle spawning new tasks + let active_ids = self.active_ids.clone(); + let f_spawner = async move { + while let Some(mut task) = spawn_rx.recv().await { + let active_ids = active_ids.clone(); + let delay = task.cfg.ytarchive.delay_start; + + debug!("Spawning thread for task: {:?}", task.task); + tokio::spawn(async move { + let video_id = task.task.video_id.clone(); + active_ids.write().await.insert(video_id.clone()); + + match task.task.recorder.as_str() { + "ytarchive" => { + if let Err(e) = YTArchive::record(task.cfg, task.task, &mut task.tx).await { + error!("Failed to record task: {:?}", e); + } + } + "yt-dlp" => { + if let Err(e) = YTDlp::record(task.cfg, task.task, &mut task.tx).await { + error!("Failed to record task: {:?}", e); + } + } + _ => error!("Failed to record task: invalid recorder {:?}", task.task.recorder), + } + + active_ids.write().await.remove(&video_id); + }); + + // Wait a bit before starting the next task + tokio::time::sleep(delay).await; + } + + Ok::<(), anyhow::Error>(()) + }; + + // Future to handle incoming messages + let f_message = async move { + while let Some(message) = rx.recv().await { + match message { + Message::ToRecord(task) => { + // Check if the task is already active + if self.active_ids.read().await.contains(&task.video_id) { + warn!("Task {} is already active, skipping", task.video_id); + continue; + } + + debug!("Adding task to spawn queue: {:?}", task); + let tx = tx.clone(); + let cfg = self.config.read().await; + let cfg = cfg.clone(); + + if let Err(_) = spawn_tx.send(SpawnTask { task, cfg, tx }) { + debug!("Spawn queue closed, exiting"); + break; + } + } + _ => (), + } + } + + Ok::<(), anyhow::Error>(()) + }; + + // Run the futures + tokio::try_join!(f_spawner, f_message)?; + + debug!("RecorderRunner module finished"); + Ok(()) + } +} diff --git a/src/module/recorder.rs b/src/module/recorder/ytarchive.rs similarity index 67% rename from src/module/recorder.rs rename to src/module/recorder/ytarchive.rs index adb0a50..12133e1 100644 --- a/src/module/recorder.rs +++ b/src/module/recorder/ytarchive.rs @@ -1,4 +1,5 @@ -use super::{Message, Module, Notification, Task, TaskStatus}; +use super::super::{Message, Module, Notification, Task, TaskStatus}; +use super::{RecorderState, VideoStatus}; use crate::msgbus::BusTx; use crate::{config::Config, module::RecordingStatus}; use anyhow::{anyhow, Context, Result}; @@ -23,13 +24,10 @@ use tokio::{ }; use ts_rs::TS; -pub struct YTArchive { - config: Arc>, - active_ids: Arc>>, -} +pub struct YTArchive; impl YTArchive { - async fn record(cfg: Config, task: Task, bus: &mut BusTx) -> Result<()> { + pub async fn record(cfg: Config, task: Task, bus: &mut BusTx) -> Result<()> { let task_name = format!("[{}][{}][{}]", task.video_id, task.channel_name, task.title); // Ensure the working directory exists @@ -56,6 +54,8 @@ impl YTArchive { cfg.quality.clone(), ]); + // TODO: This code almost completely same between ytarchive and yt-dlp. Share it. + // Start the process debug!("{} Starting ytarchive with args {:?}", task_name, args); let mut process = tokio::process::Command::new(&cfg.executable_path) @@ -173,7 +173,7 @@ impl YTArchive { }); // Parse each line - let mut status = YTAStatus::new(); + let mut parser = YTAOutputParser::new(); loop { let line = match rx.recv().await { Some(line) => line, @@ -187,14 +187,14 @@ impl YTArchive { trace!("{}[yta:out] {}", task_name, line); - let old = status.clone(); - status.parse_line(&line); + let old_state = parser.video_status.state.clone(); + parser.parse_line(&line); // Push the current status to the bus if let Err(_) = bus .send(Message::RecordingStatus(RecordingStatus { task: task.clone(), - status: status.clone(), + status: parser.video_status.clone(), })) .await { @@ -202,37 +202,37 @@ impl YTArchive { } // Check if status changed - if old.state == status.state { + if old_state == parser.video_status.state { continue; } - let message = match status.state { - YTAState::Waiting(_) => { + let message = match parser.video_status.state { + RecorderState::Waiting(_) => { info!("{} Waiting for stream to go live", task_name); Some(Message::ToNotify(Notification { task: task.clone(), status: TaskStatus::Waiting, })) } - YTAState::Recording => { + RecorderState::Recording => { info!("{} Recording started", task_name); Some(Message::ToNotify(Notification { task: task.clone(), status: TaskStatus::Recording, })) } - YTAState::Finished => { + RecorderState::Finished => { info!("{} Recording finished", task_name); Some(Message::ToNotify(Notification { task: task.clone(), status: TaskStatus::Done, })) } - YTAState::AlreadyProcessed => { + RecorderState::AlreadyProcessed => { info!("{} Video already processed, skipping", task_name); None } - YTAState::Interrupted => { + RecorderState::Interrupted => { info!("{} Recording failed: interrupted", task_name); Some(Message::ToNotify(Notification { task: task.clone(), @@ -250,7 +250,7 @@ impl YTArchive { } } - trace!("{} Status loop exited: {:?}", task_name, status); + trace!("{} Status loop exited: {:?}", task_name, parser.video_status); // Wait for threads to finish let (r_wait, r_stdout, r_stderr) = futures::join!(h_wait, h_stdout, h_stderr); @@ -259,12 +259,12 @@ impl YTArchive { trace!("{} Stderr monitor quit: {:?}", task_name, r_stderr); // Skip moving files if it didn't finish - if status.state != YTAState::Finished { + if parser.video_status.state != RecorderState::Finished { return Ok(()); } // Move the video to the output directory - let frompath = status + let frompath = parser.video_status .output_file .ok_or(anyhow!("ytarchive did not emit an output file"))?; let frompath = Path::new(&frompath); @@ -297,114 +297,6 @@ impl YTArchive { } } -struct SpawnTask { - task: Task, - cfg: Config, - tx: BusTx, -} - -#[async_trait] -impl Module for YTArchive { - fn new(config: Arc>) -> Self { - let active_ids = Arc::new(RwLock::new(HashSet::new())); - Self { config, active_ids } - } - - async fn run(&self, tx: &BusTx, rx: &mut mpsc::Receiver) -> Result<()> { - // Create a spawn queue - let (spawn_tx, mut spawn_rx) = mpsc::unbounded_channel::(); - - // Future to handle spawning new tasks - let active_ids = self.active_ids.clone(); - let f_spawner = async move { - while let Some(mut task) = spawn_rx.recv().await { - let active_ids = active_ids.clone(); - let delay = task.cfg.ytarchive.delay_start; - - debug!("Spawning thread for task: {:?}", task.task); - tokio::spawn(async move { - let video_id = task.task.video_id.clone(); - active_ids.write().await.insert(video_id.clone()); - - if let Err(e) = YTArchive::record(task.cfg, task.task, &mut task.tx).await { - error!("Failed to record task: {:?}", e); - }; - - active_ids.write().await.remove(&video_id); - }); - - // Wait a bit before starting the next task - tokio::time::sleep(delay).await; - } - - Ok::<(), anyhow::Error>(()) - }; - - // Future to handle incoming messages - let f_message = async move { - while let Some(message) = rx.recv().await { - match message { - Message::ToRecord(task) => { - // Check if the task is already active - if self.active_ids.read().await.contains(&task.video_id) { - warn!("Task {} is already active, skipping", task.video_id); - continue; - } - - debug!("Adding task to spawn queue: {:?}", task); - let tx = tx.clone(); - let cfg = self.config.read().await; - let cfg = cfg.clone(); - - if let Err(_) = spawn_tx.send(SpawnTask { task, cfg, tx }) { - debug!("Spawn queue closed, exiting"); - break; - } - } - _ => (), - } - } - - Ok::<(), anyhow::Error>(()) - }; - - // Run the futures - tokio::try_join!(f_spawner, f_message)?; - - debug!("YTArchive module finished"); - Ok(()) - } -} - -/// The current state of ytarchive. -#[derive(Debug, Clone, TS, Serialize)] -#[ts(export, export_to = "web/src/bindings/")] -pub struct YTAStatus { - version: Option, - state: YTAState, - last_output: Option, - last_update: chrono::DateTime, - video_fragments: Option, - audio_fragments: Option, - total_size: Option, - video_quality: Option, - output_file: Option, -} - -#[derive(Debug, Clone, PartialEq, TS, Serialize)] -#[ts(export, export_to = "web/src/bindings/")] -pub enum YTAState { - Idle, - Waiting(Option>), - Recording, - Muxing, - Finished, - AlreadyProcessed, - Ended, - Interrupted, - Errored, -} - fn strip_ansi(s: &str) -> String { lazy_static! { static ref RE: Regex = Regex::new(concat!( @@ -421,18 +313,14 @@ fn strip_ansi(s: &str) -> String { .to_string() } -impl YTAStatus { +pub struct YTAOutputParser { + video_status: VideoStatus, +} + +impl YTAOutputParser { pub fn new() -> Self { Self { - version: None, - state: YTAState::Idle, - last_output: None, - last_update: chrono::Utc::now(), - video_fragments: None, - audio_fragments: None, - total_size: None, - video_quality: None, - output_file: None, + video_status: VideoStatus::new() } } @@ -449,36 +337,36 @@ impl YTAStatus { /// Muxing final file... /// Final file: /path/to/output.mp4 pub fn parse_line(&mut self, line: &str) { - self.last_output = Some(line.to_string()); - self.last_update = chrono::Utc::now(); + self.video_status.last_output = Some(line.to_string()); + self.video_status.last_update = chrono::Utc::now(); if line.starts_with("Video Fragments: ") { - self.state = YTAState::Recording; + self.video_status.state = RecorderState::Recording; let mut parts = line.split(';').map(|s| s.split(':').nth(1).unwrap_or("")); if let Some(x) = parts.next() { - self.video_fragments = x.trim().parse().ok(); + self.video_status.video_fragments = x.trim().parse().ok(); }; if let Some(x) = parts.next() { - self.audio_fragments = x.trim().parse().ok(); + self.video_status.audio_fragments = x.trim().parse().ok(); }; if let Some(x) = parts.next() { - self.total_size = Some(strip_ansi(x.trim())); + self.video_status.total_size = Some(strip_ansi(x.trim())); }; return; } else if line.starts_with("Audio Fragments: ") { - self.state = YTAState::Recording; + self.video_status.state = RecorderState::Recording; let mut parts = line.split(';').map(|s| s.split(':').nth(1).unwrap_or("")); if let Some(x) = parts.next() { - self.audio_fragments = x.trim().parse().ok(); + self.video_status.audio_fragments = x.trim().parse().ok(); }; if let Some(x) = parts.next() { - self.total_size = Some(strip_ansi(x.trim())); + self.video_status.total_size = Some(strip_ansi(x.trim())); }; return; } // New versions of ytarchive prepend a timestamp to the output - let line = if self.version == Some("0.3.2".into()) + let line = if self.video_status.version == Some("0.3.2".into()) && line.len() > 20 && line.chars().nth(4) == Some('/') { @@ -487,37 +375,37 @@ impl YTAStatus { line }; - if self.version == None && line.starts_with("ytarchive ") { - self.version = Some(strip_ansi(&line[10..])); - } else if self.video_quality == None && line.starts_with("Selected quality: ") { - self.video_quality = Some(strip_ansi(&line[18..])); + if self.video_status.version == None && line.starts_with("ytarchive ") { + self.video_status.version = Some(strip_ansi(&line[10..])); + } else if self.video_status.video_quality == None && line.starts_with("Selected quality: ") { + self.video_status.video_quality = Some(strip_ansi(&line[18..])); } else if line.starts_with("Stream starts at ") { let date = DateTime::parse_from_rfc3339(&line[17..42]) .ok() .map(|d| d.into()); - self.state = YTAState::Waiting(date); + self.video_status.state = RecorderState::Waiting(date); } else if line.starts_with("Stream is ") || line.starts_with("Waiting for stream") { - self.state = YTAState::Waiting(None); + self.video_status.state = RecorderState::Waiting(None); } else if line.starts_with("Muxing final file") { - self.state = YTAState::Muxing; + self.video_status.state = RecorderState::Muxing; } else if line.starts_with("Livestream has been processed") { - self.state = YTAState::AlreadyProcessed; + self.video_status.state = RecorderState::AlreadyProcessed; } else if line.starts_with("Livestream has ended and is being processed") || line.contains("use yt-dlp to download it.") { - self.state = YTAState::Ended; + self.video_status.state = RecorderState::Ended; } else if line.starts_with("Final file: ") { - self.state = YTAState::Finished; - self.output_file = Some(strip_ansi(&line[12..])); + self.video_status.state = RecorderState::Finished; + self.video_status.output_file = Some(strip_ansi(&line[12..])); } else if line.contains("User Interrupt") { - self.state = YTAState::Interrupted; + self.video_status.state = RecorderState::Interrupted; } else if line.contains("Error retrieving player response") || line.contains("unable to retrieve") || line.contains("error writing the muxcmd file") || line.contains("Something must have gone wrong with ffmpeg") || line.contains("At least one error occurred") { - self.state = YTAState::Errored; + self.video_status.state = RecorderState::Errored; } else if line.trim().is_empty() || line.contains("Loaded cookie file") || line.starts_with("Video Title: ") diff --git a/src/module/recorder/ytdlp.rs b/src/module/recorder/ytdlp.rs new file mode 100644 index 0000000..915430f --- /dev/null +++ b/src/module/recorder/ytdlp.rs @@ -0,0 +1,514 @@ +use super::super::{Message, Module, Notification, Task, TaskStatus}; +use super::{RecorderState, VideoStatus}; +use crate::msgbus::BusTx; +use crate::{config::Config, module::RecordingStatus}; +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; +use chrono::{DateTime, Utc, Duration, Local}; +use lazy_static::lazy_static; +use regex::Regex; +use serde::Serialize; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::{ + fs, + path::Path, + process::Stdio, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; +use tokio::{ + io::{AsyncReadExt, BufReader}, + sync::{mpsc, RwLock}, +}; +use ts_rs::TS; + +pub struct YTDlp { + config: Arc>, + active_ids: Arc>>, +} + +const PROGRESS_BAR_FORMAT_MAP: [[&str; 2]; 10] = [ + ["percentage", "progress._percent_str"], + ["total_size", "progress._total_bytes_str"], + ["estimated_total_size", "progress._total_bytes_estimate_str"], + ["downloaded_size", "progress._downloaded_bytes_str"], + ["speed", "progress._speed_str"], + ["eta", "progress._eta_str"], + ["elapsed_time", "progress._elapsed_str"], + ["total_fragments", "progress.fragment_count"], + ["current_fragment_count", "progress.fragment_index"], + ["format", "info.format"], +]; + +impl YTDlp { + pub async fn record(cfg: Config, task: Task, bus: &mut BusTx) -> Result<()> { + let task_name = format!("[{}][{}][{}]", task.video_id, task.channel_name, task.title); + + // Ensure the working directory exists + let cfg = cfg.ytdlp; + tokio::fs::create_dir_all(&cfg.working_directory) + .await + .context("Failed to create working directory")?; + + // Ensure the output directory exists + tokio::fs::create_dir_all(&task.output_directory) + .await + .context("Failed to create output directory")?; + + // Construct the command line arguments + let mut args = cfg.args.clone(); + + // Add the --wait-for-video flag if not present + if !args.contains(&"--wait-for-video".to_string()) { + // --wait-for-video requires an arg dictating how often to poll, but at least for youtube it's appears to be ignored and yt-dlp uses the scheduled start time instead. + args.extend(vec![ + "--wait-for-video".to_string(), + "10".to_string(), + ]); + } + + // Add the --live-from-start flag if not present + if !args.contains(&"--live-from-start".to_string()) { + args.push("--live-from-start".to_string()); + } + + // Add the --no-colors flag if not present to not output ANSI codes + if !args.contains(&"--no-colors".to_string()) { + args.push("--no-colors".to_string()); + } + + if !args.contains(&"--newline".to_string()) { + args.push("--newline".to_string()); + } + + let progress_bar_template = PROGRESS_BAR_FORMAT_MAP.map(|x| format!("%({})s", x[1]) ).join(","); + args.extend(vec![ + "--progress-template".to_string(), + format!("[download_progress] {progress_bar_template}\n").to_string(), + ]); + + args.extend(vec![ + "--exec".to_string(), + r#""echo '[download_finished] output_file: (filepath,_filename|)q'""#.to_string(), + ]); + + args.extend(vec![ + format!("https://www.youtube.com/watch?v={}", task.video_id), + cfg.quality.clone(), + ]); + + // TODO: This code almost completely same between ytarchive and yt-dlp. Share it. + + // Start the process + debug!("{} Starting yt-dlp with args {:?}", task_name, args); + let mut process = tokio::process::Command::new(&cfg.executable_path) + .args(args) + .current_dir(&cfg.working_directory) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .context("Failed to start yt-dlp")?; + + // Grab stdout/stderr byte iterators + let mut stdout = BufReader::new( + process + .stdout + .take() + .ok_or(anyhow!("Failed to take stdout"))?, + ); + let mut stderr = BufReader::new( + process + .stderr + .take() + .ok_or(anyhow!("Failed to take stderr"))?, + ); + + // Create a channel to consolidate stdout and stderr + let (tx, mut rx) = mpsc::channel(1); + + // Flag to mark when the process has exited + let done = Arc::from(AtomicBool::new(false)); + + macro_rules! read_line { + ($reader:expr, $tx:expr) => {{ + // Read bytes until a \r or \n is returned + let mut bytes = Vec::new(); + loop { + match $reader.read_u8().await { + Ok(byte) => { + if byte == b'\r' || byte == b'\n' { + break; + } + bytes.push(byte); + } + _ => break, + } + } + + // Skip if there are no bytes + if bytes.is_empty() { + continue; + } + + // Convert to a string + let line = match std::str::from_utf8(&bytes) { + Ok(line) => line.to_owned(), + Err(e) => { + trace!("Failed to read utf8: {:?}", e); + break; + } + }; + + // Send the line to the channel + if let Err(e) = $tx.send(line).await { + trace!("Failed to send line: {:?}", e); + break; + } + }}; + } + + // Read stdout + let h_stdout = tokio::spawn({ + let done = done.clone(); + let task_name = task_name.clone(); + let tx = tx.clone(); + async move { + while !done.load(Ordering::Relaxed) { + read_line!(&mut stdout, tx); + } + trace!("{} stdout reader exited", task_name); + } + }); + + // Read stderr + let h_stderr = tokio::spawn({ + let done = done.clone(); + let task_name = task_name.clone(); + let tx = tx.clone(); + async move { + while !done.load(Ordering::Relaxed) { + read_line!(&mut stderr, tx); + } + trace!("{} stderr reader exited", task_name); + } + }); + + // Wait for the process to exit + let h_wait = tokio::spawn({ + let done = done.clone(); + let task_name = task_name.clone(); + async move { + let result = process.wait().await; + + // Wait a bit for the stdout to be completely read + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Stop threads + done.store(true, Ordering::Relaxed); + debug!("{} Process exited with {:?}", task_name, result); + + // Send a blank message to unblock the status monitor thread + let _ = tx.send("".into()); + + result + } + }); + + // Parse each line + let mut parser = YTDOutputParser::new(); + loop { + let line = match rx.recv().await { + Some(line) => line, + None => break, + }; + + // Stop when done + if done.load(Ordering::Relaxed) { + break; + } + + trace!("{}[ytd:out] {}", task_name, line); + + let old_state = parser.video_status.state.clone(); + parser.parse_line(&line); + + // Push the current status to the bus + if let Err(_) = bus + .send(Message::RecordingStatus(RecordingStatus { + task: task.clone(), + status: parser.video_status.clone(), + })) + .await + { + break; + } + + // Check if status changed + if old_state == parser.video_status.state { + continue; + } + + let message = match parser.video_status.state { + RecorderState::Waiting(_) => { + info!("{} Waiting for stream to go live", task_name); + Some(Message::ToNotify(Notification { + task: task.clone(), + status: TaskStatus::Waiting, + })) + } + RecorderState::Recording => { + info!("{} Recording started", task_name); + Some(Message::ToNotify(Notification { + task: task.clone(), + status: TaskStatus::Recording, + })) + } + RecorderState::Finished => { + info!("{} Recording finished", task_name); + Some(Message::ToNotify(Notification { + task: task.clone(), + status: TaskStatus::Done, + })) + } + RecorderState::AlreadyProcessed => { + info!("{} Video already processed, skipping", task_name); + None + } + RecorderState::Interrupted => { + info!("{} Recording failed: interrupted", task_name); + Some(Message::ToNotify(Notification { + task: task.clone(), + status: TaskStatus::Failed, + })) + } + _ => None, + }; + + if let Some(message) = message { + // Exit the loop if message failed to send + if let Err(_) = bus.send(message).await { + break; + } + } + } + + trace!("{} Status loop exited: {:?}", task_name, parser.video_status); + + // Wait for threads to finish + let (r_wait, r_stdout, r_stderr) = futures::join!(h_wait, h_stdout, h_stderr); + trace!("{} Process monitor exited: {:?}", task_name, r_wait); + trace!("{} Stdout monitor quit: {:?}", task_name, r_stdout); + trace!("{} Stderr monitor quit: {:?}", task_name, r_stderr); + + // Skip moving files if it didn't finish + if parser.video_status.state != RecorderState::Finished { + return Ok(()); + } + + // Move the video to the output directory + let frompath = parser.video_status + .output_file + .ok_or(anyhow!("yt-dlp did not emit an output file"))?; + let frompath = Path::new(&frompath); + let filename = frompath + .file_name() + .ok_or(anyhow!("Failed to get filename"))?; + let destpath = Path::new(&task.output_directory).join(filename); + + // Try to rename the file into the output directory + if let Err(_) = fs::rename(frompath, &destpath) { + debug!( + "{} Failed to rename file to output, trying to copy", + task_name, + ); + + // Copy the file into the output directory + fs::copy(frompath, &destpath) + .with_context(|| format!("Failed to copy file to output: {:?}", destpath))?; + info!( + "{} Copied output file to {}, removing original", + task_name, + destpath.display(), + ); + fs::remove_file(frompath) + .with_context(|| format!("Failed to remove original file: {:?}", frompath))?; + } + + info!("{} Moved output file to {}", task_name, destpath.display()); + Ok(()) + } +} + +pub struct YTDOutputParser { + video_status: VideoStatus, +} + +impl YTDOutputParser { + pub fn new() -> Self { + Self { + video_status: VideoStatus::new() + } + } + + /// parse_line parses a line of output from the yt-dlp process. + /// + /// Sample output: + /// [Cookies] Extracting cookies from firefox + /// [Cookies] Extracted 2449 cookies from firefox + /// [youtube] Extracting URL: https://www.youtube.com/watch?v=gEdOmal1A6Q + /// [youtube] gEdOmal1A6Q: Downloading webpage + /// [youtube] gEdOmal1A6Q: Downloading android player API JSON + /// [info] gEdOmal1A6Q: Downloading 1 format(s): 299+251 + /// [info] There's no subtitles for the requested languages + /// [info] Writing video metadata as JSON to: im orb [gEdOmal1A6Q].info.json + /// [dashsegments] Total fragments: 11 + /// [download] Destination: im orb [gEdOmal1A6Q].f299.mp4 + /// 1: [download_progress] 1.2%, N/A, 1.17GiB, 47.86MiB, 5.61MiB/s,Unknown,00:00:08,NA,278,299 - 1920x1080 (DASH video) + /// 2: [download_progress] 1.2%, N/A, 40MiB, 8.56MiB, 149.68KiB/s,Unknown,00:00:08,NA,414,140 - audio only (DASH audio) + /// [dashsegments] Total fragments: 2 + /// [download] Destination: im orb [gEdOmal1A6Q].f251.webm + /// WARNING: The download speed shown is only of one thread. This is a known issue + /// [download] 100% of 15.42MiB in 00:00:01 at 9.39MiB/s + /// [Merger] Merging formats into "im orb [gEdOmal1A6Q].mkv" + /// Deleting original file im orb [gEdOmal1A6Q].f299.mp4 (pass -k to keep) + /// Deleting original file im orb [gEdOmal1A6Q].f251.webm (pass -k to keep) + /// [EmbedSubtitle] There aren't any subtitles to embed + /// [Metadata] Adding metadata to "im orb [gEdOmal1A6Q].mkv" + pub fn parse_line(&mut self, line: &str) { + self.video_status.last_output = Some(line.to_string()); + self.video_status.last_update = chrono::Utc::now(); + + if line.contains("[download_progress]") { + self.video_status.state = RecorderState::Recording; + // + let re = Regex::new(r"(\d:\s)?\[download_progress\]").unwrap(); + let line = re.replace(line, ""); + let line_values: Vec<_> = line.split(",").map(|x| x.trim()).collect(); + + let parsed_line: HashMap = PROGRESS_BAR_FORMAT_MAP.map(|x| x[0] ).iter().zip( + line_values.iter() + ).map(|(x, y)| (x.to_string(), y.to_string())).collect(); + self.video_status.state = RecorderState::Recording; + self.video_status.last_update = chrono::Utc::now(); + + let total_size = parsed_line.get("total_size"); + if !total_size.eq(&Some(&"N/A".to_string())) { + self.video_status.total_size = total_size.cloned(); + } else { + self.video_status.total_size = parsed_line.get("estimated_total_size").cloned(); + }; + + // This works a bit different than ytarchive. + // This will be something like "299 - 1920x1080 (DASH video)". It's the youtube format and it's specific to the track so a download will have multiple. + // Setting this in the format property probably isn't right + self.video_status.video_quality = parsed_line.get("format").cloned(); + + // Hacky as this will only work DASH live streams. There is probably a better way to differentiate audio track from video track based format that will work in all cases. + if Regex::new(r"\d+x\d+").unwrap().is_match( self.video_status.video_quality.as_ref().unwrap()) { + self.video_status.video_fragments = parsed_line.get("current_fragment_count").unwrap().parse().ok(); + } else if self.video_status.video_quality.as_ref().unwrap().contains("audio only") { + self.video_status.audio_fragments = parsed_line.get("current_fragment_count").unwrap().parse().ok(); + } + + return; + } + + let waiting_text = "[wait] Remaining time until next attempt:"; // Does it make sense to have this as a variable? Move to constant? + if line.starts_with(waiting_text) { + // I like using split as it seems less error prone than referencing a specific position in the string, like is done in some places in ytarchive module. + // Not sure if it really matters or if there is a performance implication of doing it this way. + // [wait] Remaining time until next attempt: 759:35:3459:59:52 + let duration = line.rsplit(waiting_text).next().unwrap(); + + let mut duration_split = duration.rsplit(":").to_owned(); + + let mut seconds: i64 = duration_split.next().unwrap().parse().unwrap(); + if let Some(minutes) = duration_split.next() { + let minutes: i64 = minutes.parse().unwrap(); + seconds += minutes * 60; + }; + if let Some(hours) = duration_split.next() { + let hours: i64 = hours.parse().unwrap(); + seconds += hours * 60 * 60; + }; + + let date: DateTime = (Local::now() + Duration::seconds(seconds)).with_timezone(&Utc); + + self.video_status.state = RecorderState::Waiting(Some(date)); + } else if line.starts_with("[wait]") { + // Trying to handle the case when we are just waiting for the stream and there isn't a timestamp returned. + // I'm not sure of the yt-dlp output in that case. + self.video_status.state = RecorderState::Waiting(None); + } else if line.starts_with("[Merger]") || + line.starts_with("[Metadata]") || // yt-dlp is adding metadata to file. It can take some time depending on disk speed. For now treat it the same as muxing. + line.starts_with("[EmbedSubtitle]") // yt-dlp is adding subtitles to file. It can take a while depending on disk speed. For now treat it the same as muxing. + { + self.video_status.state = RecorderState::Muxing; + } else if line.starts_with("[download_finished]") { + self.video_status.state = RecorderState::Finished; + self.video_status.output_file = Some(line.rsplit("[download_finished] output_file: ").next().unwrap().trim().to_string()); + } else if line.contains("ERROR: Interrupted by user") { + self.video_status.state = RecorderState::Interrupted; + } else if line.starts_with("ERROR:") { // Need to investigate if this is the best way to catch errors + self.video_status.state = RecorderState::Errored; + } else if line.trim().is_empty() + || line.starts_with("[Cookies]") + || line.starts_with("[youtube]") + || line.starts_with("[info]") + || line.starts_with("[dashsegments]") + || line.starts_with("WARNING:") + || line.starts_with("[download]") + || line.starts_with("[generic]") + { + // There are probably more that need to be handled + // Ignore + } else { + warn!("Unknown yt-dlp output: {}", line); + } + + // RecorderState::Ended and RecorderState::AlreadyProcessed aren't relevant for yt-dlp + // May need to add an extra configuration to ignore older videos, or just be fine with scraper.rss ignore_older_than + } +} + +#[cfg(test)] +mod tests { + use super::YTDOutputParser; + use super::{RecorderState, VideoStatus}; + + #[test] + fn test_download_progress_parsing() { + let line = "[download_progress] 2.2%, N/A, 3.17GiB, 70.03MiB, 1.99MiB/s,01:04,00:00:01,325,7,299 - 1920x1080 (1080p60)"; + + let mut parser = YTDOutputParser::new(); + parser.parse_line(line); + + assert_eq!( + parser.video_status.last_output, + Some(line.into()) + ); + + assert_eq!( + parser.video_status.state, + RecorderState::Recording + ); + + assert_eq!( + parser.video_status.total_size, + Some("3.17GiB".into()) + ); + + assert_eq!( + parser.video_status.video_quality, + Some("299 - 1920x1080 (1080p60)".into()) + ); + + assert_eq!( + parser.video_status.video_fragments, + Some(7) + ); + } +} diff --git a/src/module/scraper.rs b/src/module/scraper.rs index f1c4f28..b1be144 100644 --- a/src/module/scraper.rs +++ b/src/module/scraper.rs @@ -128,6 +128,7 @@ impl RSS { channel_id: entry.channel_id.to_owned(), channel_picture: channel.picture_url.clone(), output_directory: channel.outpath.clone(), + recorder: channel.recorder.clone(), }) }) .collect(); diff --git a/src/module/web/handler.rs b/src/module/web/handler.rs index 0d1d94f..0210add 100644 --- a/src/module/web/handler.rs +++ b/src/module/web/handler.rs @@ -94,6 +94,7 @@ async fn post_task( channel_id: ipr.video_details.channel_id, channel_picture: Some(channel_picture), output_directory: taskreq.output_directory, + recorder: "ytarchive".to_owned(), // TODO: Allow choosing recorder in UI }; // Broadcast it to the bus diff --git a/src/module/web/mod.rs b/src/module/web/mod.rs index cf636aa..6acf565 100644 --- a/src/module/web/mod.rs +++ b/src/module/web/mod.rs @@ -1,4 +1,4 @@ -use super::{recorder::YTAStatus, Message, Module, Task}; +use super::{recorder::VideoStatus, Message, Module, Task}; use crate::{ config::{Config, WebserverConfig}, msgbus::BusTx, @@ -25,7 +25,7 @@ pub struct WebServer { #[ts(export, export_to = "web/src/bindings/")] pub struct TaskWithStatus { pub task: Task, - pub status: YTAStatus, + pub status: VideoStatus, } type TaskMap = Data>>; diff --git a/web/src/api/tasks.ts b/web/src/api/tasks.ts index 6c6b77b..21377cf 100644 --- a/web/src/api/tasks.ts +++ b/web/src/api/tasks.ts @@ -1,16 +1,16 @@ import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'; -import { YTAState } from '../bindings/YTAState'; +import { RecorderState } from '../bindings/RecorderState'; import { TaskWithStatus } from '../bindings/TaskWithStatus'; import { rejectError } from './api'; import { CreateTaskRequest } from '../bindings/CreateTaskRequest'; -export const stateString = (state: YTAState) => { +export const stateString = (state: RecorderState) => { if (typeof state === 'object' && 'Waiting' in state) return 'Waiting (' + state.Waiting + ')'; else if (state === 'AlreadyProcessed') return 'Already Processed'; else return state; }; -export const stateKey = (state: YTAState) => +export const stateKey = (state: RecorderState) => typeof state === 'object' ? (Object.keys(state) as (keyof typeof state)[])[0] : state; diff --git a/web/src/pages/TasksPage.tsx b/web/src/pages/TasksPage.tsx index 96ca92b..43df8d5 100644 --- a/web/src/pages/TasksPage.tsx +++ b/web/src/pages/TasksPage.tsx @@ -24,11 +24,11 @@ import { IconPlus } from '@tabler/icons'; import { closeAllModals, openModal } from '@mantine/modals'; import { showNotification } from '@mantine/notifications'; import { useQueryConfig } from '../api/config'; -import { YTAState } from '../bindings/YTAState'; +import { RecorderState } from '../bindings/RecorderState'; const SleepingPanda = React.lazy(() => import('../lotties/SleepingPanda')); -const TaskStateBadge = ({ state }: { state: YTAState }) => ( +const TaskStateBadge = ({ state }: { state: RecorderState }) => (