Skip to content

Commit

Permalink
fix: self healing
Browse files Browse the repository at this point in the history
  • Loading branch information
louis030195 committed Aug 6, 2024
1 parent 28f086e commit e1a3d07
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 228 deletions.
61 changes: 36 additions & 25 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crossbeam::queue::SegQueue;
use dirs::home_dir;
use log::{debug, error, info, warn, LevelFilter};
use screenpipe_audio::{
default_input_device, list_audio_devices, parse_audio_device, DeviceControl,
default_input_device, list_audio_devices, parse_audio_device, AudioDevice, DeviceControl,
};
use screenpipe_vision::OcrEngine;
use std::io::Write;
Expand Down Expand Up @@ -131,6 +131,27 @@ fn get_base_dir(custom_path: Option<String>) -> anyhow::Result<PathBuf> {
Ok(base_dir)
}

/// Schedules a delayed "start" signal for every audio device.
///
/// For each device, a `DeviceControl { is_running: true, is_paused: false }`
/// is pushed onto the shared `audio_devices_control` queue after a 15-second
/// delay, giving the rest of the pipeline time to come up before audio
/// capture begins. The queue is drained elsewhere (the audio recording loop).
///
/// Accepts a slice rather than `&Vec` (idiomatic; `&vec` coerces at call
/// sites, so existing callers are unaffected).
fn initialize_audio_devices(
    audio_devices: &[Arc<AudioDevice>],
    audio_devices_control: Arc<SegQueue<(AudioDevice, DeviceControl)>>,
) {
    for device in audio_devices {
        info!(" {}", device);

        let device_control = DeviceControl {
            is_running: true,
            is_paused: false,
        };
        let device_clone = device.deref().clone();
        let sender_clone = audio_devices_control.clone();

        // Delay the start signal so everything else has started first.
        // NOTE(review): 15s is a magic constant — consider lifting it into
        // config or a parameter if it needs tuning.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(15)).await;
            // SegQueue::push returns (), so there is no result to discard.
            sender_clone.push((device_clone, device_control));
        });
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
if find_ffmpeg_path().is_none() {
Expand Down Expand Up @@ -256,27 +277,6 @@ async fn main() -> anyhow::Result<()> {
devices_status.insert(device, device_control);
}
}

if audio_devices.is_empty() {
eprintln!("No audio devices available. Audio recording will be disabled.");
} else {
info!("Using audio devices:");
for device in &audio_devices {
info!(" {}", device);

let device_control = DeviceControl {
is_running: true,
is_paused: false,
};
let device_clone = device.deref().clone();
let sender_clone = audio_devices_control.clone();
// send signal after everything started
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(15)).await;
let _ = sender_clone.push((device_clone, device_control));
});
}
}
}

let (restart_sender, mut restart_receiver) = channel(10);
Expand Down Expand Up @@ -313,6 +313,7 @@ async fn main() -> anyhow::Result<()> {
let recording_state = Arc::clone(&recording_state);
let recording_state_clone = recording_state.clone();
let recording_state_clone_2 = recording_state.clone();
let recording_state_clone_3 = recording_state.clone();
let audio_devices_control = audio_devices_control.clone();
let friend_wearable_uid_clone = friend_wearable_uid.clone();

Expand All @@ -330,6 +331,9 @@ async fn main() -> anyhow::Result<()> {
state.is_running = true;
}

// Reinitialize audio devices on restart
initialize_audio_devices(&audio_devices, audio_devices_control.clone());

let recording_task = tokio::spawn(async move {
let result = start_continuous_recording(
db_clone,
Expand All @@ -353,7 +357,6 @@ async fn main() -> anyhow::Result<()> {
state.is_running = false;
});


tokio::select! {
_ = recording_task => {
debug!("Recording task ended.");
Expand All @@ -367,8 +370,16 @@ async fn main() -> anyhow::Result<()> {
}
}

// Short delay before restarting to avoid rapid restarts
tokio::time::sleep(Duration::from_secs(1)).await;
// Wait for the recording task to finish
// let _ = tokio::time::timeout(Duration::from_secs(10), recording_task).await;

// Reset the recording state
{
let mut state = recording_state_clone_3.lock().await;
state.reset();
}

// Cooldown period before attempting to restart
}
});

Expand Down
15 changes: 14 additions & 1 deletion screenpipe-server/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ impl RecordingState {
let _ = self.cancel_sender.send(true);
}

/// Returns the state to "not running" with a fresh cancellation channel.
///
/// A new `watch` channel is installed because the old one may already carry
/// a `true` cancel signal (sent by the stop path above); reusing it would
/// make a restarted recording task observe the stale cancellation and exit
/// immediately.
pub fn reset(&mut self) {
    let (cancel_sender, cancel_receiver) = watch::channel(false);
    self.cancel_sender = cancel_sender;
    self.cancel_receiver = cancel_receiver;
    self.is_running = false;
}

pub fn get_cancel_receiver(&self) -> watch::Receiver<bool> {
self.cancel_receiver.clone()
}
Expand Down Expand Up @@ -167,7 +174,7 @@ async fn record_video(
});
};

let video_capture = VideoCapture::new(
let mut video_capture = VideoCapture::new(
&output_path,
fps,
new_chunk_callback,
Expand All @@ -180,6 +187,7 @@ async fn record_video(
_ = cancel_receiver.changed() => {
if *cancel_receiver.borrow() {
info!("Video recording cancelled");
video_capture.stop().await;
break;
}
}
Expand Down Expand Up @@ -259,6 +267,11 @@ async fn record_audio(
_ = cancel_receiver.changed() => {
if *cancel_receiver.borrow() {
info!("Audio recording cancelled");
// Stop all devices
while let Some((_, mut device_control)) = audio_devices_control.pop() {
device_control.is_running = false;
}

// Stop all running audio capture threads
for (_, handle) in handles.drain() {
handle.abort();
Expand Down
32 changes: 26 additions & 6 deletions screenpipe-server/src/resource_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub struct ResourceMonitor {
health_check_failures: Mutex<u32>,
max_health_check_failures: u32,
restart_sender: Sender<RestartSignal>,
is_restarting: Arc<Mutex<bool>>,
last_restart_attempt: Mutex<Option<Instant>>,
restart_cooldown: Duration,
}
Expand All @@ -37,6 +38,7 @@ impl ResourceMonitor {
health_check_failures: Mutex::new(0),
max_health_check_failures,
restart_sender,
is_restarting: Arc::new(Mutex::new(false)),
last_restart_attempt: Mutex::new(None),
restart_cooldown: health_check_interval * 10,
})
Expand Down Expand Up @@ -120,18 +122,16 @@ impl ResourceMonitor {
Ok(health_data) => {
match health_data.status.as_str() {
"Healthy" => {
*self.health_check_failures.lock().await = 0;
let mut failures = self.health_check_failures.lock().await;
*failures = 0; // Reset failure count on successful health check
debug!("Health check passed: {:?}", health_data);
}
"Loading" => {
debug!("System is still loading: {:?}", health_data);
// Don't increment failure count, but don't reset it either
}
_ => {
warn!(
"Health check returned unhealthy status: {:?}",
health_data
);
warn!("Health check returned unhealthy status");
debug!("Health data: {:?}", health_data);
self.handle_health_check_failure().await;
}
}
Expand Down Expand Up @@ -166,10 +166,19 @@ impl ResourceMonitor {
}

if *failures >= self.max_health_check_failures {
let mut is_restarting = self.is_restarting.lock().await;
if *is_restarting {
warn!("Restart already in progress. Skipping restart attempt.");
return;
}

let mut last_restart = self.last_restart_attempt.lock().await;
let now = Instant::now();

warn!("Last restart: {:?}", last_restart);

if last_restart.map_or(true, |t| now.duration_since(t) > self.restart_cooldown) {
*is_restarting = true;
warn!("Max health check failures reached. Restarting recording tasks...");
if let Err(e) = self
.restart_sender
Expand All @@ -180,6 +189,17 @@ impl ResourceMonitor {
}
*failures = 0;
*last_restart = Some(now);

// Clone the necessary data for the spawned task
let is_restarting_clone = self.is_restarting.clone();
let restart_cooldown = self.restart_cooldown;

// Set a timer to reset the is_restarting flag after the cooldown period
tokio::spawn(async move {
tokio::time::sleep(restart_cooldown).await;
let mut is_restarting = is_restarting_clone.lock().await;
*is_restarting = false;
});
} else {
warn!("Restart cooldown in effect. Skipping restart attempt.");
}
Expand Down
2 changes: 1 addition & 1 deletion screenpipe-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ pub async fn health_check(State(state): State<Arc<AppState>>) -> JsonResponse<He
debug!("Last audio timestamp: {:?}", last_audio);

let now = Utc::now();
let threshold = Duration::from_secs(60);
let threshold: Duration = Duration::from_secs(60);
let loading_threshold = Duration::from_secs(120);

let app_start_time = state.app_start_time;
Expand Down
Loading

0 comments on commit e1a3d07

Please sign in to comment.