Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

yt-dlp support #31

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use ts_rs::TS;
#[derive(Clone, TS, Serialize, Deserialize, Debug)]
#[ts(export, export_to = "web/src/bindings/")]
pub struct Config {
pub ytarchive: YtarchiveConfig,
pub ytarchive: RecorderConfig,
pub ytdlp: RecorderConfig,
pub scraper: ScraperConfig,
pub notifier: Option<NotifierConfig>,
pub webserver: Option<WebserverConfig>,
Expand All @@ -19,7 +20,7 @@ pub struct Config {

#[derive(Clone, TS, Serialize, Deserialize, Debug)]
#[ts(export, export_to = "web/src/bindings/")]
pub struct YtarchiveConfig {
pub struct RecorderConfig {
pub executable_path: String,
pub working_directory: String,
pub args: Vec<String>,
Expand Down Expand Up @@ -88,6 +89,7 @@ pub struct ChannelConfig {
pub outpath: String,
/// If not present, will be fetched during runtime.
pub picture_url: Option<String>,
pub recorder: String, // TODO: Check this is a valid recorder on config load.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be better if you make an enum instead of using a string here

}

fn default_false() -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn main() -> Result<()> {

let config = Arc::new(RwLock::new(config));
let h_scraper = run_module!(bus, module::scraper::RSS::new(config.clone()));
let h_recorder = run_module!(bus, module::recorder::YTArchive::new(config.clone()));
let h_recorder = run_module!(bus, module::recorder::RecorderRunner::new(config.clone()));
let h_notifier = run_module!(bus, module::notifier::Discord::new(config.clone()));
let h_webserver = run_module!(bus, module::web::WebServer::new(config.clone()));

Expand Down
5 changes: 3 additions & 2 deletions src/module/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use self::recorder::YTAStatus;
use self::recorder::VideoStatus;
use crate::{config::Config, msgbus::BusTx};
use anyhow::Result;
use async_trait::async_trait;
Expand Down Expand Up @@ -30,6 +30,7 @@ pub struct Task {
pub channel_id: String,
pub channel_picture: Option<String>,
pub output_directory: String,
pub recorder: String,
}

#[derive(Debug, Clone, TS)]
Expand All @@ -43,7 +44,7 @@ pub struct Notification {
#[ts(export, export_to = "web/src/bindings/")]
pub struct RecordingStatus {
pub task: Task,
pub status: YTAStatus,
pub status: VideoStatus,
}

#[derive(Debug, Clone, PartialEq, TS)]
Expand Down
169 changes: 169 additions & 0 deletions src/module/recorder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use crate::module::Module;

mod ytarchive;
use ytarchive::YTArchive;
pub mod ytdlp;
use ytdlp::YTDlp;
use super::{Message, Notification, Task, TaskStatus};
use crate::msgbus::BusTx;
use crate::{config::Config, module::RecordingStatus};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::{
fs,
path::Path,
process::Stdio,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::{
io::{AsyncReadExt, BufReader},
sync::{mpsc, RwLock},
};
use ts_rs::TS;

/// The current state of video.
#[derive(Debug, Clone, TS, Serialize)]
#[ts(export, export_to = "web/src/bindings/")]
pub struct VideoStatus {
version: Option<String>,
state: RecorderState,
last_output: Option<String>,
last_update: chrono::DateTime<chrono::Utc>,
video_fragments: Option<u32>,
audio_fragments: Option<u32>,
total_size: Option<String>,
video_quality: Option<String>,
output_file: Option<String>,
}

impl VideoStatus {
pub fn new() -> Self {
Self {
version: None,
state: RecorderState::Idle,
last_output: None,
last_update: chrono::Utc::now(),
video_fragments: None,
audio_fragments: None,
total_size: None,
video_quality: None,
output_file: None,
}
}
}

#[derive(Debug, Clone, PartialEq, TS, Serialize, Deserialize)]
#[ts(export, export_to = "web/src/bindings/")]
pub enum RecorderState {
Idle,
Waiting(Option<DateTime<Utc>>),
Recording,
Muxing,
Finished,
AlreadyProcessed,
Ended,
Interrupted,
Errored,
}

struct SpawnTask {
task: Task,
cfg: Config,
tx: BusTx<Message>,
}

pub struct RecorderRunner {
config: Arc<RwLock<Config>>,
active_ids: Arc<RwLock<HashSet<String>>>,
}

#[async_trait]
impl Module for RecorderRunner {
fn new(config: Arc<RwLock<Config>>) -> Self {
let active_ids = Arc::new(RwLock::new(HashSet::new()));
Self { config, active_ids }
}

async fn run(&self, tx: &BusTx<Message>, rx: &mut mpsc::Receiver<Message>) -> Result<()> {
// Create a spawn queue
let (spawn_tx, mut spawn_rx) = mpsc::unbounded_channel::<SpawnTask>();

// Future to handle spawning new tasks
let active_ids = self.active_ids.clone();
let f_spawner = async move {
while let Some(mut task) = spawn_rx.recv().await {
let active_ids = active_ids.clone();
let delay = task.cfg.ytarchive.delay_start;

debug!("Spawning thread for task: {:?}", task.task);
tokio::spawn(async move {
let video_id = task.task.video_id.clone();
active_ids.write().await.insert(video_id.clone());

match task.task.recorder.as_str() {
"ytarchive" => {
if let Err(e) = YTArchive::record(task.cfg, task.task, &mut task.tx).await {
error!("Failed to record task: {:?}", e);
}
}
"yt-dlp" => {
if let Err(e) = YTDlp::record(task.cfg, task.task, &mut task.tx).await {
error!("Failed to record task: {:?}", e);
}
}
_ => error!("Failed to record task: invalid recorder {:?}", task.task.recorder),
}

active_ids.write().await.remove(&video_id);
});

// Wait a bit before starting the next task
tokio::time::sleep(delay).await;
}

Ok::<(), anyhow::Error>(())
};

// Future to handle incoming messages
let f_message = async move {
while let Some(message) = rx.recv().await {
match message {
Message::ToRecord(task) => {
// Check if the task is already active
if self.active_ids.read().await.contains(&task.video_id) {
warn!("Task {} is already active, skipping", task.video_id);
continue;
}

debug!("Adding task to spawn queue: {:?}", task);
let tx = tx.clone();
let cfg = self.config.read().await;
let cfg = cfg.clone();

if let Err(_) = spawn_tx.send(SpawnTask { task, cfg, tx }) {
debug!("Spawn queue closed, exiting");
break;
}
}
_ => (),
}
}

Ok::<(), anyhow::Error>(())
};

// Run the futures
tokio::try_join!(f_spawner, f_message)?;

debug!("RecorderRunner module finished");
Ok(())
}
}
Loading