diff --git a/screenpipe-app-tauri/lib/hooks/use-settings.tsx b/screenpipe-app-tauri/lib/hooks/use-settings.tsx
index d71dcbb11d..a6559bd4d2 100644
--- a/screenpipe-app-tauri/lib/hooks/use-settings.tsx
+++ b/screenpipe-app-tauri/lib/hooks/use-settings.tsx
@@ -126,7 +126,7 @@ const DEFAULT_SETTINGS: Settings = {
monitorIds: ["default"],
audioDevices: ["default"],
usePiiRemoval: false,
- restartInterval: 120,
+ restartInterval: 0,
port: 3030,
dataDir: "default",
disableAudio: false,
diff --git a/screenpipe-app-tauri/src-tauri/Cargo.lock b/screenpipe-app-tauri/src-tauri/Cargo.lock
index 1e227b0783..3ebc029c88 100755
--- a/screenpipe-app-tauri/src-tauri/Cargo.lock
+++ b/screenpipe-app-tauri/src-tauri/Cargo.lock
@@ -5289,7 +5289,7 @@ dependencies = [
[[package]]
name = "screenpipe-app"
-version = "0.32.14"
+version = "0.32.16"
dependencies = [
"anyhow",
"async-stream",
diff --git a/screenpipe-app-tauri/src-tauri/src/sidecar.rs b/screenpipe-app-tauri/src-tauri/src/sidecar.rs
index 63eae390a8..fbdc70a0a5 100644
--- a/screenpipe-app-tauri/src-tauri/src/sidecar.rs
+++ b/screenpipe-app-tauri/src-tauri/src/sidecar.rs
@@ -520,39 +520,39 @@ impl SidecarManager {
// Spawn the sidecar
let child = spawn_sidecar(app)?;
self.child = Some(child);
- self.last_restart = Instant::now();
- info!("last restart: {:?}", self.last_restart);
-
- // kill previous task if any
- if let Some(task) = self.restart_task.take() {
- task.abort();
- }
-
- let restart_interval = self.restart_interval.clone();
- info!("restart_interval: {:?}", restart_interval);
- // Add this function outside the SidecarManager impl
- async fn check_and_restart_sidecar(app_handle: &tauri::AppHandle) -> Result<(), String> {
- let state = app_handle.state::<SidecarState>();
- let mut manager = state.0.lock().await;
- if let Some(manager) = manager.as_mut() {
- manager.check_and_restart(app_handle).await
- } else {
- Ok(())
- }
- }
-
- // In the spawn method
- let app_handle = app.app_handle().clone();
- self.restart_task = Some(tauri::async_runtime::spawn(async move {
- loop {
- let interval = *restart_interval.lock().await;
- info!("interval: {}", interval.as_secs());
- if let Err(e) = check_and_restart_sidecar(&app_handle).await {
- error!("Failed to check and restart sidecar: {}", e);
- }
- sleep(Duration::from_secs(60)).await;
- }
- }));
+ // self.last_restart = Instant::now();
+ // info!("last restart: {:?}", self.last_restart);
+
+ // // kill previous task if any
+ // if let Some(task) = self.restart_task.take() {
+ // task.abort();
+ // }
+
+ // let restart_interval = self.restart_interval.clone();
+ // info!("restart_interval: {:?}", restart_interval);
+ // // Add this function outside the SidecarManager impl
+ // async fn check_and_restart_sidecar(app_handle: &tauri::AppHandle) -> Result<(), String> {
+ // let state = app_handle.state::<SidecarState>();
+ // let mut manager = state.0.lock().await;
+ // if let Some(manager) = manager.as_mut() {
+ // manager.check_and_restart(app_handle).await
+ // } else {
+ // Ok(())
+ // }
+ // }
+
+ // // In the spawn method
+ // let app_handle = app.app_handle().clone();
+ // self.restart_task = Some(tauri::async_runtime::spawn(async move {
+ // loop {
+ // let interval = *restart_interval.lock().await;
+ // info!("interval: {}", interval.as_secs());
+ // if let Err(e) = check_and_restart_sidecar(&app_handle).await {
+ // error!("Failed to check and restart sidecar: {}", e);
+ // }
+ // sleep(Duration::from_secs(60)).await;
+ // }
+ // }));
Ok(())
}
diff --git a/screenpipe-audio/Cargo.toml b/screenpipe-audio/Cargo.toml
index 3ef8f6cbcb..dd15aaa55d 100644
--- a/screenpipe-audio/Cargo.toml
+++ b/screenpipe-audio/Cargo.toml
@@ -62,26 +62,24 @@ webrtc-vad = "0.4.0"
reqwest = { workspace = true }
screenpipe-core = { path = "../screenpipe-core" }
-screenpipe-events = { path = "../screenpipe-events" }
# crossbeam
crossbeam = { workspace = true }
+dashmap = { workspace = true }
# Directories
dirs = "5.0.1"
-lazy_static = "1.4.0"
+lazy_static = { version = "1.4.0" }
realfft = "3.4.0"
regex = "1.11.0"
ndarray = "0.16"
ort = "=2.0.0-rc.6"
-knf-rs = { git = "https://github.com/Neptune650/knf-rs.git", branch = "main" }
+knf-rs = { git = "https://github.com/Neptune650/knf-rs.git" }
ort-sys = "=2.0.0-rc.8"
futures = "0.3.31"
-deepgram = { git = "https://github.com/EzraEllette/deepgram-rust-sdk.git" }
+deepgram = "0.6.4"
bytes = { version = "1.9.0", features = ["serde"] }
-lru = "0.13.0"
-num-traits = "0.2.19"
[target.'cfg(target_os = "windows")'.dependencies]
ort = { version = "=2.0.0-rc.6", features = [
diff --git a/screenpipe-audio/benches/record_and_transcribe_benchmark.rs b/screenpipe-audio/benches/record_and_transcribe_benchmark.rs
index dbc5dc40b7..ae81a7db39 100644
--- a/screenpipe-audio/benches/record_and_transcribe_benchmark.rs
+++ b/screenpipe-audio/benches/record_and_transcribe_benchmark.rs
@@ -1,10 +1,9 @@
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use screenpipe_audio::vad_engine::VadSensitivity;
use screenpipe_audio::{
- create_whisper_channel, default_input_device, record_and_transcribe, AudioInput, AudioStream,
- AudioTranscriptionEngine,
+ create_whisper_channel, default_input_device, record_and_transcribe, AudioDevice, AudioInput,
+ AudioStream, AudioTranscriptionEngine,
};
-use screenpipe_core::{AudioDevice, DeviceManager};
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
@@ -19,14 +18,14 @@ async fn setup_test() -> (
let audio_device = default_input_device().unwrap(); // TODO feed voice in automatically somehow
let output_path = PathBuf::from("/tmp/test_audio.mp4");
// let (whisper_sender, _) = mpsc::unbounded_channel();
- let (whisper_sender, _) = create_whisper_channel(
+ let (whisper_sender, _, _) = create_whisper_channel(
Arc::new(AudioTranscriptionEngine::WhisperDistilLargeV3),
screenpipe_audio::VadEngineEnum::Silero,
None,
&output_path,
VadSensitivity::High,
vec![],
- Arc::new(DeviceManager::default()),
+ None,
)
.await
.unwrap();
diff --git a/screenpipe-audio/examples/stt.rs b/screenpipe-audio/examples/stt.rs
index 3c5ac21845..be53a6199a 100644
--- a/screenpipe-audio/examples/stt.rs
+++ b/screenpipe-audio/examples/stt.rs
@@ -8,7 +8,6 @@ use screenpipe_audio::{AudioInput, AudioTranscriptionEngine};
use screenpipe_core::Language;
use std::path::PathBuf;
use std::sync::Arc;
-use std::sync::Mutex as StdMutex;
use strsim::levenshtein;
use tokio::sync::Mutex;
use tracing::debug;
@@ -91,15 +90,16 @@ async fn main() {
};
let mut segments = prepare_segments(
- audio_input.data,
+ &audio_input.data,
vad_engine.clone(),
&segmentation_model_path,
- Arc::new(StdMutex::new(embedding_manager)),
+ embedding_manager,
embedding_extractor,
&audio_input.device.to_string(),
)
.await
.unwrap();
+ let mut whisper_model_guard = whisper_model.lock().await;
let mut transcription = String::new();
while let Some(segment) = segments.recv().await {
@@ -107,7 +107,7 @@ async fn main() {
&segment.samples,
audio_input.sample_rate,
&audio_input.device.to_string(),
- whisper_model.clone(),
+ &mut whisper_model_guard,
Arc::new(AudioTranscriptionEngine::WhisperLargeV3Turbo),
None,
vec![Language::English],
@@ -117,6 +117,7 @@ async fn main() {
transcription.push_str(&transcript);
}
+ drop(whisper_model_guard);
let distance = levenshtein(expected_transcription, &transcription.to_lowercase());
let accuracy = 1.0 - (distance as f64 / expected_transcription.len() as f64);
diff --git a/screenpipe-audio/src/audio_processing.rs b/screenpipe-audio/src/audio_processing.rs
index 51040c0715..5f243033ec 100644
--- a/screenpipe-audio/src/audio_processing.rs
+++ b/screenpipe-audio/src/audio_processing.rs
@@ -161,281 +161,3 @@ pub fn write_audio_to_file(
}
Ok(file_path_clone)
}
-
-// Audio processing code, adapted from whisper.cpp
-// https://github.com/ggerganov/whisper.cpp
-
-use candle::utils::get_num_threads;
-
-pub trait Float:
- num_traits::Float + num_traits::FloatConst + num_traits::NumAssign + Send + Sync
-{
-}
-
-impl Float for f32 {}
-impl Float for f64 {}
-
-// https://github.com/ggerganov/whisper.cpp/blob/4774d2feb01a772a15de81ffc34b34a1f294f020/whisper.cpp#L2357
-fn fft<T: Float>(inp: &[T]) -> Vec<T> {
- let n = inp.len();
- let zero = T::zero();
- if n == 1 {
- return vec![inp[0], zero];
- }
- if n % 2 == 1 {
- return dft(inp);
- }
- let mut out = vec![zero; n * 2];
-
- let mut even = Vec::with_capacity(n / 2);
- let mut odd = Vec::with_capacity(n / 2);
-
- for (i, &inp) in inp.iter().enumerate() {
- if i % 2 == 0 {
- even.push(inp)
- } else {
- odd.push(inp);
- }
- }
-
- let even_fft = fft(&even);
- let odd_fft = fft(&odd);
-
- let two_pi = T::PI() + T::PI();
- let n_t = T::from(n).unwrap();
- for k in 0..n / 2 {
- let k_t = T::from(k).unwrap();
- let theta = two_pi * k_t / n_t;
- let re = theta.cos();
- let im = -theta.sin();
-
- let re_odd = odd_fft[2 * k];
- let im_odd = odd_fft[2 * k + 1];
-
- out[2 * k] = even_fft[2 * k] + re * re_odd - im * im_odd;
- out[2 * k + 1] = even_fft[2 * k + 1] + re * im_odd + im * re_odd;
-
- out[2 * (k + n / 2)] = even_fft[2 * k] - re * re_odd + im * im_odd;
- out[2 * (k + n / 2) + 1] = even_fft[2 * k + 1] - re * im_odd - im * re_odd;
- }
- out
-}
-
-// https://github.com/ggerganov/whisper.cpp/blob/4774d2feb01a772a15de81ffc34b34a1f294f020/whisper.cpp#L2337
-fn dft<T: Float>(inp: &[T]) -> Vec<T> {
- let zero = T::zero();
- let n = inp.len();
- let two_pi = T::PI() + T::PI();
-
- let mut out = Vec::with_capacity(2 * n);
- let n_t = T::from(n).unwrap();
- for k in 0..n {
- let k_t = T::from(k).unwrap();
- let mut re = zero;
- let mut im = zero;
-
- for (j, &inp) in inp.iter().enumerate() {
- let j_t = T::from(j).unwrap();
- let angle = two_pi * k_t * j_t / n_t;
- re += inp * angle.cos();
- im -= inp * angle.sin();
- }
-
- out.push(re);
- out.push(im);
- }
- out
-}
-
-#[allow(clippy::too_many_arguments)]
-// https://github.com/ggerganov/whisper.cpp/blob/4774d2feb01a772a15de81ffc34b34a1f294f020/whisper.cpp#L2414
-fn log_mel_spectrogram_w<T: Float>(
- ith: usize,
- hann: &[T],
- samples: &[T],
- filters: &[T],
- fft_size: usize,
- fft_step: usize,
- speed_up: bool,
- n_len: usize,
- n_mel: usize,
- n_threads: usize,
-) -> Vec<T> {
- let n_fft = if speed_up {
- 1 + fft_size / 4
- } else {
- 1 + fft_size / 2
- };
-
- let zero = T::zero();
- let half = T::from(0.5).unwrap();
- let mut fft_in = vec![zero; fft_size];
- let mut mel = vec![zero; n_len * n_mel];
- let n_samples = samples.len();
- let end = std::cmp::min(n_samples / fft_step + 1, n_len);
-
- for i in (ith..end).step_by(n_threads) {
- let offset = i * fft_step;
-
- // apply Hanning window
- for j in 0..std::cmp::min(fft_size, n_samples - offset) {
- fft_in[j] = hann[j] * samples[offset + j];
- }
-
- // fill the rest with zeros
- if n_samples - offset < fft_size {
- fft_in[n_samples - offset..].fill(zero);
- }
-
- // FFT
- let mut fft_out: Vec<T> = fft(&fft_in);
-
- // Calculate modulus^2 of complex numbers
- for j in 0..fft_size {
- fft_out[j] = fft_out[2 * j] * fft_out[2 * j] + fft_out[2 * j + 1] * fft_out[2 * j + 1];
- }
- for j in 1..fft_size / 2 {
- let v = fft_out[fft_size - j];
- fft_out[j] += v;
- }
-
- if speed_up {
- // scale down in the frequency domain results in a speed up in the time domain
- for j in 0..n_fft {
- fft_out[j] = half * (fft_out[2 * j] + fft_out[2 * j + 1]);
- }
- }
-
- // mel spectrogram
- for j in 0..n_mel {
- let mut sum = zero;
- let mut k = 0;
- // Unroll loop
- while k < n_fft.saturating_sub(3) {
- sum += fft_out[k] * filters[j * n_fft + k]
- + fft_out[k + 1] * filters[j * n_fft + k + 1]
- + fft_out[k + 2] * filters[j * n_fft + k + 2]
- + fft_out[k + 3] * filters[j * n_fft + k + 3];
- k += 4;
- }
- // Handle remainder
- while k < n_fft {
- sum += fft_out[k] * filters[j * n_fft + k];
- k += 1;
- }
- mel[j * n_len + i] = T::max(sum, T::from(1e-10).unwrap()).log10();
- }
- }
- mel
-}
-
-pub async fn log_mel_spectrogram_<T: Float + 'static>(
- samples: &[T],
- filters: &[T],
- fft_size: usize,
- fft_step: usize,
- n_mel: usize,
- speed_up: bool,
-) -> Vec<T> {
- let zero = T::zero();
- let two_pi = T::PI() + T::PI();
- let half = T::from(0.5).unwrap();
- let one = T::from(1.0).unwrap();
- let four = T::from(4.0).unwrap();
- let fft_size_t = T::from(fft_size).unwrap();
-
- let hann: Vec<T> = (0..fft_size)
- .map(|i| half * (one - ((two_pi * T::from(i).unwrap()) / fft_size_t).cos()))
- .collect();
- let n_len = samples.len() / fft_step;
-
- // pad audio with at least one extra chunk of zeros
- let pad = 100 * candle_transformers::models::whisper::CHUNK_LENGTH / 2;
- let n_len = if n_len % pad != 0 {
- (n_len / pad + 1) * pad
- } else {
- n_len
- };
- let n_len = n_len + pad;
- let samples = {
- let mut samples_padded = samples.to_vec();
- let to_add = n_len * fft_step - samples.len();
- samples_padded.extend(std::iter::repeat(zero).take(to_add));
- samples_padded
- };
-
- // ensure that the number of threads is even and less than 12
- let n_threads = std::cmp::min(get_num_threads() - get_num_threads() % 2, 12);
- let n_threads = std::cmp::max(n_threads, 2);
-
- // Create owned copies of the input data
- let samples = samples.to_vec();
- let filters = filters.to_vec();
-
- let mut handles = vec![];
- for thread_id in 0..n_threads {
- // Clone the owned vectors for each task
- let hann = hann.clone();
- let samples = samples.clone();
- let filters = filters.clone();
-
- handles.push(tokio::task::spawn(async move {
- log_mel_spectrogram_w(
- thread_id, &hann, &samples, &filters, fft_size, fft_step, speed_up, n_len, n_mel,
- n_threads,
- )
- }));
- }
-
- let all_outputs = futures::future::join_all(handles)
- .await
- .into_iter()
- .map(|res| res.expect("Task failed"))
- .collect::<Vec<_>>();
-
- let l = all_outputs[0].len();
- let mut mel = vec![zero; l];
-
- // iterate over mel spectrogram segments, dividing work by threads.
- for segment_start in (0..l).step_by(n_threads) {
- // go through each thread's output.
- for thread_output in all_outputs.iter() {
- // add each thread's piece to our mel spectrogram.
- for offset in 0..n_threads {
- let mel_index = segment_start + offset; // find location in mel.
- if mel_index < mel.len() {
- // Make sure we don't go out of bounds.
- mel[mel_index] += thread_output[mel_index];
- }
- }
- }
- }
-
- let mmax = mel
- .iter()
- .max_by(|&u, &v| u.partial_cmp(v).unwrap_or(std::cmp::Ordering::Greater))
- .copied()
- .unwrap_or(zero)
- - T::from(8).unwrap();
- for m in mel.iter_mut() {
- let v = T::max(*m, mmax);
- *m = v / four + one
- }
- mel
-}
-
-pub async fn pcm_to_mel<T: Float + 'static>(
- cfg: &candle_transformers::models::whisper::Config,
- samples: &[T],
- filters: &[T],
-) -> Vec<T> {
- log_mel_spectrogram_(
- samples,
- filters,
- candle_transformers::models::whisper::N_FFT,
- candle_transformers::models::whisper::HOP_LENGTH,
- cfg.num_mel_bins,
- false,
- )
- .await
-}
diff --git a/screenpipe-audio/src/bin/screenpipe-audio-forever.rs b/screenpipe-audio/src/bin/screenpipe-audio-forever.rs
index fbf5ad13c2..073919061b 100644
--- a/screenpipe-audio/src/bin/screenpipe-audio-forever.rs
+++ b/screenpipe-audio/src/bin/screenpipe-audio-forever.rs
@@ -8,11 +8,10 @@ use screenpipe_audio::list_audio_devices;
use screenpipe_audio::parse_audio_device;
use screenpipe_audio::record_and_transcribe;
use screenpipe_audio::vad_engine::VadSensitivity;
+use screenpipe_audio::AudioDevice;
use screenpipe_audio::AudioStream;
use screenpipe_audio::AudioTranscriptionEngine;
use screenpipe_audio::VadEngineEnum;
-use screenpipe_core::AudioDevice;
-use screenpipe_core::DeviceManager;
use screenpipe_core::Language;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
@@ -89,14 +88,14 @@ async fn main() -> Result<()> {
}
let chunk_duration = Duration::from_secs_f32(args.audio_chunk_duration);
- let (whisper_sender, whisper_receiver) = create_whisper_channel(
+ let (whisper_sender, whisper_receiver, _) = create_whisper_channel(
Arc::new(AudioTranscriptionEngine::WhisperDistilLargeV3),
VadEngineEnum::Silero, // Or VadEngineEnum::WebRtc, hardcoded for now
args.deepgram_api_key,
&PathBuf::from("output.mp4"),
VadSensitivity::Medium,
languages,
- Arc::new(DeviceManager::default()),
+ None,
)
.await?;
diff --git a/screenpipe-audio/src/bin/screenpipe-audio.rs b/screenpipe-audio/src/bin/screenpipe-audio.rs
index 733780e643..22c3222b7f 100644
--- a/screenpipe-audio/src/bin/screenpipe-audio.rs
+++ b/screenpipe-audio/src/bin/screenpipe-audio.rs
@@ -8,11 +8,10 @@ use screenpipe_audio::list_audio_devices;
use screenpipe_audio::parse_audio_device;
use screenpipe_audio::record_and_transcribe;
use screenpipe_audio::vad_engine::VadSensitivity;
+use screenpipe_audio::AudioDevice;
use screenpipe_audio::AudioStream;
use screenpipe_audio::AudioTranscriptionEngine;
use screenpipe_audio::VadEngineEnum;
-use screenpipe_core::AudioDevice;
-use screenpipe_core::DeviceManager;
use screenpipe_core::Language;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
@@ -93,14 +92,14 @@ async fn main() -> Result<()> {
let chunk_duration = Duration::from_secs(10);
let output_path = PathBuf::from("output.mp4");
- let (whisper_sender, whisper_receiver) = create_whisper_channel(
+ let (whisper_sender, whisper_receiver, _) = create_whisper_channel(
Arc::new(AudioTranscriptionEngine::WhisperDistilLargeV3),
VadEngineEnum::WebRtc, // Or VadEngineEnum::WebRtc, hardcoded for now
deepgram_api_key,
&output_path,
VadSensitivity::Medium,
languages,
- Arc::new(DeviceManager::default()),
+ None
)
.await?;
// Spawn threads for each device
diff --git a/screenpipe-audio/src/core.rs b/screenpipe-audio/src/core.rs
index 88a93c50bd..1d33ac9888 100644
--- a/screenpipe-audio/src/core.rs
+++ b/screenpipe-audio/src/core.rs
@@ -1,12 +1,13 @@
use crate::audio_processing::audio_to_mono;
-use crate::realtime::realtime_stt;
+use crate::realtime::{realtime_stt, RealtimeTranscriptionEvent};
use crate::AudioInput;
use anyhow::{anyhow, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::StreamError;
use lazy_static::lazy_static;
use log::{debug, error, info, warn};
-use screenpipe_core::{AudioDevice, AudioDeviceType, Language};
+use screenpipe_core::Language;
+use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
@@ -44,6 +45,68 @@ impl fmt::Display for AudioTranscriptionEngine {
}
}
+#[derive(Clone, Debug)]
+pub struct DeviceControl {
+ pub is_running: bool,
+ pub is_paused: bool,
+}
+
+#[derive(Clone, Eq, PartialEq, Hash, Serialize, Debug, Deserialize)]
+pub enum DeviceType {
+ Input,
+ Output,
+}
+
+#[derive(Clone, Eq, PartialEq, Hash, Serialize, Debug)]
+pub struct AudioDevice {
+ pub name: String,
+ pub device_type: DeviceType,
+}
+
+impl AudioDevice {
+ pub fn new(name: String, device_type: DeviceType) -> Self {
+ AudioDevice { name, device_type }
+ }
+
+ pub fn from_name(name: &str) -> Result<Self> {
+ if name.trim().is_empty() {
+ return Err(anyhow!("Device name cannot be empty"));
+ }
+
+ let (name, device_type) = if name.to_lowercase().ends_with("(input)") {
+ (
+ name.trim_end_matches("(input)").trim().to_string(),
+ DeviceType::Input,
+ )
+ } else if name.to_lowercase().ends_with("(output)") {
+ (
+ name.trim_end_matches("(output)").trim().to_string(),
+ DeviceType::Output,
+ )
+ } else {
+ return Err(anyhow!(
+ "Device type (input/output) not specified in the name"
+ ));
+ };
+
+ Ok(AudioDevice::new(name, device_type))
+ }
+}
+
+impl fmt::Display for AudioDevice {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(
+ f,
+ "{} ({})",
+ self.name,
+ match self.device_type {
+ DeviceType::Input => "input",
+ DeviceType::Output => "output",
+ }
+ )
+ }
+}
+
 pub fn parse_audio_device(name: &str) -> Result<AudioDevice> {
AudioDevice::from_name(name)
}
@@ -53,18 +116,18 @@ pub async fn get_device_and_config(
) -> Result<(cpal::Device, cpal::SupportedStreamConfig)> {
let host = cpal::default_host();
- let is_output_device = audio_device.device_type == AudioDeviceType::Output;
+ let is_output_device = audio_device.device_type == DeviceType::Output;
let is_display = audio_device.to_string().contains("Display");
let cpal_audio_device = if audio_device.to_string() == "default" {
match audio_device.device_type {
- AudioDeviceType::Input => host.default_input_device(),
- AudioDeviceType::Output => host.default_output_device(),
+ DeviceType::Input => host.default_input_device(),
+ DeviceType::Output => host.default_output_device(),
}
} else {
let mut devices = match audio_device.device_type {
- AudioDeviceType::Input => host.input_devices()?,
- AudioDeviceType::Output => host.output_devices()?,
+ DeviceType::Input => host.input_devices()?,
+ DeviceType::Output => host.output_devices()?,
};
#[cfg(target_os = "macos")]
@@ -89,7 +152,7 @@ pub async fn get_device_and_config(
.unwrap_or(false)
})
}
- .ok_or_else(|| anyhow!("audio device not found"))?;
+ .ok_or_else(|| anyhow!("Audio device not found"))?;
// if output device and windows, using output config
let config = if is_output_device && !is_display {
@@ -136,14 +199,16 @@ pub async fn record_and_transcribe(
pub async fn start_realtime_recording(
 audio_stream: Arc<AudioStream>,
- languages: Arc<[Language]>,
+ languages: Vec<Language>,
 is_running: Arc<AtomicBool>,
+ realtime_transcription_sender: Arc<broadcast::Sender<RealtimeTranscriptionEvent>>,
 deepgram_api_key: Option<String>,
) -> Result<()> {
while is_running.load(Ordering::Relaxed) {
match realtime_stt(
audio_stream.clone(),
languages.clone(),
+ realtime_transcription_sender.clone(),
is_running.clone(),
deepgram_api_key.clone(),
)
@@ -183,23 +248,26 @@ async fn run_record_and_transcribe(
);
const OVERLAP_SECONDS: usize = 2;
+ let mut collected_audio = Vec::new();
let sample_rate = audio_stream.device_config.sample_rate().0 as usize;
let overlap_samples = OVERLAP_SECONDS * sample_rate;
- let duration_samples = (duration.as_secs_f64() * sample_rate as f64).ceil() as usize;
- let max_samples = duration_samples + overlap_samples;
-
- let mut collected_audio = Vec::with_capacity(max_samples);
while is_running.load(Ordering::Relaxed)
&& !audio_stream.is_disconnected.load(Ordering::Relaxed)
{
let start_time = tokio::time::Instant::now();
- // Collect audio for the duration period
while start_time.elapsed() < duration && is_running.load(Ordering::Relaxed) {
match tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await {
Ok(Ok(chunk)) => {
- collected_audio.extend_from_slice(&chunk);
+ collected_audio.extend(chunk);
+ LAST_AUDIO_CAPTURE.store(
+ std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs(),
+ Ordering::Relaxed,
+ );
}
Ok(Err(e)) => {
error!("error receiving audio data: {}", e);
@@ -209,12 +277,6 @@ async fn run_record_and_transcribe(
}
}
- // Discard oldest samples if we exceed buffer capacity
- if collected_audio.len() > max_samples {
- let excess = collected_audio.len() - max_samples;
- collected_audio.drain(0..excess);
- }
-
if !collected_audio.is_empty() {
debug!("sending audio segment to audio model");
match whisper_sender.try_send(AudioInput {
@@ -225,14 +287,9 @@ async fn run_record_and_transcribe(
}) {
Ok(_) => {
debug!("sent audio segment to audio model");
- // Retain only overlap samples for next iteration
- let current_len = collected_audio.len();
- if current_len > overlap_samples {
- let keep_from = current_len - overlap_samples;
- collected_audio.drain(0..keep_from);
- } else {
- // If we don't have enough samples, keep all (unlikely case)
- collected_audio.truncate(current_len);
+ if collected_audio.len() > overlap_samples {
+ collected_audio =
+ collected_audio.split_off(collected_audio.len() - overlap_samples);
}
}
Err(e) => {
@@ -258,7 +315,7 @@ pub async fn list_audio_devices() -> Result<Vec<AudioDevice>> {
for device in host.input_devices()? {
if let Ok(name) = device.name() {
- devices.push(AudioDevice::new(name, AudioDeviceType::Input));
+ devices.push(AudioDevice::new(name, DeviceType::Input));
}
}
@@ -285,7 +342,7 @@ pub async fn list_audio_devices() -> Result<Vec<AudioDevice>> {
for device in host.input_devices()? {
if let Ok(name) = device.name() {
if should_include_output_device(&name) {
- devices.push(AudioDevice::new(name, AudioDeviceType::Output));
+ devices.push(AudioDevice::new(name, DeviceType::Output));
}
}
}
@@ -296,7 +353,7 @@ pub async fn list_audio_devices() -> Result<Vec<AudioDevice>> {
for device in host.output_devices()? {
if let Ok(name) = device.name() {
if should_include_output_device(&name) {
- devices.push(AudioDevice::new(name, AudioDeviceType::Output));
+ devices.push(AudioDevice::new(name, DeviceType::Output));
}
}
}
@@ -308,10 +365,7 @@ pub async fn list_audio_devices() -> Result<Vec<AudioDevice>> {
&& should_include_output_device(&device.name().unwrap())
{
// TODO: not sure if it can be input, usually aggregate or multi output
- devices.push(AudioDevice::new(
- device.name().unwrap(),
- AudioDeviceType::Output,
- ));
+ devices.push(AudioDevice::new(device.name().unwrap(), DeviceType::Output));
}
}
@@ -323,7 +377,7 @@ pub fn default_input_device() -> Result<AudioDevice> {
let device = host
.default_input_device()
.ok_or(anyhow!("No default input device detected"))?;
- Ok(AudioDevice::new(device.name()?, AudioDeviceType::Input))
+ Ok(AudioDevice::new(device.name()?, DeviceType::Input))
}
// this should be optional ?
 pub fn default_output_device() -> Result<AudioDevice> {
@@ -333,7 +387,7 @@ pub fn default_output_device() -> Result<AudioDevice> {
if let Ok(host) = cpal::host_from_id(cpal::HostId::ScreenCaptureKit) {
if let Some(device) = host.default_input_device() {
if let Ok(name) = device.name() {
- return Ok(AudioDevice::new(name, AudioDeviceType::Output));
+ return Ok(AudioDevice::new(name, DeviceType::Output));
}
}
}
@@ -341,7 +395,7 @@ pub fn default_output_device() -> Result<AudioDevice> {
let device = host
.default_output_device()
.ok_or_else(|| anyhow!("No default output device found"))?;
- Ok(AudioDevice::new(device.name()?, AudioDeviceType::Output))
+ Ok(AudioDevice::new(device.name()?, DeviceType::Output))
}
#[cfg(not(target_os = "macos"))]
@@ -350,7 +404,7 @@ pub fn default_output_device() -> Result<AudioDevice> {
let device = host
.default_output_device()
.ok_or_else(|| anyhow!("No default output device found"))?;
- return Ok(AudioDevice::new(device.name()?, AudioDeviceType::Output));
+ return Ok(AudioDevice::new(device.name()?, DeviceType::Output));
}
}
@@ -446,13 +500,6 @@ impl AudioStream {
move |data: &[f32], _: &_| {
let mono = audio_to_mono(data, channels);
let _ = tx.send(mono);
- LAST_AUDIO_CAPTURE.store(
- std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_secs(),
- Ordering::Relaxed,
- );
},
error_callback,
None,
@@ -464,13 +511,6 @@ impl AudioStream {
move |data: &[i16], _: &_| {
let mono = audio_to_mono(bytemuck::cast_slice(data), channels);
let _ = tx.send(mono);
- LAST_AUDIO_CAPTURE.store(
- std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_secs(),
- Ordering::Relaxed,
- );
},
error_callback,
None,
@@ -482,13 +522,6 @@ impl AudioStream {
move |data: &[i32], _: &_| {
let mono = audio_to_mono(bytemuck::cast_slice(data), channels);
let _ = tx.send(mono);
- LAST_AUDIO_CAPTURE.store(
- std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_secs(),
- Ordering::Relaxed,
- );
},
error_callback,
None,
@@ -500,13 +533,6 @@ impl AudioStream {
move |data: &[i8], _: &_| {
let mono = audio_to_mono(bytemuck::cast_slice(data), channels);
let _ = tx.send(mono);
- LAST_AUDIO_CAPTURE.store(
- std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_secs(),
- Ordering::Relaxed,
- );
},
error_callback,
None,
diff --git a/screenpipe-audio/src/deepgram/mod.rs b/screenpipe-audio/src/deepgram/mod.rs
index 62a8ee0220..c81843cd43 100644
--- a/screenpipe-audio/src/deepgram/mod.rs
+++ b/screenpipe-audio/src/deepgram/mod.rs
@@ -9,8 +9,8 @@ use std::env;
lazy_static! {
pub(crate) static ref DEEPGRAM_API_URL: String = env::var("DEEPGRAM_API_URL")
.unwrap_or_else(|_| "https://api.deepgram.com/v1/listen".to_string());
- pub(crate) static ref DEEPGRAM_WEBSOCKET_URL: String =
- env::var("DEEPGRAM_WEBSOCKET_URL").unwrap_or_else(|_| "".to_string());
+ pub(crate) static ref DEEPGRAM_WEBSOCKET_URL: String = env::var("DEEPGRAM_WEBSOCKET_URL")
+ .unwrap_or_else(|_| "wss://api.deepgram.com/v1/listen".to_string());
pub(crate) static ref CUSTOM_DEEPGRAM_API_TOKEN: String =
env::var("CUSTOM_DEEPGRAM_API_TOKEN").unwrap_or_else(|_| String::new());
}
diff --git a/screenpipe-audio/src/deepgram/realtime.rs b/screenpipe-audio/src/deepgram/realtime.rs
index 7666e8e20e..a5949fd5a1 100644
--- a/screenpipe-audio/src/deepgram/realtime.rs
+++ b/screenpipe-audio/src/deepgram/realtime.rs
@@ -1,7 +1,7 @@
use crate::{
- deepgram::CUSTOM_DEEPGRAM_API_TOKEN, deepgram::DEEPGRAM_WEBSOCKET_URL,
- realtime::RealtimeTranscriptionEvent, AudioStream,
+ deepgram::CUSTOM_DEEPGRAM_API_TOKEN, realtime::RealtimeTranscriptionEvent, AudioStream,
};
+use crate::{AudioDevice, DeviceType};
use anyhow::Result;
use bytes::BufMut;
use bytes::Bytes;
@@ -11,19 +11,16 @@ use deepgram::common::options::Encoding;
use deepgram::common::stream_response::StreamResponse;
use futures::channel::mpsc::{self, Receiver as FuturesReceiver};
use futures::{SinkExt, TryStreamExt};
-use screenpipe_core::AudioDevice;
-use screenpipe_core::AudioDeviceType;
use screenpipe_core::Language;
-use screenpipe_events::send_event;
use std::sync::{atomic::AtomicBool, Arc};
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
-use tokio::sync::oneshot;
-use tracing::info;
+use tracing::error;
pub async fn stream_transcription_deepgram(
 stream: Arc<AudioStream>,
- languages: Arc<[Language]>,
+ realtime_transcription_sender: Arc<broadcast::Sender<RealtimeTranscriptionEvent>>,
+ languages: Vec<Language>,
 is_running: Arc<AtomicBool>,
 deepgram_api_key: Option<String>,
) -> Result<()> {
@@ -31,6 +28,7 @@ pub async fn stream_transcription_deepgram(
stream.subscribe().await,
stream.device.clone(),
stream.device_config.sample_rate().0,
+ realtime_transcription_sender,
is_running,
languages,
deepgram_api_key,
@@ -44,8 +42,9 @@ pub async fn start_deepgram_stream(
 stream: Receiver<Vec<f32>>,
 device: Arc<AudioDevice>,
sample_rate: u32,
+ realtime_transcription_sender: Arc<broadcast::Sender<RealtimeTranscriptionEvent>>,
 is_running: Arc<AtomicBool>,
- _languages: Arc<[Language]>,
+ _languages: Vec<Language>,
 deepgram_api_key: Option<String>,
) -> Result<()> {
let api_key = deepgram_api_key.unwrap_or(CUSTOM_DEEPGRAM_API_TOKEN.to_string());
@@ -54,28 +53,7 @@ pub async fn start_deepgram_stream(
return Err(anyhow::anyhow!("Deepgram API key not found"));
}
- // create shutdown rx from is_running
- let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
-
- tokio::spawn(async move {
- loop {
- let running = is_running.load(std::sync::atomic::Ordering::SeqCst);
- if !running {
- shutdown_tx.send(()).unwrap();
- break;
- }
- tokio::time::sleep(Duration::from_millis(100)).await;
- }
- });
-
- info!("Starting deepgram stream for device: {}", device);
-
- let deepgram = match DEEPGRAM_WEBSOCKET_URL.as_str().is_empty() {
- true => deepgram::Deepgram::new(api_key)?,
- false => {
- deepgram::Deepgram::with_base_url_and_api_key(DEEPGRAM_WEBSOCKET_URL.as_str(), api_key)?
- }
- };
+ let deepgram = deepgram::Deepgram::new(api_key)?;
let deepgram_transcription = deepgram.transcription();
@@ -93,18 +71,20 @@ pub async fn start_deepgram_stream(
let mut handle = req.clone().handle().await?;
let mut results = req.stream(get_stream(stream)).await?;
+ let realtime_transcription_sender_clone = realtime_transcription_sender.clone();
let device_clone = device.clone();
loop {
+ if !is_running.load(std::sync::atomic::Ordering::SeqCst) {
+ break;
+ }
+
tokio::select! {
- _ = &mut shutdown_rx => {
- info!("Shutting down deepgram stream for device: {}", device);
- break;
- }
result = results.try_next() => {
if let Ok(Some(result)) = result {
handle_transcription(
result,
+ realtime_transcription_sender_clone.clone(),
device_clone.clone(),
).await;
}
@@ -128,35 +108,41 @@ fn get_stream(mut stream: Receiver<Vec<f32>>) -> FuturesReceiver<Result<Bytes,
-async fn handle_transcription(result: StreamResponse, device: Arc<AudioDevice>) {
+async fn handle_transcription(
+ result: StreamResponse,
+ realtime_transcription_sender: Arc<broadcast::Sender<RealtimeTranscriptionEvent>>,
+ device: Arc<AudioDevice>,
+) {
if let StreamResponse::TranscriptResponse {
channel, is_final, ..
} = result
{
let res = channel.alternatives.first().unwrap();
let text = res.transcript.clone();
- let is_input = device.device_type == AudioDeviceType::Input;
+ let is_input = device.device_type == DeviceType::Input;
if !text.is_empty() {
- let _ = send_event(
- "transcription",
- RealtimeTranscriptionEvent {
- timestamp: chrono::Utc::now(),
- device: device.to_string(),
- transcription: text.to_string(),
- is_final,
- is_input,
- },
- );
+ match realtime_transcription_sender.send(RealtimeTranscriptionEvent {
+ timestamp: chrono::Utc::now(),
+ device: device.to_string(),
+ transcription: text.to_string(),
+ is_final,
+ is_input,
+ }) {
+ Ok(_) => {}
+ Err(e) => {
+ if !e.to_string().contains("channel closed") {
+ error!("Error sending transcription event: {}", e);
+ }
+ }
+ }
}
}
}
diff --git a/screenpipe-audio/src/lib.rs b/screenpipe-audio/src/lib.rs
index 0cd10da334..6251b31003 100644
--- a/screenpipe-audio/src/lib.rs
+++ b/screenpipe-audio/src/lib.rs
@@ -14,7 +14,8 @@ pub use audio_processing::resample;
pub use core::{
default_input_device, default_output_device, get_device_and_config, list_audio_devices,
parse_audio_device, record_and_transcribe, start_realtime_recording, trigger_audio_permission,
- AudioStream, AudioTranscriptionEngine, LAST_AUDIO_CAPTURE,
+ AudioDevice, AudioStream, AudioTranscriptionEngine, DeviceControl, DeviceType,
+ LAST_AUDIO_CAPTURE,
};
pub mod realtime;
pub use encode::encode_single_audio;
diff --git a/screenpipe-audio/src/pyannote/embedding.rs b/screenpipe-audio/src/pyannote/embedding.rs
index 788dfb40ac..6f16ca245d 100644
--- a/screenpipe-audio/src/pyannote/embedding.rs
+++ b/screenpipe-audio/src/pyannote/embedding.rs
@@ -2,31 +2,24 @@ use crate::pyannote::session;
use anyhow::{Context, Result};
use ndarray::Array2;
use ort::Session;
-use std::{path::Path, sync::Mutex};
+use std::path::Path;
#[derive(Debug)]
-pub struct EmbeddingExtractor {}
-
-lazy_static::lazy_static! {
- static ref EMBEDDING_SESSION: Mutex