From 583ee6a2d382a90f6046c51c00cfb175ead09981 Mon Sep 17 00:00:00 2001 From: Louis Beaumont Date: Wed, 3 Jul 2024 16:00:41 +0200 Subject: [PATCH] feat: ability to disable audio in cli --- screenpipe-server/Cargo.toml | 3 +- .../src/bin/screenpipe-server.rs | 33 +++- screenpipe-server/src/core.rs | 153 ++++++++++-------- screenpipe-vision/Cargo.toml | 3 + screenpipe-vision/src/core.rs | 3 + 5 files changed, 117 insertions(+), 78 deletions(-) diff --git a/screenpipe-server/Cargo.toml b/screenpipe-server/Cargo.toml index 6228bf732..fd3587c1f 100644 --- a/screenpipe-server/Cargo.toml +++ b/screenpipe-server/Cargo.toml @@ -59,7 +59,8 @@ tower-http = { version = "0.5.2", features = ["cors"] } log = { workspace = true } env_logger = "0.10" - +# Cli ! shouldn't be required if using as lib +clap = { version = "4.3", features = ["derive"] } [dev-dependencies] tempfile = "3.3.0" diff --git a/screenpipe-server/src/bin/screenpipe-server.rs b/screenpipe-server/src/bin/screenpipe-server.rs index 454ad1995..3bc9b2426 100644 --- a/screenpipe-server/src/bin/screenpipe-server.rs +++ b/screenpipe-server/src/bin/screenpipe-server.rs @@ -4,15 +4,37 @@ use std::{ sync::{mpsc::channel, Arc}, }; +use clap::Parser; use tokio::time::Duration; use screenpipe_server::{start_continuous_recording, DatabaseManager, Server}; +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// FPS for continuous recording + #[arg(short, long, default_value_t = 10.0)] + fps: f64, + + /// Audio chunk duration in seconds + #[arg(short, long, default_value_t = 5)] + audio_chunk_duration: u64, + + /// Port to run the server on + #[arg(short, long, default_value_t = 3030)] + port: u16, + + /// Disable audio recording + #[arg(long, default_value_t = false)] + disable_audio: bool, +} + #[tokio::main] async fn main() -> anyhow::Result<()> { // Initialize logging use env_logger::Builder; use log::LevelFilter; + let cli = Cli::parse(); Builder::new() .filter(None, LevelFilter::Info) @@ -33,27 +55,26 @@ async fn main() -> anyhow::Result<()> { // Start continuous recording in a separate task let local_data_dir_clone = local_data_dir.clone(); let _recording_task = tokio::spawn(async move { - let fps = 10.0; - let audio_chunk_duration = Duration::from_secs(5); + let audio_chunk_duration = Duration::from_secs(cli.audio_chunk_duration); start_continuous_recording( db, &local_data_dir_clone, - fps, + cli.fps, audio_chunk_duration, control_rx, + !cli.disable_audio, ) .await }); tokio::spawn(async move { - // start_frame_server(tx, local_data_dir_clone.to_string(), db.clone()).await; - let server = Server::new(db_server, SocketAddr::from(([0, 0, 0, 0], 3030))); + let server = Server::new(db_server, SocketAddr::from(([0, 0, 0, 0], cli.port))); server.start().await.unwrap(); }); // Wait for the server to start - println!("Server started on http://localhost:3030"); + println!("Server started on http://localhost:{}", cli.port); // Keep the main thread running loop { diff --git a/screenpipe-server/src/core.rs b/screenpipe-server/src/core.rs index 5a75483a4..f21cd20fa 100644 --- a/screenpipe-server/src/core.rs +++ b/screenpipe-server/src/core.rs @@ -21,16 +21,18 @@ pub async fn start_continuous_recording( fps: f64, audio_chunk_duration: Duration, control_rx: Receiver, + enable_audio: bool, ) -> Result<()> { info!("Starting continuous recording"); + if !enable_audio { + info!("Audio recording disabled"); + } - // all those clones ...https://www.youtube.com/watch?v=evUoUzRgr1o let db_manager_video = Arc::clone(&db); let db_manager_audio = Arc::clone(&db); let new_chunk_callback = move |file_path: String| { let db_manager = Arc::clone(&db); - // for some reason need to do this blocking dark magic (don't try to do async in here it will crash in runtime) let rt = Runtime::new().expect("Failed to create runtime"); if let Err(e) = rt.block_on(db_manager.insert_video_chunk(&file_path)) { error!("Failed to insert new video chunk: {}", e); @@ -41,14 +43,6 @@ pub async fn start_continuous_recording( let control_rx_video = Arc::clone(&control_rx); let control_rx_audio = Arc::clone(&control_rx); - let (audio_control_tx, audio_control_rx) = mpsc::channel(); - let (audio_result_tx, audio_result_rx) = mpsc::channel(); - - let audio_thread = thread::spawn(move || { - info!("Starting audio capture thread"); - continuous_audio_capture(audio_control_rx, audio_result_tx, audio_chunk_duration) - }); - // TODO: too muhc nesting 🤦‍♂️ let video_thread = thread::spawn(move || { let runtime = tokio::runtime::Runtime::new().unwrap(); let _ = runtime.block_on(async { @@ -97,79 +91,96 @@ pub async fn start_continuous_recording( }); }); - let output_path_clone = output_path.to_string(); + if enable_audio { + let (audio_control_tx, audio_control_rx) = mpsc::channel(); + let (audio_result_tx, audio_result_rx) = mpsc::channel(); - let audio_processing_thread = thread::spawn(move || { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let _ = runtime.block_on(async { - info!("Starting audio processing thread"); - let mut is_paused = false; - loop { - match control_rx_audio.lock().unwrap().try_recv() { - Ok(RecorderControl::Pause) => { - info!("Pausing audio processing"); - is_paused = true; - } - Ok(RecorderControl::Resume) => { - info!("Resuming audio processing"); - is_paused = false; - } - Ok(RecorderControl::Stop) => { - info!("Stopping audio processing"); - break; + let audio_thread = thread::spawn(move || { + info!("Starting audio capture thread"); + continuous_audio_capture(audio_control_rx, audio_result_tx, audio_chunk_duration) + }); + + let output_path_clone = output_path.to_string(); + + let audio_processing_thread = thread::spawn(move || { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let _ = runtime.block_on(async { + info!("Starting audio processing thread"); + let mut is_paused = false; + loop { + match control_rx_audio.lock().unwrap().try_recv() { + Ok(RecorderControl::Pause) => { + info!("Pausing audio processing"); + is_paused = true; + } + Ok(RecorderControl::Resume) => { + info!("Resuming audio processing"); + is_paused = false; + } + Ok(RecorderControl::Stop) => { + info!("Stopping audio processing"); + break; + } + Err(_) => {} } - Err(_) => {} - } - if !is_paused { - match audio_result_rx.recv() { - Ok(result) => { - info!("Received audio chunk, processing..."); - info!("Audio chunk size: {}", result.audio.len()); - // Create an audio file - let time = Utc::now(); - let file_path = format!("{}/{}.wav", output_path_clone, time); - info!("Saving audio chunk to {}", file_path); - match save_audio_to_file(&result.audio, &file_path) { - Ok(_) => info!("Successfully saved audio file"), - Err(e) => error!("Failed to save audio file: {}", e), - } + if !is_paused { + match audio_result_rx.recv() { + Ok(result) => { + info!("Received audio chunk, processing..."); + info!("Audio chunk size: {}", result.audio.len()); + let time = Utc::now(); + let file_path = format!("{}/{}.wav", output_path_clone, time); + info!("Saving audio chunk to {}", file_path); + match save_audio_to_file(&result.audio, &file_path) { + Ok(_) => info!("Successfully saved audio file"), + Err(e) => error!("Failed to save audio file: {}", e), + } - match db_manager_audio.insert_audio_chunk(&file_path).await { - Ok(audio_chunk_id) => { - debug!("Inserted audio chunk with id: {}", audio_chunk_id); - if let Err(e) = db_manager_audio - .insert_audio_transcription(audio_chunk_id, &result.text, 0) - .await - // TODO offset - { - error!("Failed to insert audio transcription: {}", e); - } else { - debug!( - "Inserted audio transcription for chunk {}", - audio_chunk_id - ); + match db_manager_audio.insert_audio_chunk(&file_path).await { + Ok(audio_chunk_id) => { + debug!("Inserted audio chunk with id: {}", audio_chunk_id); + if let Err(e) = db_manager_audio + .insert_audio_transcription( + audio_chunk_id, + &result.text, + 0, + ) + .await + // TODO offset + { + error!("Failed to insert audio transcription: {}", e); + } else { + debug!( + "Inserted audio transcription for chunk {}", + audio_chunk_id + ); + } } + Err(e) => error!("Failed to insert audio chunk: {}", e), } - Err(e) => error!("Failed to insert audio chunk: {}", e), } - } - Err(e) => { - error!("Failed to receive audio chunk: {}", e); + Err(e) => { + error!("Failed to receive audio chunk: {}", e); + } } } } - } - Ok::<_, anyhow::Error>(()) + Ok::<_, anyhow::Error>(()) + }); }); - }); - // Wait for threads to finish - info!("Waiting for threads to finish"); - video_thread.join().unwrap(); - audio_processing_thread.join().unwrap(); - audio_control_tx.send(ControlMessage::Stop).unwrap(); - let _ = audio_thread.join().unwrap(); + // Wait for threads to finish + info!("Waiting for threads to finish"); + video_thread.join().unwrap(); + audio_processing_thread.join().unwrap(); + audio_control_tx.send(ControlMessage::Stop).unwrap(); + let _ = audio_thread.join().unwrap(); + } else { + // Only wait for video thread if audio is disabled + info!("Waiting for video thread to finish"); + video_thread.join().unwrap(); + } info!("Continuous recording stopped"); Ok(()) diff --git a/screenpipe-vision/Cargo.toml b/screenpipe-vision/Cargo.toml index 97cc84acd..2e338f3b4 100644 --- a/screenpipe-vision/Cargo.toml +++ b/screenpipe-vision/Cargo.toml @@ -42,6 +42,9 @@ byteorder = "1.5.0" hf-hub = "0.3.2" rand = "0.8.5" +# Log +log = { workspace = true } + [dev-dependencies] tempfile = "3.3.0" diff --git a/screenpipe-vision/src/core.rs b/screenpipe-vision/src/core.rs index 6b001ba62..79fee0465 100644 --- a/screenpipe-vision/src/core.rs +++ b/screenpipe-vision/src/core.rs @@ -1,4 +1,5 @@ use image::DynamicImage; +use log::info; use rusty_tesseract::{Args, Image}; use threadpool::ThreadPool; use xcap::Monitor; @@ -31,6 +32,8 @@ pub fn continuous_capture( let pool_size = (cpu_count as f32 * 1.2) as usize; let pool_size = std::cmp::min(pool_size, MAX_THREADS); + info!("Will use {} threads for OCR", pool_size); + let ocr_pool = ThreadPool::new(pool_size); let is_paused = Arc::new(Mutex::new(false)); let should_stop = Arc::new(Mutex::new(false));