Skip to content

Commit

Permalink
refactor: make screenpipe-audio 100% based on tokio tasks instead of …
Browse files Browse the repository at this point in the history
…a mess of OS thread + async. also remove dead code, and add benchmarks
  • Loading branch information
louis030195 committed Jul 16, 2024
1 parent ee26658 commit 3e3d2a3
Show file tree
Hide file tree
Showing 23 changed files with 629 additions and 31,242 deletions.
47 changes: 47 additions & 0 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# act -W .github/workflows/benchmark.yml --container-architecture linux/amd64 -j benchmark -P ubuntu-latest=catthehacker/ubuntu:act-latest

name: Rust Benchmark

on: [push]

env:
  CARGO_TERM_COLOR: always

jobs:
  benchmark:
    name: Run Rust benchmark
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: dtolnay/rust-toolchain@stable

      - name: Install dependencies
        run: |
          sudo apt-get update
          sudo apt-get install -y libavformat-dev libavfilter-dev libavdevice-dev ffmpeg libasound2-dev
      - name: Run benchmark
        run: |
          # Each benchmark target needs its own --bench flag. A bare name after
          # the first --bench is parsed by cargo as a benchmark-name *filter*
          # passed to that single harness, so "stt_benchmark" would never run.
          cargo bench --bench pcm_decode_benchmark --bench stt_benchmark -- --output-format bencher | tee output.txt
          echo "Benchmark output:"
          cat output.txt
      - name: Store benchmark result
        uses: benchmark-action/github-action-benchmark@v1
        with:
          name: Rust Benchmark
          tool: "cargo"
          output-file-path: output.txt
          github-token: ${{ secrets.GITHUB_TOKEN }}
          auto-push: false # TODO: https://github.com/benchmark-action/github-action-benchmark/tree/master?tab=readme-ov-file#charts-on-github-pages-1
          # Show alert with commit comment on detecting possible performance regression
          alert-threshold: "200%"
          comment-on-alert: true
          fail-on-alert: true
          alert-comment-cc-users: "@louis030195"

      - name: Upload benchmark results
        uses: actions/upload-artifact@v3
        with:
          name: benchmark-results
          path: output.txt
29 changes: 19 additions & 10 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,25 @@ cargo test

## Other hacks

### Debugging memory errors

```bash
RUSTFLAGS="-Z sanitizer=address" cargo +nightly run --bin screenpipe
# or
RUSTFLAGS="-Z sanitizer=leak" cargo +nightly run --bin screenpipe
```

For performance monitoring, you can use the following command:

```bash
cargo install cargo-instruments
# tracking leaks over a 10 minute time limit (--time-limit is in ms; 600000 ms = 10 min)
cargo instruments -t Leaks --bin screenpipe --features metal --time-limit 600000 --open
```

Then open the file in `target/release/instruments` using Xcode -> Open Developer Tool -> Instruments.


### Benchmarks

```
Expand All @@ -92,17 +111,7 @@ cargo install sqlx-cli
sqlx migrate add <migration_name>
```

### Optimization

For performance optimization, you can use the following command:

```bash
cargo install cargo-instruments
# tracking leaks over 60 minutes time limit
cargo instruments -t Leaks --bin screenpipe --features metal --time-limit 600000 --open
```

Then open the file in `target/release/instruments` using Xcode -> Open Developer Tool -> Instruments.


## Join the Community
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ tracing = "0.1.37"
tokio = { version = "1.15", features = ["full", "tracing"] }

# dev
criterion = { version = "0.5.1", features = ["async_tokio"], dev = true }
criterion = { version = "0.5.1", features = ["async_tokio"] }



18 changes: 14 additions & 4 deletions screenpipe-audio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ tempfile = "3"
# Tracing
tracing = { workspace = true }

# Concurrency
crossbeam = "0.8"

# Bytes
bytemuck = "1.16.1"

Expand All @@ -65,7 +62,7 @@ tokio = { workspace = true }
[dev-dependencies]
tempfile = "3.3.0"
infer = "0.15"
tokio = { version = "1.0", features = ["full"] }
criterion = { workspace = true }

[features]
metal = ["candle/metal", "candle-nn/metal", "candle-transformers/metal"]
Expand All @@ -75,3 +72,16 @@ cuda = ["candle/cuda", "candle-nn/cuda", "candle-transformers/cuda"]
[[bin]]
name = "screenpipe-audio"
path = "src/bin/screenpipe-audio.rs"

[[bench]]
name = "audio_benchmark"
harness = false

[[bench]]
name = "pcm_decode_benchmark"
harness = false

[[bench]]
name = "stt_benchmark"
harness = false

117 changes: 117 additions & 0 deletions screenpipe-audio/benches/audio_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// cargo bench --bench audio_benchmark

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use screenpipe_audio::{create_whisper_channel, stt, AudioInput, WhisperModel};
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;

/// Synthesize a mono 1 kHz sine-wave WAV file of `duration_secs` seconds at
/// `path` using the system `ffmpeg` binary (installed by the benchmark CI job).
///
/// Panics if ffmpeg cannot be spawned or exits with a non-zero status.
fn generate_large_audio_file(path: &str, duration_secs: u32) {
    use std::process::Command;

    let output = Command::new("ffmpeg")
        .args(&[
            // Overwrite any leftover file from a previous run; without -y
            // ffmpeg waits on an interactive overwrite prompt and the
            // benchmark hangs (or keeps measuring a stale file).
            "-y",
            "-f",
            "lavfi",
            "-i",
            &format!("sine=frequency=1000:duration={}", duration_secs),
            "-acodec",
            "pcm_s16le",
            "-ar",
            "44100",
            path,
        ])
        .output()
        .expect("Failed to spawn ffmpeg");

    // `output()` only returns Err when the process cannot be spawned; a
    // failing ffmpeg invocation (bad args, missing codec, unwritable path)
    // still returns Ok, so the exit status must be checked explicitly.
    assert!(
        output.status.success(),
        "ffmpeg failed to generate {}: {}",
        path,
        String::from_utf8_lossy(&output.stderr)
    );
}

/// Benchmark speech-to-text on a generated 1-minute audio file.
///
/// The Whisper model is loaded and the fixture generated once, outside the
/// timed closure, so only transcription itself is measured.
fn benchmark_stt(c: &mut Criterion) {
    let whisper_model = WhisperModel::new().unwrap();
    let test_file = "test_audio.wav";
    generate_large_audio_file(test_file, 60); // 1-minute audio file

    c.bench_function("stt_1min_audio", |b| {
        b.iter(|| {
            // Keep the measured closure free of I/O: the previous
            // Instant::now()/println! pair inside the loop added stdout
            // formatting time to every sample Criterion recorded.
            stt(black_box(test_file), black_box(&whisper_model)).unwrap()
        })
    });

    std::fs::remove_file(test_file).unwrap();
}

/// Measures end-to-end throughput of the whisper channel when ten 30-second
/// files are queued at once and their transcriptions drained concurrently.
fn benchmark_concurrent_stt(c: &mut Criterion) {
    let runtime = Runtime::new().unwrap();
    let inputs: Vec<String> = (0..10)
        .map(|idx| format!("test_audio_{}.wav", idx))
        .collect();

    for path in &inputs {
        generate_large_audio_file(path, 30); // 30-second audio files
    }

    c.bench_function("concurrent_stt_10x30s", |b| {
        b.iter(|| {
            runtime.block_on(async {
                let (sender, mut receiver) = create_whisper_channel().await.unwrap();

                // Queue every file up front...
                for path in &inputs {
                    let job = AudioInput {
                        path: path.clone(),
                        device: "test_device".to_string(),
                    };
                    sender.send(job).unwrap();
                }

                // ...then wait for exactly one transcription per queued file.
                let mut remaining = inputs.len();
                while remaining > 0 {
                    receiver.recv().await.unwrap();
                    remaining -= 1;
                }
            });
        })
    });

    for path in inputs {
        std::fs::remove_file(path).unwrap();
    }
}

// Benchmarks STT on a single, separately generated file, end to end.
//
// NOTE(review): the bench id says "stt_10min_audio" but the file generated
// here is only 10 *seconds* long (`generate_large_audio_file(large_file, 10)`).
// Confirm which side is intended — the label is what github-action-benchmark
// tracks across commits, so renaming it resets the history.
fn benchmark_large_file(c: &mut Criterion) {
    let whisper_model = WhisperModel::new().unwrap();
    let large_file = "large_test_audio.wav";
    generate_large_audio_file(large_file, 10);

    c.bench_function("stt_10min_audio", |b| {
        b.iter(|| {
            stt(black_box(large_file), black_box(&whisper_model)).unwrap();
        })
    });

    // Clean up the fixture so repeated local runs start fresh.
    std::fs::remove_file(large_file).unwrap();
}

// Single STT runs take tens of seconds each (see the recorded results below),
// so shrink the sample count and stretch the measurement window well past
// Criterion's defaults (100 samples / 5 s) to keep total runtime bounded.
criterion_group! {
    name = benches;
    config = Criterion::default()
        .sample_size(10)
        .measurement_time(Duration::from_secs(300)); // Increase to 5 minutes
    targets = benchmark_stt, benchmark_concurrent_stt, benchmark_large_file
}
criterion_main!(benches);

// Benchmarking stt_1min_audio: Warming up for 3.0000 s
// Warning: Unable to complete 10 samples in 60.0s. You may wish to increase target time to 143.6s.
// stt_1min_audio time: [12.127 s 13.636 s 15.211 s]

// Benchmarking concurrent_stt_10x30s: Warming up for 3.0000 s
// Warning: Unable to complete 10 samples in 60.0s. You may wish to increase target time to 1334.5s.
// concurrent_stt_10x30s time: [133.02 s 138.45 s 144.25 s]
// Found 2 outliers among 10 measurements (20.00%)
// 1 (10.00%) low mild
// 1 (10.00%) high mild

// Benchmarking stt_10min_audio: Warming up for 3.0000 s
// Warning: Unable to complete 10 samples in 60.0s. You may wish to increase target time to 168.5s.
// stt_10min_audio time: [19.699 s 21.192 s 23.431 s]
// Found 2 outliers among 10 measurements (20.00%)
// 1 (10.00%) low mild
// 1 (10.00%) high severe
20 changes: 20 additions & 0 deletions screenpipe-audio/benches/pcm_decode_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// cargo bench --bench pcm_decode_benchmark
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use screenpipe_audio::pcm_decode::pcm_decode;
use std::path::PathBuf;

/// Repeatedly decode the bundled MP3 fixture to raw PCM under Criterion.
fn benchmark_pcm_decode(c: &mut Criterion) {
    // Fixture shipped with the crate: <crate root>/test_data/selah.mp3.
    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    let fixture = manifest_dir.join("test_data").join("selah.mp3");

    c.bench_function("pcm_decode", |b| {
        b.iter(|| {
            // Result intentionally discarded — only decode time is of interest.
            let _ = pcm_decode(black_box(&fixture));
        });
    });
}

criterion_group!(benches, benchmark_pcm_decode);
criterion_main!(benches);
23 changes: 23 additions & 0 deletions screenpipe-audio/benches/stt_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// cargo bench --bench stt_benchmark

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use screenpipe_audio::stt::{stt, WhisperModel};
use std::path::PathBuf;

/// Benchmark speech-to-text on the bundled MP3 fixture.
fn benchmark_stt(c: &mut Criterion) {
    let audio_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("test_data")
        .join("selah.mp3");

    // Model construction is expensive — do it once, outside the timed loop,
    // so only inference is measured.
    let model = WhisperModel::new().expect("Failed to initialize WhisperModel");

    c.bench_function("stt", |b| {
        b.iter(|| {
            let path_str = audio_path.to_str().unwrap();
            // Result intentionally discarded — only transcription time matters.
            let _ = stt(black_box(&path_str), &model);
        });
    });
}

criterion_group!(benches, benchmark_stt);
criterion_main!(benches);
24 changes: 11 additions & 13 deletions screenpipe-audio/src/bin/screenpipe-audio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use screenpipe_audio::AudioDevice;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[derive(Parser, Debug)]
Expand All @@ -37,7 +36,8 @@ fn print_devices(devices: &[AudioDevice]) {

// TODO - kinda bad cli here

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
use env_logger::Builder;
use log::LevelFilter;

Expand Down Expand Up @@ -74,22 +74,24 @@ fn main() -> Result<()> {

let chunk_duration = Duration::from_secs(5);
let output_path = PathBuf::from("output.mp3");
let (whisper_sender, whisper_receiver) = create_whisper_channel()?;
let (whisper_sender, mut whisper_receiver) = create_whisper_channel().await?;
// Spawn threads for each device
let recording_threads: Vec<_> = devices
.into_iter()
.enumerate()
.map(|(i, device)| {
let device = Arc::new(device);
let whisper_sender = whisper_sender.clone();
let output_path = output_path.with_file_name(format!("output_{}.mp3", i));
let device_control = Arc::new(AtomicBool::new(true));
let device_clone = device.clone();
let device_clone = Arc::clone(&device);

thread::spawn(move || {
tokio::spawn(async move {
let device_control_clone = Arc::clone(&device_control);
let device_clone_2 = Arc::clone(&device_clone);

record_and_transcribe(
&device_clone,
device_clone_2,
chunk_duration,
output_path,
whisper_sender,
Expand All @@ -103,29 +105,25 @@ fn main() -> Result<()> {

// Main loop to receive and print transcriptions
loop {
match whisper_receiver.recv_timeout(Duration::from_secs(5)) {
match whisper_receiver.try_recv() {
Ok(result) => {
info!("Transcription: {:?}", result);
consecutive_timeouts = 0; // Reset the counter on successful receive
}
Err(crossbeam::channel::RecvTimeoutError::Timeout) => {
Err(_) => {
consecutive_timeouts += 1;
if consecutive_timeouts >= max_consecutive_timeouts {
info!("No transcriptions received for a while, stopping...");
break;
}
continue;
}
Err(crossbeam::channel::RecvTimeoutError::Disconnected) => {
// All senders have been dropped, recording is complete
break;
}
}
}

// Wait for all recording threads to finish
for (i, thread) in recording_threads.into_iter().enumerate() {
let file_path = thread.join().unwrap()?;
let file_path = thread.await.unwrap().await;
println!("Recording {} complete: {:?}", i, file_path);
}

Expand Down
Loading

0 comments on commit 3e3d2a3

Please sign in to comment.