Skip to content

Commit

Permalink
Initial yt-dlp support
Browse files Browse the repository at this point in the history
  • Loading branch information
eligodeploy authored and gmanley committed Mar 20, 2023
1 parent 4f7bbab commit e7ee462
Show file tree
Hide file tree
Showing 11 changed files with 744 additions and 173 deletions.
6 changes: 4 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use ts_rs::TS;
#[derive(Clone, TS, Serialize, Deserialize, Debug)]
#[ts(export, export_to = "web/src/bindings/")]
pub struct Config {
pub ytarchive: YtarchiveConfig,
pub ytarchive: RecorderConfig,
pub ytdlp: RecorderConfig,
pub scraper: ScraperConfig,
pub notifier: Option<NotifierConfig>,
pub webserver: Option<WebserverConfig>,
Expand All @@ -19,7 +20,7 @@ pub struct Config {

#[derive(Clone, TS, Serialize, Deserialize, Debug)]
#[ts(export, export_to = "web/src/bindings/")]
pub struct YtarchiveConfig {
pub struct RecorderConfig {
pub executable_path: String,
pub working_directory: String,
pub args: Vec<String>,
Expand Down Expand Up @@ -88,6 +89,7 @@ pub struct ChannelConfig {
pub outpath: String,
/// If not present, will be fetched during runtime.
pub picture_url: Option<String>,
pub recorder: String, // TODO: Check this is a valid recorder on config load.
}

fn default_false() -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn main() -> Result<()> {

let config = Arc::new(RwLock::new(config));
let h_scraper = run_module!(bus, module::scraper::RSS::new(config.clone()));
let h_recorder = run_module!(bus, module::recorder::YTArchive::new(config.clone()));
let h_recorder = run_module!(bus, module::recorder::RecorderRunner::new(config.clone()));
let h_notifier = run_module!(bus, module::notifier::Discord::new(config.clone()));
let h_webserver = run_module!(bus, module::web::WebServer::new(config.clone()));

Expand Down
5 changes: 3 additions & 2 deletions src/module/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use self::recorder::YTAStatus;
use self::recorder::VideoStatus;
use crate::{config::Config, msgbus::BusTx};
use anyhow::Result;
use async_trait::async_trait;
Expand Down Expand Up @@ -30,6 +30,7 @@ pub struct Task {
pub channel_id: String,
pub channel_picture: Option<String>,
pub output_directory: String,
pub recorder: String,
}

#[derive(Debug, Clone, TS)]
Expand All @@ -43,7 +44,7 @@ pub struct Notification {
#[ts(export, export_to = "web/src/bindings/")]
pub struct RecordingStatus {
pub task: Task,
pub status: YTAStatus,
pub status: VideoStatus,
}

#[derive(Debug, Clone, PartialEq, TS)]
Expand Down
169 changes: 169 additions & 0 deletions src/module/recorder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use crate::module::Module;

mod ytarchive;
use ytarchive::YTArchive;
pub mod ytdlp;
use ytdlp::YTDlp;
use super::{Message, Notification, Task, TaskStatus};
use crate::msgbus::BusTx;
use crate::{config::Config, module::RecordingStatus};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::{
fs,
path::Path,
process::Stdio,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::{
io::{AsyncReadExt, BufReader},
sync::{mpsc, RwLock},
};
use ts_rs::TS;

/// The current state of video.
#[derive(Debug, Clone, TS, Serialize)]
#[ts(export, export_to = "web/src/bindings/")]
pub struct VideoStatus {
version: Option<String>,
state: RecorderState,
last_output: Option<String>,
last_update: chrono::DateTime<chrono::Utc>,
video_fragments: Option<u32>,
audio_fragments: Option<u32>,
total_size: Option<String>,
video_quality: Option<String>,
output_file: Option<String>,
}

impl VideoStatus {
pub fn new() -> Self {
Self {
version: None,
state: RecorderState::Idle,
last_output: None,
last_update: chrono::Utc::now(),
video_fragments: None,
audio_fragments: None,
total_size: None,
video_quality: None,
output_file: None,
}
}
}

#[derive(Debug, Clone, PartialEq, TS, Serialize, Deserialize)]
#[ts(export, export_to = "web/src/bindings/")]
pub enum RecorderState {
Idle,
Waiting(Option<DateTime<Utc>>),
Recording,
Muxing,
Finished,
AlreadyProcessed,
Ended,
Interrupted,
Errored,
}

struct SpawnTask {
task: Task,
cfg: Config,
tx: BusTx<Message>,
}

pub struct RecorderRunner {
config: Arc<RwLock<Config>>,
active_ids: Arc<RwLock<HashSet<String>>>,
}

#[async_trait]
impl Module for RecorderRunner {
fn new(config: Arc<RwLock<Config>>) -> Self {
let active_ids = Arc::new(RwLock::new(HashSet::new()));
Self { config, active_ids }
}

async fn run(&self, tx: &BusTx<Message>, rx: &mut mpsc::Receiver<Message>) -> Result<()> {
// Create a spawn queue
let (spawn_tx, mut spawn_rx) = mpsc::unbounded_channel::<SpawnTask>();

// Future to handle spawning new tasks
let active_ids = self.active_ids.clone();
let f_spawner = async move {
while let Some(mut task) = spawn_rx.recv().await {
let active_ids = active_ids.clone();
let delay = task.cfg.ytarchive.delay_start;

debug!("Spawning thread for task: {:?}", task.task);
tokio::spawn(async move {
let video_id = task.task.video_id.clone();
active_ids.write().await.insert(video_id.clone());

match task.task.recorder.as_str() {
"ytarchive" => {
if let Err(e) = YTArchive::record(task.cfg, task.task, &mut task.tx).await {
error!("Failed to record task: {:?}", e);
}
}
"yt-dlp" => {
if let Err(e) = YTDlp::record(task.cfg, task.task, &mut task.tx).await {
error!("Failed to record task: {:?}", e);
}
}
_ => error!("Failed to record task: invalid recorder {:?}", task.task.recorder),
}

active_ids.write().await.remove(&video_id);
});

// Wait a bit before starting the next task
tokio::time::sleep(delay).await;
}

Ok::<(), anyhow::Error>(())
};

// Future to handle incoming messages
let f_message = async move {
while let Some(message) = rx.recv().await {
match message {
Message::ToRecord(task) => {
// Check if the task is already active
if self.active_ids.read().await.contains(&task.video_id) {
warn!("Task {} is already active, skipping", task.video_id);
continue;
}

debug!("Adding task to spawn queue: {:?}", task);
let tx = tx.clone();
let cfg = self.config.read().await;
let cfg = cfg.clone();

if let Err(_) = spawn_tx.send(SpawnTask { task, cfg, tx }) {
debug!("Spawn queue closed, exiting");
break;
}
}
_ => (),
}
}

Ok::<(), anyhow::Error>(())
};

// Run the futures
tokio::try_join!(f_spawner, f_message)?;

debug!("RecorderRunner module finished");
Ok(())
}
}
Loading

0 comments on commit e7ee462

Please sign in to comment.