Skip to content

Commit

Permalink
v0 fix self healing
Browse files Browse the repository at this point in the history
  • Loading branch information
louis030195 committed Aug 6, 2024
1 parent 6f89a1a commit 28f086e
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 271 deletions.
94 changes: 54 additions & 40 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
net::SocketAddr,
ops::Deref,
path::PathBuf,
sync::{atomic::AtomicBool, Arc},
sync::Arc,
time::Duration,
};

Expand All @@ -13,18 +13,17 @@ use clap::Parser;
use colored::Colorize;
use crossbeam::queue::SegQueue;
use dirs::home_dir;
use log::{debug, error, info, LevelFilter};
use log::{debug, error, info, warn, LevelFilter};
use screenpipe_audio::{
default_input_device, default_output_device, list_audio_devices, parse_audio_device,
DeviceControl,
default_input_device, list_audio_devices, parse_audio_device, DeviceControl,
};
use screenpipe_vision::OcrEngine;
use std::io::Write;

use screenpipe_core::find_ffmpeg_path;
use screenpipe_server::logs::MultiWriter;
use screenpipe_server::{logs::MultiWriter, RecordingState};
use screenpipe_server::{start_continuous_recording, DatabaseManager, ResourceMonitor, Server};
use tokio::sync::mpsc::channel;
use tokio::sync::{mpsc::channel, Mutex};

use clap::ValueEnum;
use screenpipe_vision::utils::OcrEngine as CoreOcrEngine;
Expand Down Expand Up @@ -231,16 +230,19 @@ async fn main() -> anyhow::Result<()> {
};
devices_status.insert(input_device, device_control);
}

// audio output only supported on linux atm
// see https://github.com/louis030195/screen-pipe/pull/106
#[cfg(target_os = "linux")]
if let Ok(output_device) = default_output_device() {
audio_devices.push(Arc::new(output_device.clone()));
let device_control = DeviceControl {
is_running: true,
is_paused: false,
};
devices_status.insert(output_device, device_control);
if cfg!(target_os = "linux") {
use screenpipe_audio::default_output_device;
if let Ok(output_device) = default_output_device() {
audio_devices.push(Arc::new(output_device.clone()));
let device_control = DeviceControl {
is_running: true,
is_paused: false,
};
devices_status.insert(output_device, device_control);
}
}
} else {
// Use specified devices
Expand Down Expand Up @@ -279,7 +281,7 @@ async fn main() -> anyhow::Result<()> {

let (restart_sender, mut restart_receiver) = channel(10);
let resource_monitor =
ResourceMonitor::new(cli.self_healing, Duration::from_secs(60), 3, restart_sender);
ResourceMonitor::new(cli.self_healing, Duration::from_secs(5), 3, restart_sender);
resource_monitor.start_monitoring(Duration::from_secs(10));

let db = Arc::new(
Expand All @@ -296,10 +298,7 @@ async fn main() -> anyhow::Result<()> {
);
let db_server = db.clone();

// Channel for controlling the recorder ! TODO RENAME SHIT
let vision_control = Arc::new(AtomicBool::new(true));

let vision_control_server_clone = vision_control.clone();
let recording_state = Arc::new(Mutex::new(RecordingState::new()));

// Before the loop starts, clone friend_wearable_uid
let friend_wearable_uid = cli.friend_wearable_uid.clone();
Expand All @@ -308,49 +307,65 @@ async fn main() -> anyhow::Result<()> {

// Function to start or restart the recording task
let _start_recording = tokio::spawn(async move {
// hack
let mut recording_task = tokio::spawn(async move {});

loop {
let db_clone = db.clone();
let local_data_dir = local_data_dir.clone();
let vision_control = vision_control.clone();
let recording_state = Arc::clone(&recording_state);
let recording_state_clone = recording_state.clone();
let recording_state_clone_2 = recording_state.clone();
let audio_devices_control = audio_devices_control.clone();
let friend_wearable_uid_clone = friend_wearable_uid.clone(); // Clone for each iteration
let friend_wearable_uid_clone = friend_wearable_uid.clone();

tokio::select! {
_ = &mut recording_task => {
// Recording task completed or errored, restart it
debug!("Recording task ended. Restarting...");
}
Some(_) = restart_receiver.recv() => {
// Received restart signal, cancel the current task and restart
info!("Received restart signal. Restarting recording task...");
recording_task.abort();
}
}
let core_ocr_engine: CoreOcrEngine = cli.ocr_engine.clone().into();
let ocr_engine = Arc::new(OcrEngine::from(core_ocr_engine));
recording_task = tokio::spawn(async move {

{
let mut state = recording_state.lock().await;
if state.is_running {
warn!("Recording is already running. Waiting before retry.");
drop(state);
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
state.is_running = true;
}

let recording_task = tokio::spawn(async move {
let result = start_continuous_recording(
db_clone,
Arc::new(local_data_dir.join("data").to_string_lossy().into_owned()),
cli.fps,
Duration::from_secs(cli.audio_chunk_duration),
vision_control,
recording_state.clone(),
audio_devices_control,
cli.save_text_files,
cli.cloud_audio_on,
ocr_engine,
friend_wearable_uid_clone, // Use the cloned version
friend_wearable_uid_clone,
)
.await;

if let Err(e) = result {
error!("Continuous recording error: {:?}", e);
}

let mut state = recording_state.lock().await;
state.is_running = false;
});
debug!("Recording task started");


tokio::select! {
_ = recording_task => {
debug!("Recording task ended.");
}
Some(_) = restart_receiver.recv() => {
info!("Received restart signal. Cancelling current recording task...");
let state = recording_state_clone.lock().await;
state.cancel();
drop(state);
recording_state_clone_2.lock().await.cancel();
}
}

// Short delay before restarting to avoid rapid restarts
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -369,7 +384,6 @@ async fn main() -> anyhow::Result<()> {
let server = Server::new(
db_server,
SocketAddr::from(([0, 0, 0, 0], cli.port)),
vision_control_server_clone,
audio_devices_control_server,
);
server.start(devices_status, api_plugin).await.unwrap();
Expand Down
Loading

0 comments on commit 28f086e

Please sign in to comment.