Skip to content

Commit

Permalink
Merge pull request #2 from quartiq/rj/cli-tweaks
Browse files Browse the repository at this point in the history
rj/cli tweaks
  • Loading branch information
jordens authored Oct 27, 2021
2 parents 01647b2 + 775d5e7 commit 43ed7d5
Showing 1 changed file with 34 additions and 33 deletions.
67 changes: 34 additions & 33 deletions src/bin/stream_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@ use clap::Parser;
use stabilizer_streaming::StreamReceiver;
use std::time::{Duration, Instant};

const MIN_STREAM_EFFICIENCY: f32 = 0.95;
const MAX_LOSS: f32 = 0.05;

/// Execute stabilizer stream throughput testing.
/// Use `RUST_LOG=info cargo run` to increase logging verbosity.
#[derive(Parser)]
struct Opts {
/// The prefix of the stabilizer to use. E.g. dt/sinara/dual-iir/00-11-22-33-44-55
#[clap(long)]
prefix: String,

/// The IP of the interface to the broker.
#[clap(short, long, default_value = "127.0.0.1")]
/// The local IP to receive streaming data on.
#[clap(short, long, default_value = "0.0.0.0")]
ip: String,

#[clap(long, default_value = "2000")]
/// The UDP port to receive streaming data on.
#[clap(long, default_value = "9293")]
port: u16,

/// The test duration to execute for.
/// The test duration in seconds.
#[clap(long, default_value = "5")]
duration: u64,
duration: f32,
}

#[async_std::main]
Expand All @@ -32,42 +30,45 @@ async fn main() {

log::info!("Binding to socket");
let mut stream_receiver = StreamReceiver::new(ip, opts.port).await;
let frame = stream_receiver.next_frame().await.unwrap();

let mut total_batches = frame.batch_count();
let mut dropped_batches = 0;
let mut last_sequence = frame.sequence_number;
let mut total_batches = 0u64;
let mut dropped_batches = 0u64;
let mut expect_sequence = None;

let stop = Instant::now() + Duration::from_secs(opts.duration);
let stop = Instant::now() + Duration::from_millis((opts.duration * 1000.) as _);

log::info!("Reading frames");
while Instant::now() < stop {
let frame = stream_receiver.next_frame().await.unwrap();
total_batches += frame.batch_count() as u64;

let num_dropped = frame.sequence_number.wrapping_sub(last_sequence) as usize;
total_batches += frame.batch_count() + num_dropped;

if num_dropped > 0 {
if let Some(expect) = expect_sequence {
let num_dropped = frame.sequence_number.wrapping_sub(expect) as u64;
dropped_batches += num_dropped;
log::warn!(
"Frame drop detected: 0x{:X} -> 0x{:X} ({} batches)",
last_sequence,
frame.sequence_number,
num_dropped
)
total_batches += num_dropped;

if num_dropped > 0 {
log::warn!(
"Lost {} batches: {:#08X} -> {:#08X}",
num_dropped,
expect,
frame.sequence_number,
);
}
}

last_sequence = frame
.sequence_number
.wrapping_add(frame.batch_count() as u32);
expect_sequence = Some(frame.sequence_number.wrapping_add(frame.batch_count() as _));
}

assert!(total_batches > 0);
let stream_efficiency = 1.0 - (dropped_batches as f32 / total_batches as f32);
let loss = dropped_batches as f32 / total_batches as f32;

log::info!("Stream reception rate: {:.2} %", stream_efficiency * 100.0);
log::info!("Received {} batches", total_batches);
log::info!("Lost {} batches", dropped_batches);
log::info!(
"Loss: {} % ({}/{} batches)",
loss * 100.0,
dropped_batches,
total_batches
);

assert!(stream_efficiency > MIN_STREAM_EFFICIENCY);
assert!(loss < MAX_LOSS);
}

0 comments on commit 43ed7d5

Please sign in to comment.