Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0 fix self healing #114

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 88 additions & 63 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
net::SocketAddr,
ops::Deref,
path::PathBuf,
sync::{atomic::AtomicBool, Arc},
sync::Arc,
time::Duration,
};

Expand All @@ -13,18 +13,17 @@ use clap::Parser;
use colored::Colorize;
use crossbeam::queue::SegQueue;
use dirs::home_dir;
use log::{debug, error, info, LevelFilter};
use log::{debug, error, info, warn, LevelFilter};
use screenpipe_audio::{
default_input_device, default_output_device, list_audio_devices, parse_audio_device,
DeviceControl,
default_input_device, list_audio_devices, parse_audio_device, AudioDevice, DeviceControl,
};
use screenpipe_vision::OcrEngine;
use std::io::Write;

use screenpipe_core::find_ffmpeg_path;
use screenpipe_server::logs::MultiWriter;
use screenpipe_server::{logs::MultiWriter, RecordingState};
use screenpipe_server::{start_continuous_recording, DatabaseManager, ResourceMonitor, Server};
use tokio::sync::mpsc::channel;
use tokio::sync::{mpsc::channel, Mutex};

use clap::ValueEnum;
use screenpipe_vision::utils::OcrEngine as CoreOcrEngine;
Expand Down Expand Up @@ -132,6 +131,27 @@ fn get_base_dir(custom_path: Option<String>) -> anyhow::Result<PathBuf> {
Ok(base_dir)
}

fn initialize_audio_devices(
audio_devices: &Vec<Arc<AudioDevice>>,
audio_devices_control: Arc<SegQueue<(AudioDevice, DeviceControl)>>,
) {
for device in audio_devices {
info!(" {}", device);

let device_control = DeviceControl {
is_running: true,
is_paused: false,
};
let device_clone = device.deref().clone();
let sender_clone = audio_devices_control.clone();

tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(15)).await;
let _ = sender_clone.push((device_clone, device_control));
});
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
if find_ffmpeg_path().is_none() {
Expand Down Expand Up @@ -231,16 +251,19 @@ async fn main() -> anyhow::Result<()> {
};
devices_status.insert(input_device, device_control);
}

// audio output only supported on linux atm
// see https://github.com/louis030195/screen-pipe/pull/106
#[cfg(target_os = "linux")]
if let Ok(output_device) = default_output_device() {
audio_devices.push(Arc::new(output_device.clone()));
let device_control = DeviceControl {
is_running: true,
is_paused: false,
};
devices_status.insert(output_device, device_control);
if cfg!(target_os = "linux") {
use screenpipe_audio::default_output_device;
if let Ok(output_device) = default_output_device() {
audio_devices.push(Arc::new(output_device.clone()));
let device_control = DeviceControl {
is_running: true,
is_paused: false,
};
devices_status.insert(output_device, device_control);
}
}
} else {
// Use specified devices
Expand All @@ -254,32 +277,11 @@ async fn main() -> anyhow::Result<()> {
devices_status.insert(device, device_control);
}
}

if audio_devices.is_empty() {
eprintln!("No audio devices available. Audio recording will be disabled.");
} else {
info!("Using audio devices:");
for device in &audio_devices {
info!(" {}", device);

let device_control = DeviceControl {
is_running: true,
is_paused: false,
};
let device_clone = device.deref().clone();
let sender_clone = audio_devices_control.clone();
// send signal after everything started
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(15)).await;
let _ = sender_clone.push((device_clone, device_control));
});
}
}
}

let (restart_sender, mut restart_receiver) = channel(10);
let resource_monitor =
ResourceMonitor::new(cli.self_healing, Duration::from_secs(60), 3, restart_sender);
ResourceMonitor::new(cli.self_healing, Duration::from_secs(5), 3, restart_sender);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo set 60 back when merging

resource_monitor.start_monitoring(Duration::from_secs(10));

let db = Arc::new(
Expand All @@ -296,10 +298,7 @@ async fn main() -> anyhow::Result<()> {
);
let db_server = db.clone();

// Channel for controlling the recorder ! TODO RENAME SHIT
let vision_control = Arc::new(AtomicBool::new(true));

let vision_control_server_clone = vision_control.clone();
let recording_state = Arc::new(Mutex::new(RecordingState::new()));

// Before the loop starts, clone friend_wearable_uid
let friend_wearable_uid = cli.friend_wearable_uid.clone();
Expand All @@ -308,52 +307,79 @@ async fn main() -> anyhow::Result<()> {

// Function to start or restart the recording task
let _start_recording = tokio::spawn(async move {
// hack
let mut recording_task = tokio::spawn(async move {});

loop {
let db_clone = db.clone();
let local_data_dir = local_data_dir.clone();
let vision_control = vision_control.clone();
let recording_state = Arc::clone(&recording_state);
let recording_state_clone = recording_state.clone();
let recording_state_clone_2 = recording_state.clone();
let recording_state_clone_3 = recording_state.clone();
let audio_devices_control = audio_devices_control.clone();
let friend_wearable_uid_clone = friend_wearable_uid.clone(); // Clone for each iteration
let friend_wearable_uid_clone = friend_wearable_uid.clone();

tokio::select! {
_ = &mut recording_task => {
// Recording task completed or errored, restart it
debug!("Recording task ended. Restarting...");
}
Some(_) = restart_receiver.recv() => {
// Received restart signal, cancel the current task and restart
info!("Received restart signal. Restarting recording task...");
recording_task.abort();
}
}
let core_ocr_engine: CoreOcrEngine = cli.ocr_engine.clone().into();
let ocr_engine = Arc::new(OcrEngine::from(core_ocr_engine));
recording_task = tokio::spawn(async move {

{
let mut state = recording_state.lock().await;
if state.is_running {
warn!("Recording is already running. Waiting before retry.");
drop(state);
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
state.is_running = true;
}

// Reinitialize audio devices on restart
initialize_audio_devices(&audio_devices, audio_devices_control.clone());

let recording_task = tokio::spawn(async move {
let result = start_continuous_recording(
db_clone,
Arc::new(local_data_dir.join("data").to_string_lossy().into_owned()),
cli.fps,
Duration::from_secs(cli.audio_chunk_duration),
vision_control,
recording_state.clone(),
audio_devices_control,
cli.save_text_files,
cli.cloud_audio_on,
ocr_engine,
friend_wearable_uid_clone, // Use the cloned version
friend_wearable_uid_clone,
)
.await;

if let Err(e) = result {
error!("Continuous recording error: {:?}", e);
}

let mut state = recording_state.lock().await;
state.is_running = false;
});
debug!("Recording task started");

// Short delay before restarting to avoid rapid restarts
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::select! {
_ = recording_task => {
debug!("Recording task ended.");
}
Some(_) = restart_receiver.recv() => {
info!("Received restart signal. Cancelling current recording task...");
let state = recording_state_clone.lock().await;
state.cancel();
drop(state);
recording_state_clone_2.lock().await.cancel();
}
}

// Wait for the recording task to finish
// let _ = tokio::time::timeout(Duration::from_secs(10), recording_task).await;

// Reset the recording state
{
let mut state = recording_state_clone_3.lock().await;
state.reset();
}

// Cooldown period before attempting to restart
}
});

Expand All @@ -369,7 +395,6 @@ async fn main() -> anyhow::Result<()> {
let server = Server::new(
db_server,
SocketAddr::from(([0, 0, 0, 0], cli.port)),
vision_control_server_clone,
audio_devices_control_server,
);
server.start(devices_status, api_plugin).await.unwrap();
Expand Down
Loading
Loading