Skip to content

Commit

Permalink
feat: ability to disable audio in cli
Browse files Browse the repository at this point in the history
  • Loading branch information
louis030195 committed Jul 3, 2024
1 parent 8336f44 commit 583ee6a
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 78 deletions.
3 changes: 2 additions & 1 deletion screenpipe-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ tower-http = { version = "0.5.2", features = ["cors"] }
log = { workspace = true }
env_logger = "0.10"


# Cli ! shouldn't be required if using as lib
clap = { version = "4.3", features = ["derive"] }

[dev-dependencies]
tempfile = "3.3.0"
Expand Down
33 changes: 27 additions & 6 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,37 @@ use std::{
sync::{mpsc::channel, Arc},
};

use clap::Parser;
use tokio::time::Duration;

use screenpipe_server::{start_continuous_recording, DatabaseManager, Server};

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
/// FPS for continuous recording
#[arg(short, long, default_value_t = 10.0)]
fps: f64,

/// Audio chunk duration in seconds
#[arg(short, long, default_value_t = 5)]
audio_chunk_duration: u64,

/// Port to run the server on
#[arg(short, long, default_value_t = 3030)]
port: u16,

/// Disable audio recording
#[arg(long, default_value_t = false)]
disable_audio: bool,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize logging
use env_logger::Builder;
use log::LevelFilter;
let cli = Cli::parse();

Builder::new()
.filter(None, LevelFilter::Info)
Expand All @@ -33,27 +55,26 @@ async fn main() -> anyhow::Result<()> {
// Start continuous recording in a separate task
let local_data_dir_clone = local_data_dir.clone();
let _recording_task = tokio::spawn(async move {
let fps = 10.0;
let audio_chunk_duration = Duration::from_secs(5);
let audio_chunk_duration = Duration::from_secs(cli.audio_chunk_duration);

start_continuous_recording(
db,
&local_data_dir_clone,
fps,
cli.fps,
audio_chunk_duration,
control_rx,
!cli.disable_audio,
)
.await
});

tokio::spawn(async move {
// start_frame_server(tx, local_data_dir_clone.to_string(), db.clone()).await;
let server = Server::new(db_server, SocketAddr::from(([0, 0, 0, 0], 3030)));
let server = Server::new(db_server, SocketAddr::from(([0, 0, 0, 0], cli.port)));
server.start().await.unwrap();
});

// Wait for the server to start
println!("Server started on http://localhost:3030");
println!("Server started on http://localhost:{}", cli.port);

// Keep the main thread running
loop {
Expand Down
153 changes: 82 additions & 71 deletions screenpipe-server/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ pub async fn start_continuous_recording(
fps: f64,
audio_chunk_duration: Duration,
control_rx: Receiver<RecorderControl>,
enable_audio: bool,
) -> Result<()> {
info!("Starting continuous recording");
if !enable_audio {
info!("Audio recording disabled");
}

// all those clones ...https://www.youtube.com/watch?v=evUoUzRgr1o
let db_manager_video = Arc::clone(&db);
let db_manager_audio = Arc::clone(&db);

let new_chunk_callback = move |file_path: String| {
let db_manager = Arc::clone(&db);
// for some reason need to do this blocking dark magic (don't try to do async in here it will crash in runtime)
let rt = Runtime::new().expect("Failed to create runtime");
if let Err(e) = rt.block_on(db_manager.insert_video_chunk(&file_path)) {
error!("Failed to insert new video chunk: {}", e);
Expand All @@ -41,14 +43,6 @@ pub async fn start_continuous_recording(
let control_rx_video = Arc::clone(&control_rx);
let control_rx_audio = Arc::clone(&control_rx);

let (audio_control_tx, audio_control_rx) = mpsc::channel();
let (audio_result_tx, audio_result_rx) = mpsc::channel();

let audio_thread = thread::spawn(move || {
info!("Starting audio capture thread");
continuous_audio_capture(audio_control_rx, audio_result_tx, audio_chunk_duration)
});
// TODO: too muhc nesting 🤦‍♂️
let video_thread = thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
let _ = runtime.block_on(async {
Expand Down Expand Up @@ -97,79 +91,96 @@ pub async fn start_continuous_recording(
});
});

let output_path_clone = output_path.to_string();
if enable_audio {
let (audio_control_tx, audio_control_rx) = mpsc::channel();
let (audio_result_tx, audio_result_rx) = mpsc::channel();

let audio_processing_thread = thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
let _ = runtime.block_on(async {
info!("Starting audio processing thread");
let mut is_paused = false;
loop {
match control_rx_audio.lock().unwrap().try_recv() {
Ok(RecorderControl::Pause) => {
info!("Pausing audio processing");
is_paused = true;
}
Ok(RecorderControl::Resume) => {
info!("Resuming audio processing");
is_paused = false;
}
Ok(RecorderControl::Stop) => {
info!("Stopping audio processing");
break;
let audio_thread = thread::spawn(move || {
info!("Starting audio capture thread");
continuous_audio_capture(audio_control_rx, audio_result_tx, audio_chunk_duration)
});

let output_path_clone = output_path.to_string();

let audio_processing_thread = thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
let _ = runtime.block_on(async {
info!("Starting audio processing thread");
let mut is_paused = false;
loop {
match control_rx_audio.lock().unwrap().try_recv() {
Ok(RecorderControl::Pause) => {
info!("Pausing audio processing");
is_paused = true;
}
Ok(RecorderControl::Resume) => {
info!("Resuming audio processing");
is_paused = false;
}
Ok(RecorderControl::Stop) => {
info!("Stopping audio processing");
break;
}
Err(_) => {}
}
Err(_) => {}
}

if !is_paused {
match audio_result_rx.recv() {
Ok(result) => {
info!("Received audio chunk, processing...");
info!("Audio chunk size: {}", result.audio.len());
// Create an audio file
let time = Utc::now();
let file_path = format!("{}/{}.wav", output_path_clone, time);
info!("Saving audio chunk to {}", file_path);
match save_audio_to_file(&result.audio, &file_path) {
Ok(_) => info!("Successfully saved audio file"),
Err(e) => error!("Failed to save audio file: {}", e),
}
if !is_paused {
match audio_result_rx.recv() {
Ok(result) => {
info!("Received audio chunk, processing...");
info!("Audio chunk size: {}", result.audio.len());
let time = Utc::now();
let file_path = format!("{}/{}.wav", output_path_clone, time);
info!("Saving audio chunk to {}", file_path);
match save_audio_to_file(&result.audio, &file_path) {
Ok(_) => info!("Successfully saved audio file"),
Err(e) => error!("Failed to save audio file: {}", e),
}

match db_manager_audio.insert_audio_chunk(&file_path).await {
Ok(audio_chunk_id) => {
debug!("Inserted audio chunk with id: {}", audio_chunk_id);
if let Err(e) = db_manager_audio
.insert_audio_transcription(audio_chunk_id, &result.text, 0)
.await
// TODO offset
{
error!("Failed to insert audio transcription: {}", e);
} else {
debug!(
"Inserted audio transcription for chunk {}",
audio_chunk_id
);
match db_manager_audio.insert_audio_chunk(&file_path).await {
Ok(audio_chunk_id) => {
debug!("Inserted audio chunk with id: {}", audio_chunk_id);
if let Err(e) = db_manager_audio
.insert_audio_transcription(
audio_chunk_id,
&result.text,
0,
)
.await
// TODO offset
{
error!("Failed to insert audio transcription: {}", e);
} else {
debug!(
"Inserted audio transcription for chunk {}",
audio_chunk_id
);
}
}
Err(e) => error!("Failed to insert audio chunk: {}", e),
}
Err(e) => error!("Failed to insert audio chunk: {}", e),
}
}
Err(e) => {
error!("Failed to receive audio chunk: {}", e);
Err(e) => {
error!("Failed to receive audio chunk: {}", e);
}
}
}
}
}
Ok::<_, anyhow::Error>(())
Ok::<_, anyhow::Error>(())
});
});
});

// Wait for threads to finish
info!("Waiting for threads to finish");
video_thread.join().unwrap();
audio_processing_thread.join().unwrap();
audio_control_tx.send(ControlMessage::Stop).unwrap();
let _ = audio_thread.join().unwrap();
// Wait for threads to finish
info!("Waiting for threads to finish");
video_thread.join().unwrap();
audio_processing_thread.join().unwrap();
audio_control_tx.send(ControlMessage::Stop).unwrap();
let _ = audio_thread.join().unwrap();
} else {
// Only wait for video thread if audio is disabled
info!("Waiting for video thread to finish");
video_thread.join().unwrap();
}

info!("Continuous recording stopped");
Ok(())
Expand Down
3 changes: 3 additions & 0 deletions screenpipe-vision/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ byteorder = "1.5.0"
hf-hub = "0.3.2"
rand = "0.8.5"

# Log
log = { workspace = true }


[dev-dependencies]
tempfile = "3.3.0"
Expand Down
3 changes: 3 additions & 0 deletions screenpipe-vision/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use image::DynamicImage;
use log::info;
use rusty_tesseract::{Args, Image};
use threadpool::ThreadPool;
use xcap::Monitor;
Expand Down Expand Up @@ -31,6 +32,8 @@ pub fn continuous_capture(
let pool_size = (cpu_count as f32 * 1.2) as usize;
let pool_size = std::cmp::min(pool_size, MAX_THREADS);

info!("Will use {} threads for OCR", pool_size);

let ocr_pool = ThreadPool::new(pool_size);
let is_paused = Arc::new(Mutex::new(false));
let should_stop = Arc::new(Mutex::new(false));
Expand Down

0 comments on commit 583ee6a

Please sign in to comment.