Skip to content

Commit

Permalink
psd working
Browse files Browse the repository at this point in the history
  • Loading branch information
jordens committed Aug 31, 2023
1 parent c63f295 commit 1f2d81e
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 119 deletions.
72 changes: 68 additions & 4 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,86 @@
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] // hide console window on Windows in release

use anyhow::Result;
use clap::Parser;
use eframe::egui;
use eframe::egui::plot::{Legend, Line, Plot, PlotPoints};
use std::sync::mpsc;
use std::time::Duration;

fn main() -> Result<(), eframe::Error> {
use stabilizer_streaming::{Frame, Loss, Psd};

/// Execute stabilizer stream throughput testing.
/// Use `RUST_LOG=info cargo run` to increase logging verbosity.
#[derive(Parser)]
struct Opts {
/// The local IP to receive streaming data on.
#[clap(short, long, default_value = "0.0.0.0")]
ip: std::net::Ipv4Addr,

/// The UDP port to receive streaming data on.
#[clap(long, long, default_value = "9293")]
port: u16,
}

fn main() -> Result<()> {
env_logger::init(); // Log to stderr (if you run with `RUST_LOG=debug`).
let opts = Opts::parse();

let (cmd_send, cmd_recv) = mpsc::channel();
let receiver = std::thread::spawn(move || {
log::info!("Binding to {}:{}", opts.ip, opts.port);
let socket = std::net::UdpSocket::bind((opts.ip, opts.port))?;
socket2::SockRef::from(&socket).set_recv_buffer_size(1 << 20)?;
socket.set_read_timeout(Some(Duration::from_millis(100)))?;
log::info!("Receiving frames");

let mut loss = Loss::default();
let mut dec: Vec<_> = (0..4).map(|_| Psd::new(1 << 9, 3)).collect();

let mut buf = vec![0; 2048];
while cmd_recv.try_recv() == Err(mpsc::TryRecvError::Empty) {
let len = socket.recv(&mut buf)?;
match Frame::from_bytes(&buf[..len]) {
Ok(frame) => {
loss.update(&frame);
for (dec, x) in dec.iter_mut().zip(frame.data.traces()) {
dec.process(x);
}
}
Err(e) => log::warn!("{e}"),
};
}

loss.analyze();

let (b, y) = dec[1].get(4);
println!("{:?}, {:?}", b, y);

Result::<()>::Ok(())
});

let options = eframe::NativeOptions {
initial_window_size: Some(egui::vec2(350.0, 400.0)),
..Default::default()
};
eframe::run_native("FLS", options, Box::new(|_cc| Box::<MyApp>::default()))
eframe::run_native("FLS", options, Box::new(|cc| Box::new(FLS::new(cc)))).unwrap();

cmd_send.send(())?;
receiver.join().unwrap()?;

Ok(())
}

#[derive(Default)]
struct MyApp {}
pub struct FLS {}

impl FLS {
pub fn new(_cc: &eframe::CreationContext<'_>) -> Self {
Self {}
}
}

impl eframe::App for MyApp {
impl eframe::App for FLS {
fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
egui::CentralPanel::default().show(ctx, |ui| {
let height = 200.0;
Expand Down
45 changes: 5 additions & 40 deletions src/bin/stream_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use clap::Parser;
use stabilizer_streaming::{Frame, Psd};
use stabilizer_streaming::{Frame, Psd, Loss};
use std::sync::mpsc;
use std::time::Duration;

Expand All @@ -21,43 +21,6 @@ struct Opts {
duration: f32,
}

#[derive(Clone, Copy, Default)]
struct Loss {
received: u64,
dropped: u64,
seq: Option<u32>,
}

impl Loss {
fn update(&mut self, frame: &Frame) {
self.received += frame.batches() as u64;
if let Some(seq) = self.seq {
let missing = frame.seq().wrapping_sub(seq) as u64;
self.dropped += missing;
if missing > 0 {
log::warn!(
"Lost {} batches: {:#08X} -> {:#08X}",
missing,
seq,
frame.seq(),
);
}
}
self.seq = Some(frame.seq().wrapping_add(frame.batches() as _));
}

fn analyze(&self) {
assert!(self.received > 0);
let loss = self.dropped as f32 / (self.received + self.dropped) as f32;
log::info!(
"Loss: {} % ({} of {})",
loss * 100.0,
self.dropped,
self.received + self.dropped
);
assert!(loss < 0.05);
}
}
fn main() -> Result<()> {
env_logger::init();
let opts = Opts::parse();
Expand All @@ -73,7 +36,7 @@ fn main() -> Result<()> {

let mut loss = Loss::default();

let mut dec: Vec<_> = (0..4).map(|_| Psd::new(1 << 8, 3)).collect();
let mut dec: Vec<_> = (0..4).map(|_| Psd::new(1 << 9, 3)).collect();

while cmd_recv.try_recv() == Err(mpsc::TryRecvError::Empty) {
let len = socket.recv(&mut buf)?;
Expand All @@ -82,7 +45,6 @@ fn main() -> Result<()> {
loss.update(&frame);
for (dec, x) in dec.iter_mut().zip(frame.data.traces()) {
dec.process(x);
let _y = dec.get();
}
}
Err(e) => log::warn!("{e}"),
Expand All @@ -91,6 +53,9 @@ fn main() -> Result<()> {

loss.analyze();

let (b, y) = dec[1].get(4);
println!("{:?}, {:?}", b, y);

Result::<()>::Ok(())
});

Expand Down
95 changes: 60 additions & 35 deletions src/hbf.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/// Decimation/interpolation filters
///
///
/// These focus on half-band filters (rate change of 2) and cascades thereof.
/// These focus on half-band filters (rate change of 2) and cascades of HBF.
/// The half-band filter has unique properties that make it preferrable in many cases:
///
/// * only needs N multiplications (fused multiply accumulate) for 4*N taps
Expand All @@ -17,16 +16,16 @@
/// The implementations here are all `no_std` and `no-alloc`.
/// They support (but don't require) in-place filtering to reduce memory usage.
/// They unroll and optimmize extremely well targetting current architectures,
/// e.g. requiring less than 4 instructions per input item for `HbfDecCascade` on Skylake.
/// e.g. requiring less than 4 instructions per input item for the full `HbfDecCascade` on Skylake.
/// The filters are optimized for decent block sizes and perform best (i.e. with negligible
/// overhead) for blocks of 32 high-rate items or more, depending very much on architecture.

/// Filter slices of input items into slices of output items.
/// Filter input items into output items.
pub trait Filter {
/// Input/output item type.
type Item;

/// Process items.
/// Process a block of items.
///
/// Input items can be either in `x` or in `y`.
/// In the latter case the filtering operation is done in-place.
Expand All @@ -39,14 +38,20 @@ pub trait Filter {
/// contain the output items.
fn process_block(&mut self, x: Option<&[Self::Item]>, y: &mut [Self::Item]) -> usize;

/// Return the maximum block size supported.
/// Return the block size granularity and the maximum block size.
///
/// This is the input block size in the case of decimation and the
/// output block size in the case of interpolation. For in-place operation,
/// this is the maximum length of `y`.
fn max_block(&self) -> usize;

// TODO: provide fn process() with auto-blocking
/// Output blocks must always adhere to the size requirements implied (e.g. by the rate change).
fn block_size(&self) -> (usize, usize);

/// Process items
fn process(&mut self, x: Option<&[Self::Item]>, y: &mut [Self::Item]) -> usize {
if let Some(x) = x {
x.chunks(self.block_size().1)
.fold(0, |i, xi| i + self.process_block(Some(xi), &mut y[i..]))
} else {
unimplemented!()
}
}
}

/// Symmetric FIR filter prototype.
Expand All @@ -69,6 +74,7 @@ impl<'a, const M: usize, const N: usize> SymFir<'a, M, N> {
Self { x: [0.0; N], taps }
}

/// Perform the FIR convolution and yield results iteratively.
#[inline]
fn get(&self) -> impl Iterator<Item = f32> + '_ {
self.x.windows(2 * M).map(|x| {
Expand Down Expand Up @@ -109,8 +115,8 @@ impl<'a, const M: usize, const N: usize> HbfDec<'a, M, N> {
impl<'a, const M: usize, const N: usize> Filter for HbfDec<'a, M, N> {
type Item = f32;

fn max_block(&self) -> usize {
2 * (N - (2 * M - 1))
fn block_size(&self) -> (usize, usize) {
(2, 2 * (N - (2 * M - 1)))
}

fn process_block(&mut self, x: Option<&[Self::Item]>, y: &mut [Self::Item]) -> usize {
Expand Down Expand Up @@ -163,8 +169,8 @@ impl<'a, const M: usize, const N: usize> HbfInt<'a, M, N> {
impl<'a, const M: usize, const N: usize> Filter for HbfInt<'a, M, N> {
type Item = f32;

fn max_block(&self) -> usize {
2 * (N - (2 * M - 1))
fn block_size(&self) -> (usize, usize) {
(1, N - (2 * M - 1))
}

fn process_block(&mut self, x: Option<&[Self::Item]>, y: &mut [Self::Item]) -> usize {
Expand All @@ -188,6 +194,7 @@ impl<'a, const M: usize, const N: usize> Filter for HbfInt<'a, M, N> {
}

/// Standard/optimal half-band filter cascade taps
///
/// * more than 98 dB stop band attenuation
/// * 0.8 nyquist pass band (relative to lowest rate)
/// * less than 0.001 dB ripple
Expand Down Expand Up @@ -231,13 +238,18 @@ pub const HBF_TAPS: ([f32; 15], [f32; 6], [f32; 3], [f32; 3], [f32; 2]) = (
// 2, .94
[-0.03145898, 0.28145805],
);
// Passband width in units of Nyquist

/// Passband width in units of Nyquist
pub const HBF_PASSBAND: f32 = 0.8;

/// Max low-rate block size (HbfIntCascade input, HbfDecCascade output)
pub const HBF_CASCADE_BLOCK: usize = 1 << 8;

/// Half-band filter cascade with optimal taps
/// Half-band decimation filter cascade with optimal taps
///
/// See [HBF_TAPS].
/// Only in-place processing is implemented.
/// Supports rate changes of 1, 2, 4, 8, and 16.
pub struct HbfDecCascade {
n: usize,
stages: (
Expand Down Expand Up @@ -276,14 +288,18 @@ impl HbfDecCascade {
impl Filter for HbfDecCascade {
type Item = f32;

fn max_block(&self) -> usize {
match self.n {
0 => usize::MAX,
1 => self.stages.3.max_block(),
2 => self.stages.2.max_block(),
3 => self.stages.1.max_block(),
_ => self.stages.0.max_block(),
}
fn block_size(&self) -> (usize, usize) {
(
1 << self.n,
match self.n {
0 => (1, usize::MAX),
1 => self.stages.3.block_size(),
2 => self.stages.2.block_size(),
3 => self.stages.1.block_size(),
_ => self.stages.0.block_size(),
}
.1,
)
}

fn process_block(&mut self, x: Option<&[f32]>, y: &mut [f32]) -> usize {
Expand All @@ -309,6 +325,11 @@ impl Filter for HbfDecCascade {
}
}

/// Half-band interpolation filter cascade with optimal taps.
///
/// See [HBF_TAPS].
/// Only in-place processing is implemented.
/// Supports rate changes of 1, 2, 4, 8, and 16.
pub struct HbfIntCascade {
n: usize,
pub stages: (
Expand Down Expand Up @@ -347,14 +368,18 @@ impl HbfIntCascade {
impl Filter for HbfIntCascade {
type Item = f32;

fn max_block(&self) -> usize {
match self.n {
0 => usize::MAX,
1 => self.stages.0.max_block(),
2 => self.stages.1.max_block(),
3 => self.stages.2.max_block(),
_ => self.stages.3.max_block(),
}
fn block_size(&self) -> (usize, usize) {
(
1,
match self.n {
0 => (1, usize::MAX),
1 => self.stages.0.block_size(),
2 => self.stages.1.block_size(),
3 => self.stages.2.block_size(),
_ => self.stages.3.block_size(),
}
.1 >> self.n(),
)
}

fn process_block(&mut self, x: Option<&[f32]>, y: &mut [f32]) -> usize {
Expand Down Expand Up @@ -422,7 +447,7 @@ mod test {
let n = h.process_block(None, &mut x);
println!("{:?}", &x[..n]); // interpolator impulse response

let g = 2.0f32.powi(h.n() as _);
let g = (1 << h.n()) as f32;
let mut y = Vec::from_iter(x[..n].iter().map(|&x| Complex { re: x / g, im: 0.0 }));
// pad
y.resize(5 << 10, Complex::default());
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use thiserror::Error;

mod de;
pub use de::*;

mod hbf;
pub use hbf::*;

mod psd;
pub use psd::*;
mod loss;
pub use loss::*;

#[derive(Debug, Error)]
pub enum Error {
Expand Down
Loading

0 comments on commit 1f2d81e

Please sign in to comment.